最开始 pekko 本身是起源于 akka, 因为 akka 转为商业许可而演变出来的开源替代版本,
两者在早期的 abi 基本上是类似, 而后续 akka 追加不少商业的功能支持.
akka搭建服务需要买授权码, 为了避免商业纠纷问题推荐采用
pekko学习
两者在使用中没什么差异, 更多差异来源于本身的 类型系统 差别: actor 和 actor-typed
最明显的就是 pekko 当中的声明 actor 不同的抽象定义:
// 弱类型定义类
class Worker extends AbstractActor{
// do something
}
// 强类型泛型类
class Worker extends AbstractBehavior<T>{
// do something
}
AbstractActor: 基于弱类型的Actor抽象. 允许接收任何类型的消息, 在处理消息时需要通过模式匹配或其他方式来判断消息的具体类型并进行相应的处理. 这种方式在编译时无法对消息类型进行检查, 可能导致运行时出现类型不匹配的错误.AbstractBehavior<T>:基于强类型的Actor抽象. 它通过类型参数<T>明确指定了Actor所能接收的消息类型并在编译时会进行严格的类型检查, 确保只有正确类型的消息才能发送给Actor从而提高了代码的稳定性和可维护性.
AbstractActor 方式内部本身采用 Object 装载传输消息, 在编译的时候没办法做类型检查(毕竟最后数据都用 Object 包装),
现在在网上大部分教程都是以这种方式构建 actor.
AbstractBehavior<T> 方式更加现代化, 可以在通过类型系统在编译的时候就知道通过 T 做错误拦截和功能限制.
这两者没有什么优劣之分, 弱类型的 Actor 定义可以无视消息结构对象从而更加灵活(但是需要开发者能够敏锐知道传递对象异常),
强类型定义的 Actor 能够提供更加健壮规范的消息传递消息设计(多人协作的时候能够严格要求消息的类型防止乱传参数).
另外这两者的所有函数方法也是完全完全不一样的, 能够看到有时候使用 tell(Message(),NoSender()) 这种方式推送消息,
但是用强类型的 Actor 调用的时候发现没有该调用方法.
这里需要多翻下官方文档, pekko 的版本接口变动也是很频繁的, 甚至选用 scala 版本也带有差异
后续这里主要讲解的是强类型模式, 因为网上大部分都是采用弱类型处理所以这里仅仅补充后续进阶的处理.
我个人多次实践之后, 在强类型
Actor之中最好是对所有权(ownership)有具体概念, 对于声明的类型最好是你操作的数据结构体
弱类型 Actor
这里提供个早期弱类型版本的 pekko 声明定义:
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.actor.Scheduler;
import java.time.Duration;
/**
* 弱类型Actor类声明
* <p/>
* <pre>
* // 构建 ActorSystem 系统
* ActorSystem system = ActorSystem.create("Workers");
* system.actorOf(Props.create(ExampleWorker.class), "ExampleWorker");
* </pre>
*/
public class ExampleWorker extends AbstractActor {
/**
* Actor 定时调度器
*/
final Scheduler scheduler = getContext().getSystem().scheduler();
/**
* Actor 定时执行器
*/
Cancellable timer = null;
/**
* 内部定义事件结构
*/
interface Events {
}
/**
* Java17后续版本的 record 数据类, 内部数值必须依赖外部传入的 final
*/
public record GetTimestampEvent(
Long timestamp
) implements Events {
}
/**
* 字符串消息事件
*
* @param text
*/
public record GetTextEvent(String text) implements Events {
}
/**
* Actor启动之后回调
*/
@Override
public void preStart() throws Exception {
super.preStart();
// 启动周期性定时器不断推送时间
// 初始延迟 5 秒,之后每 5 秒发送一次消息
timer = scheduler.schedule(
Duration.ofSeconds(5),// 初始延迟
Duration.ofSeconds(5),// 周期延迟
self(),// ActorRef 地址
// 下面开始就是执行类似 tell(....) 的方法
"Hello", // 推送消息结构
getContext().getDispatcher(), // 获取 Actor 调度器, 其实就是线程池调度器
ActorRef.noSender() // 匿名来源
);
}
/**
* Actor 关闭时候回调
*/
@Override
public void postStop() throws Exception {
super.postStop();
// 关闭定时器
if (timer != null && !timer.isCancelled()) {
timer.cancel();
}
}
/**
* Actor消息拦截器回调
*/
@Override
public Receive createReceive() {
return receiveBuilder()
// match 拦截匹配对应消息类型结构
// 弱类型的匹配比较特殊, 只需要回调 Function<消息结构(传入),void(传出)>
.match(GetTimestampEvent.class, this::getTimestampEvent)
// match 支持 hook 条件匹配执行, 用于精细化条件处理
// 后续方法都支持这种条件匹配的
// 下面就是对消息再处理, 只有通过这个条件才允许被 actor 转发
.match(GetTextEvent.class, msg -> !msg.text.isBlank(), this::getTextEvent)
// 对实例化的消息结构进行 Equals 匹配功能
// 实际上就是获取消息 Object, 之后 "Hello".equals(object) 比较匹配
.matchEquals("Hello", this::getHelloEvent)
// 数据不做检查的匹配
// 注意匹配之后传入的其实就是原生弱类型 Object 对象
.matchUnchecked(GetTextEvent.class, (obj) -> {
// 消息执
if (obj instanceof GetTextEvent instance) {
// instance.text;
}
})
// matchAny 就类似默认去拦截不匹配的所有消息
// 其他消息默认调用 unhandled 代表消息不清楚怎么处理, unhandled 是上层自带方法
.matchAny(this::unhandled)
.build();
}
private void getTimestampEvent(GetTimestampEvent event) {
// Actor 推送消息结构
final var self = getSelf();
// 第二个参数代表消息发送者引用, ActorRef.noSender() 就是代表匿名发送者不需要做响应
// 如果设置设置 ActorRef.noSender() 就是希望对象 Actor 无须回复结果结构
// 比较常用的场景就是需要推送不需要等执行回复消息就直接设置 ActorRef.noSender()
// 如果需要执行之后异步回调的话, 就需要把发送者的 ActorRef 发送过去等待 Actor 通知回复
self.tell(new GetTextEvent(event.timestamp.toString()), ActorRef.noSender());
// 这里就是获取消息传递过来的地址
final var sender = getSender();
sender.tell(new GetTextEvent(event.timestamp.toString()), ActorRef.noSender());
}
private void getTextEvent(GetTextEvent event) {
final var ctx = getContext();
final var self = getSelf();
// 这里就是消息转发功能
// forward(...) 和 tell(..., ActorRef.noSender()) 差别很大
// forward 会把原生的 sender 保持不变打包发送给其他 ActorContext
// tell(..., ActorRef.noSender()) 会将 getSender() 的方法改写成转发 Actor 而非源 Actor
self.forward(new GetTextEvent(event.text), ctx);
}
private void getHelloEvent(String text) {
}
}
通过上面的 matchUnchecked 非检查拦截就知道全部都依赖 Object 装载,
也就是只要满足包装成 Object 就能通过消息拦截, 但是没办法从编译的时刻识别异常:
tell(1000L);// 也就是意味着随便消息会被装箱成 Object 推送
tell("test");// 无论怎么样 Actor 都必须接收
tell("b");// actor 没有方法拒绝消息发送到指定队列
如果规范一点只允许某个消息结构推送的情况其实都没差别, 但是项目多人编写不对消息规划处理的时候就很容易推送乱七八糟的数据过来且没办法做强制类型匹配.
强类型 Actor
对于强类型 Actor 这是基本上变动挺大, 而且对类型做出更加严谨规划:
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.PostStop;
import org.apache.pekko.actor.typed.PreRestart;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import java.util.function.Function;
/**
* 强类型 Actor 功能
* <p/>
* <pre>
* var actorSystem = ActorSystem.create(ExampleTypedWorker::create(),"example-typed");
* // 信号类型不是自定义强类型, 需要不检查类型推送消息
* actorSystem.unsafeUpcast().tell(PreRestart.class);
* </pre>
*
* @param <T>
*/
public class ExampleTypedWorker<T> extends AbstractBehavior<ExampleTypedWorker.Events<T>> {
/**
* 时间消息结构
*
* @param <D>
*/
public interface Events<D> {
D data();
}
/**
* 获取时间戳事件
*
* @param timestamp
*/
public record GetTimestampEvent(
Long timestamp
) implements Events<Long> {
@Override
public Long data() {
return timestamp;
}
}
/**
* 必须引入构造方法
* 强类型Actor现在必须定一个带有 ActorContext 传入对象的
* 注意:这里最好构建成 private 用于内部实例化
*
* @param context Actor上下文
*/
private ExampleTypedWorker(ActorContext<Events<T>> context) {
super(context);
}
/**
* 采用静态实例化返回, 可以有效防止外部进行手动实例化
*/
public static <T> Behavior<Events<T>> create() {
return Behaviors.setup(ExampleTypedWorker::new);
}
/**
* 静态附带动态多个参数实例化
* 默认 Behaviors.setup 第二种实例化方法, 用于构造实例化的时候的参数签名不和继承的构造方法一致
* <p/>
* <pre>
* // 动态参数, 在原来基础上附带了其他传入参数
* ExampleTypedWorker(ActorContext<Events<T>> context,Long time){
* }
* // Behaviors.setup 支持传入回调方法作为动态构造, 只要求最后返回的 Behavior<Events<T> 对象
* Behaviors.setup((ctx)->new ExampleTypedWorker(ctx,System.currentTimeMillis()))
* </pre>
*/
public static <T> Behavior<Events<T>> create(Function<ActorContext<Events<T>>, Behavior<Events<T>>> creator) {
return Behaviors.setup(creator::apply);
}
/**
* 新的消息拦截回调方法
*/
@Override
public Receive<Events<T>> createReceive() {
return newReceiveBuilder()
// 强类型的 Actor 摒弃了原来回调函数的 关闭|启动 回调方法, 改由信号机制处理
// 采用信号机制推送内部定义 PostStop.class 信号代表关闭回调处理
// 需要注意内部定义只有 重启 和 关闭 信号, 其他可以自己去定义
.onSignal(PostStop.class, signal -> {
System.out.println("Actor stoped");
return this;
})
// 拦截重启信号回调
.onSignal(PreRestart.class, signal -> {
System.out.println("Actor restart");
return this;
})
// 这里会异常报错, 提示类型没有匹配
//.onMessage(GetTimestampEvent.class,event->{
// return this;
//})
// 其他需要额外说明
.build();
}
}
强类型的 Actor 上面的例子和弱类型差别很大, 基本上最大差异如下:
依赖类型所有权: 消息传递现在必须按照规定泛型传递新增信号处理: 信号处理更像是之前的弱类型包装, 用来处理内部独有不依赖强类型的消息传递移除启动|关闭回调: 后续的关闭|重启都是依赖消息拦截注入到Actor里面追加 Behaviors.setup 装载: 强类型的Actor实例化都需要通过注册Behavior才能被挂载依赖构造现在需要 ActorContext<?> 上下文: 通过Behaviors装载的Actor构造方法必须接收上下文对象异步返回:强类型不再采用Sender返回地址, 而是采用内部封装getContext().ask()实现异步返回大量兼容 unchecked 方法和功能: 用于某些消息结构需要不做类型检查的消息推送和处理场景定时器机制不变: 定时器处理方法已经保持原样消息回调机制强依赖定义的所有权类型对象:onMessage不能再沿用弱类型Actor的定义回调, 而是需要依赖传入的类型从而处理消息- ……
还有大量底层变动没有说明, 但是从上面也能看出强类型突出 严谨可控 的特性.
这里还需要说明下, 后续的消息类型不要设置为泛型, 就像上面那种情况:
/**
* 没办法做消息匹配的 Actor 类
* 如果继承强类型的实现, 最好类型不含泛型声明, 否则不好做类型匹配
*/
public class ExampleTypedWorker<T> extends AbstractBehavior<ExampleTypedWorker.Events<T>> {
/**
* 时间消息结构
* 注意: 不要把泛型直接应用在 AbstractBehavior 上
* @param <D>
*/
public interface Events<D> {
D data();
}
/**
* 获取时间戳事件
* 注意: 哪怕这里继承重写强类型在做 onMessage 匹配也会异常
* 所以没有必要的话最好直接定义成衍生虚类或者不含泛型接口
* @param timestamp
*/
public record GetTimestampEvent(
Long timestamp
) implements Events<Long> {
@Override
public Long data() {
return timestamp;
}
}
}
所以这里改进下采用抽象类或者抽象接口处理就行了:
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import java.util.function.Function;
/**
* 强类型 Actor 功能
* <p/>
* <pre>
* var actorSystem = ActorSystem.create(ExampleTypedWorker::create(),"example-typed");
* // 信号类型不是自定义强类型, 需要不检查类型推送消息
* actorSystem.unsafeUpcast().tell(PreRestart.class);
* </pre>
*/
public class ExampleTypedWorker extends AbstractBehavior<ExampleTypedWorker.AbstractEvent> {
/**
* 时间消息结构
*/
public interface Event {
Long timestamp();
}
/**
* 让其他功能继承的抽象类实现
*/
public static abstract class AbstractEvent implements Event {
/**
* 实例化的时候初始化的构建时间戳
*/
protected final Long timestamp = System.currentTimeMillis();
/**
* 默认返回实例化时间
*/
@Override
public Long timestamp() {
return timestamp;
}
}
/**
* 获取时间戳事件
* 继承抽象的事件对象
*/
public static class PrintTimestampEvent extends AbstractEvent {
}
/**
* 必须引入构造方法
* 强类型Actor现在必须定一个带有 ActorContext 传入对象的
* 注意:这里最好构建成 private 用于内部实例化
*
* @param context Actor上下文
*/
private ExampleTypedWorker(ActorContext<AbstractEvent> context) {
super(context);
}
/**
* 采用静态实例化返回, 可以有效防止外部进行手动实例化
*/
public static Behavior<AbstractEvent> create() {
return Behaviors.setup(ExampleTypedWorker::new);
}
/**
* 静态附带动态多个参数实例化
* 默认 Behaviors.setup 第二种实例化方法, 用于构造实例化的时候的参数签名不和继承的构造方法一致
* <p/>
* <pre>
* // 动态参数, 在原来基础上附带了其他传入参数
* ExampleTypedWorker(ActorContext<Events<T>> context,Long time){
* }
* // Behaviors.setup 支持传入回调方法作为动态构造, 只要求最后返回的 Behavior<Events<T> 对象
* Behaviors.setup((ctx)->new ExampleTypedWorker(ctx,System.currentTimeMillis()))
* </pre>
*/
public static Behavior<AbstractEvent> create(Function<ActorContext<AbstractEvent>, Behavior<AbstractEvent>> creator) {
return Behaviors.setup(creator::apply);
}
/**
* 新的消息拦截回调方法
*/
@Override
public Receive<AbstractEvent> createReceive() {
return newReceiveBuilder()
// 强类型的 Actor 摒弃了原来回调函数的 关闭|启动 回调方法, 改由信号机制处理
// 采用信号机制推送内部定义 PostStop.class 信号代表关闭回调处理
// 需要注意内部定义只有 重启 和 关闭 信号, 其他可以自己去定义
.onSignal(PostStop.class, signal -> {
System.out.println("Actor stoped");
return this;
})
// 拦截重启信号回调
.onSignal(PreRestart.class, signal -> {
System.out.println("Actor restart");
return this;
})
// 匹配拦截抽象对象消息
.onMessage(PrintTimestampEvent.class, this::printTimestampEvent)
.build();
}
/**
* 消息拦截回调
*/
private Behavior<AbstractEvent> printTimestampEvent(PrintTimestampEvent event) {
System.out.println("Get timestamp = " + event.timestamp());
return this;
}
}
至此基本上就能可以满足一些简单 Actor 业务, 但是 Actor 主要需要解决的还有 状态维护.
状态维护
之前看到大部分功能都是采用无状态消息推送, 但是有些功能是要有维护状态保持的; 特别是游戏功能基本上都要做大量状态保持, 比如玩家购买道具之后需要获知购物结果从而执行流程.
这种模式之下需要用到 请求 - 响应 的数据同步流程, 这里说个最常用的流程:
# 1. 玩家发起购买请求
# 2. 服务端确认目前金额时候满足扣除
# 3. 需要获取玩家的 [执行结果, 变动值, 结算金额] 推送到专门日志记录
# 4. 确认扣除成功的时候需要推送奖励数据给来源 ActorRef, 让其去通知结算结果
这种请求流程基本上是一体流程, 也就是整个流程都是按照顺序流程执行,
一般来说很多耗时间的操作都是分开在不同 Actor(比如玩家操作,物品流水,订单充值日志等),
这种情况下有时候就需要把多个 Actor 操作串连起来获取处理结果.
网络情况是很复杂的, 很多时候会出现延迟或者系统扣除这种操作, 如果不进行数据校正会导致本不应该操作的数据变得可以操作
这里就需要支持异步数据处理, 让多个 Actor 的执行方式运行在同一时刻,
并且还需要区分出 弱类型 和 强类型 的异步差别.
这里先说明下 弱类型Actor 需要 org.apache.pekko.pattern.Patterns 内部功能来实现异步操作:
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.pattern.Patterns;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
/**
* 弱类型Actor类声明
* <p/>
* <pre>
* // 构建 ActorSystem 系统
* ActorSystem system = ActorSystem.create("Workers");
* system.actorOf(Props.create(ExampleWorker.class), "ExampleWorker");
* </pre>
*/
public class ExampleWorker extends AbstractActor {
/**
* 内部定义事件结构
*/
interface Events {
}
/**
* 请求合并字符串
*/
public static class MergeStringsRequestEvent implements Events {
final String[] strings;
public MergeStringsRequestEvent(String[] strings) {
this.strings = strings;
}
}
/**
* 获取最后合并字符串结果
*/
public static class MergeStringsResponseEvent implements Events {
final String result;
public MergeStringsResponseEvent(String result) {
this.result = result;
}
}
/**
* Actor消息拦截器回调
*/
@Override
public Receive createReceive() {
return receiveBuilder()
// 匹配合并消息的处理
.match(MergeStringsRequestEvent.class, this::mergeStringsRequest)
// 测试用于匹配测试获取异步结果执行
// 只需要传入String数组就会被执行
.match(String[].class, strings -> strings.length > 1, (strings) -> {
// 获取目标 ActorRef, 我们这里是本地所以直接采用 self 代表发送给我们自己的 actor
final ActorRef self = getSelf();
// 推送响应的超时时间, 防止异常超时
final Duration duration = Duration.ofSeconds(5);
// 执行异步获取内容
CompletionStage<Object> stage = Patterns.ask(
self, // 请求地址
new MergeStringsRequestEvent(strings), // 推送消息
duration // 超时时间
);
// 按照异步结果分发处理, 需要判断转化成结果
CompletionStage<MergeStringsResponseEvent> future = stage.thenApply(response -> {
// 确认类型正确
if (response instanceof MergeStringsResponseEvent) {
return (MergeStringsResponseEvent) response;
} else {
throw new IllegalArgumentException("Unexpected response type: " + response.getClass());
}
});
// 最后得出响应直接处理
future.thenAccept(mergeStringsResponseEvent -> {
// 响应结果获取
System.out.println("Merge strings result: " + mergeStringsResponseEvent.result);
}).exceptionally((throwable) -> {
// 异常处理
System.err.println("Merge Strings failed: " + throwable.getMessage());
return null;
});
// 上面两个处理方式其实可以合并一起, 但是为了让人看到处理流程而分开说明
})
// matchAny 就类似默认去拦截不匹配的所有消息
// 其他消息默认调用 unhandled 代表消息不清楚怎么处理, unhandled 是上层自带方法
.matchAny(this::unhandled)
.build();
}
/**
* 获取数据合并字符串命令
*/
public void mergeStringsRequest(MergeStringsRequestEvent event) {
StringBuilder builder = new StringBuilder(event.strings.length);
for (String string : event.strings) {
builder.append(string);
}
String result = builder.toString();
// 返回响应, 一般来说 self 作为返回地址可以由消息结果推送而来
// 比如 MergeStringsRequestEvent 定义 ActorRef 成员, 用来给返回远程 Actor 地址
final ActorRef self = getSelf(); // 作为来源者推送的 ActorRef 地址
final ActorRef sender = getSender(); // 远程请求推送者 ActorRef 地址
sender.tell(new MergeStringsResponseEvent(result), self);
}
}
弱类型 Actor 都挺容易的, 主要本身也没有强类型限制这么多, 所以可以直接采用 Object 装载容器.
getSelf()和getSender()获取的都是ActorRef, 但是因为集群问题导致有本地 Actor 和远程 Actor 地址差异
而对于强类型来说就比较复杂一点, 需要协调类型处理且可以不需要 Patterns 类相关:
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.pattern.StatusReply;
import org.slf4j.Logger;
import java.time.Duration;
import java.util.function.Function;
/**
* 强类型 Actor 功能
* <p/>
* <pre>
* var actorSystem = ActorSystem.create(ExampleTypedWorker::create(),"example-typed");
* // 信号类型不是自定义强类型, 需要不检查类型推送消息
* actorSystem.unsafeUpcast().tell(PreRestart.class);
* </pre>
*/
public class ExampleTypedWorker extends AbstractBehavior<ExampleTypedWorker.AbstractEvent> {
/**
* 时间消息结构
*/
public interface Event {
Long timestamp();
}
/**
* 让其他功能继承的抽象类实现
*/
public static abstract class AbstractEvent implements Event {
/**
* 实例化的时候初始化的构建时间戳
*/
protected final Long timestamp = System.currentTimeMillis();
/**
* 默认返回实例化时间
*/
@Override
public Long timestamp() {
return timestamp;
}
}
/**
* 请求计算斐波那契数列
* <p/>
* <pre>
* // 计算公式
* private int calculateFibonacci(int n) {
* if (n <= 0) return 0;
* if (n == 1) return 1;
* return calculateFibonacci(n - 1) + calculateFibonacci(n - 2);
* }
* </pre>
*/
public static class FibonacciRequestEvent extends AbstractEvent {
/**
* 计算结构响应的 actor 地址
* <p/>
* <pre>
* ActorRef<StatusReply<?>> 是强类型用于响应异步操作的地址句柄
* </pre>
*/
final ActorRef<StatusReply<FibonacciResponseEvent>> reply;
final Integer number;
public FibonacciRequestEvent(ActorRef<StatusReply<FibonacciResponseEvent>> reply, Integer number) {
this.reply = reply;
this.number = number;
}
}
/**
* 最后斐波那契数列计算的结果
*/
public static class FibonacciResponseEvent extends AbstractEvent {
final Integer number;
public FibonacciResponseEvent(Integer number) {
this.number = number;
}
/**
* 计算公式
*/
public static int CalculateFibonacci(int number) {
if (number <= 0) return 0;
if (number == 1) return 1;
return CalculateFibonacci(number - 1) + CalculateFibonacci(number - 2);
}
}
/**
* 推送测试计算信号
*
* @param number
*/
public record FibonacciSignal(
int number
) implements Signal {
}
/**
* 必须引入构造方法
* 强类型Actor现在必须定一个带有 ActorContext 传入对象的
* 注意:这里最好构建成 private 用于内部实例化
*
* @param context Actor上下文
*/
private ExampleTypedWorker(ActorContext<AbstractEvent> context) {
super(context);
}
/**
* 采用静态实例化返回, 可以有效防止外部进行手动实例化
*/
public static Behavior<AbstractEvent> create() {
return Behaviors.setup(ExampleTypedWorker::new);
}
/**
* 静态附带动态多个参数实例化
* 默认 Behaviors.setup 第二种实例化方法, 用于构造实例化的时候的参数签名不和继承的构造方法一致
* <p/>
* <pre>
* // 动态参数, 在原来基础上附带了其他传入参数
* ExampleTypedWorker(ActorContext<Events<T>> context,Long time){
* }
* // Behaviors.setup 支持传入回调方法作为动态构造, 只要求最后返回的 Behavior<Events<T> 对象
* Behaviors.setup((ctx)->new ExampleTypedWorker(ctx,System.currentTimeMillis()))
* </pre>
*/
public static Behavior<AbstractEvent> create(Function<ActorContext<AbstractEvent>, Behavior<AbstractEvent>> creator) {
return Behaviors.setup(creator::apply);
}
/**
* 新的消息拦截回调方法
*/
@Override
public Receive<AbstractEvent> createReceive() {
return newReceiveBuilder()
// 拦截计算信号
.onMessage(FibonacciRequestEvent.class, this::fibonacciRequestEvent)
// 推送计算信号
.onSignal(FibonacciSignal.class, (signal) -> {
// 拦截前置条件
return signal.number > 0;
}, (signal) -> {
// 获取运行时和构建配置
final ActorContext<AbstractEvent> ctx = getContext();
final ActorRef<AbstractEvent> self = ctx.getSelf();
final Logger logger = ctx.getLog();
final Duration timeout = Duration.ofSeconds(5);// 5s超时
// 构建异步执行, 这里
ctx.askWithStatus(
FibonacciResponseEvent.class, // 声明返回的类型
self, // 声明任务发起者的 ActorRef 地址
timeout, // 声明任务超时时间
// 请求指令发送时候的 hook, ref 是默认强类型附带的响应 ActorRef 地址
(ActorRef<StatusReply<FibonacciResponseEvent>> ref) -> {
// 这里就是和弱类型区别最大的地方
// 这个函数主要回调返回独立对应结果响应地址, 用于接收者拿到之后按照地址返回响应
return new FibonacciRequestEvent(ref, signal.number);
},
// 响应回调, 主要的拦截处理在此内部
(response, throwable) -> {
if (response != null) {
// 参数正确直接返回处理
// 这里可以做些回调参数处理
return response;
} else {
// 转化异常
logger.error(throwable.getMessage());
return null;
}
}
);
return Behaviors.same();
})
.onAnyMessage(event -> Behaviors.same())
.build();
}
/**
* 计算功能回调
*/
private Behavior<AbstractEvent> fibonacciRequestEvent(FibonacciRequestEvent event) {
// 计算得出最终结果
int resultNumber = FibonacciResponseEvent.CalculateFibonacci(event.number);
var result = new FibonacciResponseEvent(resultNumber);
// 这里就需要返回响应
final var ctx = getContext();
// 因为是本机所以只需要发给自己的 actor
// 如果是居群或者构建其他子母 actor 地址需要传入 reply 声明地址
// StatusReply.success | StatusReply.error 就是强类型响应的包装结构体
event.reply.tell(StatusReply.success(result));
// 保持状态机状态
return Behaviors.same();
}
}
具体官方文档参见: interaction-patterns
而对于双向消息传递最好遵循以下规则:
消息不可变性:确保消息类为不可变对象(如使用record或final字段),避免多线程环境下的状态不一致。单一职责:Actor专注于处理单一类型的请求, 复杂业务可通过Actor层级子母级别Actor结构拆分日志与监控:在Actor中添加日志记录关键操作(如请求接收、响应发送), 便于调试和故障排查
pekko|akka 早期的双向操作会阻塞任务线程, 而最新版本则修复这些问题改由内部异步响应操作.
另外 pekko|akka 还有更加高级的 流转发(Stream) 概念, 用来支持更加高效的数据传输.
Stream 处理
Stream 主要用于构建响应式流处理应用, 支持高吞吐量、低延迟的数据处理,
适合处理大规模、流式的数据(如实时日志、事件流、微服务消息等)
一般在 Java 之中都会把响应式流命名为
Flow
按照响应式程序来说有以下启动流程:
Source:数据流的起点(如文件、网络套接字、集合、Actor 等)Flow:数据处理阶段(可链式组合,如过滤map、映射filter、聚合aggregate 等)Sink:数据流的终点(如打印到控制台、写入数据库、发送到 Actor 等)
这是目前常见的封装模式, 其实内部有些特性对于游戏服务端来说提升可能比较小; 因为本质上游戏服务端基本上都是挂载在同个进程数据交换玩家数据降低延迟, 同时服务端会尽可能压缩数据包传输导致每个消息包都很小.
这里涉及到大数据相关业务流程, 可能会在另外篇章说明; 基本上了解基础 Actor, 后续扩展就是集群功能的情况.
Actor 管理
静态 Actor 能满足日常简单业务处理, 但是有时候想要动态分配 子Acotr 让其挂载业务,
最常用的功能就是 Socket 是要分裂 子Actor 负责同时监听, 否则在相同 Actor 会导致消息功能全部积压在相同 Actor.
官方提供了怎么去辅助构建 Actor 生命周期并且委托管理器监控, 可以按照官方说明的基础之后改动:
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.PostStop;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
/**
* Actor程序管理器
* 注: 创建和关闭不要让外部能够操作到, 实在需要动态关闭启动是要让 Actor 集成消息拦截处理
*/
public class ProgramManager extends AbstractBehavior<ProgramManager.Command> {
/**
* 构造方法
*
* @param context Actor 上下文
*/
public ProgramManager(ActorContext<Command> context) {
super(context);
}
/**
* 强类型类型
*/
public interface Command {
}
/**
* 自定义的子Actor进程停止信号
*/
public record JobDone(String name) {
}
/**
* 创建子 Actor
*
* @param name 子Actor名称
* @param replyToWhenDone 子Actor关闭响应
*/
private record JobSpawn(String name, ActorRef<JobDone> replyToWhenDone) implements Command {
/**
* 创建不允许外部创建
*
* @param name 子Actor名
* @param replyToWhenDone 关闭响应的地址
*/
private JobSpawn {
}
}
/**
* 关闭子 Actor
*/
private record JobTerminated(String name, ActorRef<JobDone> replyToWhenDone) implements Command {
}
/**
* 子 Actor 对象
*/
public static class Job extends AbstractBehavior<Command> {
/**
* Actor 名称
*/
final String name;
/**
* 不允许外部自行实例化
*/
private Job(ActorContext<Command> context, String name) {
super(context);
this.name = name;
}
/**
* 静态构建
*/
public static Behavior<Command> create(String name) {
return Behaviors.setup(ctx -> new Job(ctx, name));
}
/**
* 消息拦截器
*/
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onSignal(PostStop.class, this::onPostStop)
.build();
}
/**
* Actor 停止信号处理
*/
private Behavior<Command> onPostStop(PostStop event) {
final var ctx = getContext();
final var log = ctx.getLog();
log.info("Worker {} stoped", name);
return this;
}
}
/**
* 消息拦截器
*/
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
// 监听消息创建子Actor
.onMessage(JobSpawn.class, this::onJobSpawn)
// 监听关闭 Terminated, 推送会自动推送给子Actor
.onMessage(JobTerminated.class, this::onJobTerminated)
.build();
}
/**
* 动态创建子 Actor
*/
private Behavior<Command> onJobSpawn(JobSpawn message) {
final var ctx = getContext();
final var log = ctx.getLog();
log.info("job {} started!", message.name);
// 创建子Actor
ActorRef<Command> job = ctx.spawn(
Job.create(message.name),
message.name
);
// 这里 context.watch 和 context.watchWith 是内部子Actor退出返回
// Actor监控去监听事件, 也是把自己父Actor指定监听子Actor
// 当子Actor内部触发 Behaviors.stoped(...) 会被通知父Actor
// ctx.watch(...) 和 ctx.watchWith(...) 两者差别:
// watch: 返回的是系统默认的 Terminated 异常, 可以去拦截系统的信号
// watchWith:返回的是自定义 Terminated 异常, 会把第二个参数返回推送到消息之中
ctx.watchWith(job, new JobTerminated(
message.name,
message.replyToWhenDone
));
return this;
}
/**
* 推送关闭子Actor消息
*/
private Behavior<Command> onJobTerminated(JobTerminated message) {
final var ctx = getContext();
final var log = ctx.getLog();
log.info("job {} stoped!", message.name);
// 传递关闭消息, 让子类Actor自信去捕获处理
message.replyToWhenDone.tell(new JobDone(message.name));
return this;
}
}
这里就是创建动态Actor管理器的流程, 通过 JobSpawn.class 消息拦截去动态创建 Actor,
并且创建之后把父类的 JobTerminated.class 消息自动转发给子类让其自己去拦截处理.
注意: 如果关闭子Actor要记得手动卸载父Actor消息监听转发, 需要 context.unwatch(target) 来手动卸载.
配置加载
可以看到之前官方默认直接 ActorSystem.create 之后就没有相关配置处理了,
但是其实对于 pekko|akka 的配置是十分复杂的, 只是默认配置隐藏在包内部的 reference.conf.
官方手册: 配置说明
项目规模到达一定程度之后, 就需要去自定义相关配置, 而现在就是需要抛弃系统默认配置:
import org.apache.pekko.actor.typed.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
// 如果不传入手动配置的话, 其实生成默认流程如下
Config conf = ConfigFactory.defaultReference(); // 加载默认的包配置
// 创建 Actor 系统并且加载
ActorSystem<Void> sys = ActorSystem.create(rootBehavior, "MySystem", conf);
这就是你默认不传入配置启动 Actor 流程, 这里 defaultReference 默认是读取 pekko 包内自带配置.
这里官方推荐先加载系统默认配置, 之后在原来配置基础上覆盖重写配置:
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
/**
* 启动应用入口
*/
public class ActorConfigApp {
/**
* 启动入口
*/
public static void main(String[] args) {
// 首先加载系统默认配置
Config defaultReference = ConfigFactory.defaultReference();
// 构建自定义配置并加载系统默认配置, 这里可以合并成一条但为了直观点分开编写
Config config = ConfigFactory.load(defaultReference);
// 打印默认配置
// 能看到加载到配置信息: Config(SimpleConfigObject({......}))
// 可以看到内部检索 loglevel 关键字, 可以看到 "loglevel":"INFO"
System.out.println(config.toString());
// 我们尝试重写这个系统配置
// 把 loglevel 设置为 DEBUG
// 这里可以通过文件加载覆盖: ConfigFactory.parseFile(文件路径)
// 也可以通过 ConfigFactory.parseFile("xxx.yyy=zzz"), 层级配置以点号分割
// 注意: withFallback 其实就是把 ConfigFactory.parseXXX 的配置写入到传入的配置对象里面覆盖
config = ConfigFactory
.parseString("pekko.log-config-on-start = on")
.withFallback(config);
config = ConfigFactory
.parseString("pekko.loglevel = DEBUG")
.withFallback(config);
// 最后打印下重写配置
System.out.println(config.toString());
// todo: 创建自定义 Actor 系统
}
}
一般来说日常使用直接可以简化处理成以下内容:
// 合并配置
String filename = "application.conf";
File file = Paths.get(filename).toFile();
Config config = ConfigFactory.load(ConfigFactory.defaultReference());
config = ConfigFactory
.parseFile(file)
.withFallback(config);
// 这里最后得出本地文件合并配置
System.out.println(config.toString());
官方文档对于每个配置都说明了, 具体配置对象功能和默认数值建议查看官方文档.