MeteorCat / Pekko的框架设计

Created Mon, 16 Jun 2025 16:51:16 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
6101 Words

一般 Actor 不会是单个项目独享的, 可能会分开大量架构(比如游戏关卡,游戏战斗等)来集群处理, 这种可以作为集群在其他项目之中启动处理.

pekko 内部封装其实已经很好了, 工作之中发现基本没什么需要封装, 除非有必要否则可以直接引入构建高级应用

实际上也是考虑到常常会用到搭建 Actor 管理器关系, 所以才打算学习 Erlang 处理方式让其支持 Supervisor-Worker 架构.

默认都是采用 pekko 的强类型 actor 构建, 弱类型后面传输的内容不好做控制

默认做个接口类方便作为数据容器继承:

/**
 * 基础的数据载体, 用于被其他类型做实现衍生
 */
public interface ActorCommand {
}

动态实现有种泛型静态反射构建的抽象类, 可以用来做些特殊的动态抽象:


import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;

import java.lang.reflect.Constructor;

/**
 * 自定义 Actor 运行层扩展
 */
public abstract class AbstractActorSupervisor extends AbstractBehavior<ActorCommand> {

    /**
     * Actor 名称
     */
    protected String name;

    /**
     * 构造方法
     * 默认的
     */
    public AbstractActorSupervisor(ActorContext<ActorCommand> context, String name) {
        super(context);
        this.name = name;
    }


    /**
     * 静态获取反射实例化
     *
     * @param clazz 衍生 AbstractActorSupervisor
     * @param name  Supervisor 名称
     * @param <T>   泛型 AbstractActorSupervisor 类
     * @return Behavior<ActorCommand>
     */
    public static <T extends AbstractActorSupervisor> Behavior<ActorCommand> create(Class<T> clazz, String name) {
        // 依赖反射的可以动态实例化
        return Behaviors.setup((ActorContext<ActorCommand> ctx) -> {
            final Constructor<T> constructor = clazz.getConstructor(ActorContext.class, String.class);
            final var logger = ctx.getLog();
            logger.info("create supervisor class: {}", clazz.getName());
            return constructor.newInstance(ctx, name);
        });
    }
}

这里主要方便自己去构建实现自己的应用层:

public class SimpleActor extends AbstractActorSupervisor {
    // 其他方法实现
}

// 外部可以采用 SimpleActor.create(SimpleActor.class,"simple") 来构建
// 第一个参数就制定了要求子类实现
SimpleActor.create(SimpleActor.class,"simple");

不过需要注意, 这种采用静态实例化是采用反射实现的, 也就是带有众所周知的问题: 性能损耗

反射虽然降低构建过程的性能, 但是可以带来更好的泛用性方便下级实现去构建出 Actor, 就像下面那样去处理获取静态对象类的 奇技淫巧:

/**
 *  抽象任务进程对象
 *  ActorCommand: 自定义的抽象通用接口
 */
public abstract class AbstractWorker extends AbstractBehavior<ActorCommand> {
    /**
     * Actor 名称
     */
    protected String name;

    /**
     * 构造方法
     *
     * @param context 上下文
     * @param name    Actor名称
     */
    public AbstractWorker(ActorContext<ActorCommand> context, String name) {
        super(context);
        this.name = name;
    }


    /**
     * 静态获取反射实例化
     *
     * @param clazz 衍生 AbstractWorker
     * @param name  Supervisor 名称
     * @param <T>   泛型 AbstractWorker 类
     * @return Behavior<ActorCommand>
     */
    public static <T extends AbstractWorker> Behavior<ActorCommand> create(Class<T> clazz, String name) {
        // 依赖反射的可以动态实例化
        return Behaviors.setup((ActorContext<ActorCommand> ctx) -> {
            final Constructor<T> constructor = clazz.getConstructor(ActorContext.class, String.class);
            final var logger = ctx.getLog();
            logger.info("create worker class: {}", clazz.getName());
            return constructor.newInstance(ctx, name);
        });
    }

}

// ------------------------------------------------------------------------------

/**
 * 抽象任务管理器
 */
public abstract class AbstractManager extends AbstractBehavior<ActorCommand> {

    /**
     * 管理的衍生任务类型
     * 用于传入 spawn 
     */
    protected final Class<? extends AbstractWorker> clazz;


    /**
     * 构造方法
     *
     * @param context 上下文
     */
    public AbstractManager(ActorContext<ActorCommand> context, Class<? extends AbstractWorker> clazz) {
        super(context);
        this.clazz = clazz;
    }


    // ---------------------------------------------------------------------------
    // 这里演示下怎么依靠反射机制来动态生成 AbstractWorker
    // ---------------------------------------------------------------------------


    /**
     * 创建任务进程信号
     * 用于创建
     *
     * @param name
     */
    public record WorkerCreate(
            // Worker名称
            String name
    ) implements Signal {

    }

    /**
     * 默认拦截消息回调
     *
     * @return Receive
     */
    @Override
    public Receive<ActorCommand> createReceive() {
        return newReceiveBuilder()
                .onSignal(WorkerCreate.class, (children) -> {
                    final var ctx = getContext();
                    final var log = ctx.getLog();

                    // 采用静态实例化
                    // 这里内部已经将 Actor 构建挂载完成, 等待接入到管理器
                    final var runtime = AbstractWorker.create(clazz, children.name());

                    // 挂载到管理器之中
                    ActorRef<Command> job = ctx.spawn(runtime, children.name());

                    // 让子Actor监控父Actor的异常退出
                    ctx.watch(job);

                    // 打印构建日志
                    log.info("worker {} started! address: {}", children.name(), job.path());
                    return Behaviors.same();
                })
                .build();
    }
}

上面就是比较简单的利用反射机制+抽象类, 后续只需要继承 AbstractManager + AbstractWorker 两个抽象类就可以动态衍生, 这样的好处就是外层不需要多次包装重复的代码(像 Actor-Spawn 实际上还是比较常用).

上面的方法如果为了节约反射性能, 去裸写管理器都可以, 这里仅仅是作为可能会被多次使用的通用构建抽象

还有另外的 interface(接口) + creater(创建器) 方法, 这种方法更通用点且无须采用反射功能; 利用接口定义必须要处理的功能之后主要任务 actor 接收封装构建回调方法来进行 spawn 分离:

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.Signal;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;

import java.util.function.Function;


/**
 * 通用Actor系统抽象
 * <p>
 * 要求实例化的时候传入动态子进程构建器来动态生成实例化 Actor
 *
 * @param <T>
 */
public abstract class ActorBehavior<T> extends AbstractBehavior<T> {


    /**
     * Actor进程属性, 用于分离进程
     *
     * @param name
     * @param bootstrap
     * @param <T>
     */
    public record JobProps<T>(
            String name,
            ActorBehavior<T> bootstrap
    ) {
    }

    /**
     * Actor进程创建信号
     *
     * @param name
     */
    public record JobSpawn(
            String name
    ) implements Signal {
    }

    /**
     * Actor进程停止信号
     *
     * @param address
     * @param <T>
     */
    public record JobStop<T>(
            ActorRef<T> address
    ) implements Signal {
    }


    /**
     * Actor消息拦截器
     * <p>
     * 考虑到拦截器是基本会用到的, 直接在上层定义处理好处理
     */
    protected ReceiveBuilder<T> builder = newReceiveBuilder();


    /**
     * 动态构建器, 用于分离Actor的时候返回动态对象
     */
    protected final Function<JobProps<T>, Behavior<T>> creator;


    /**
     * 构造方法
     *
     * @param context Actor 上下文
     * @param creator Actor 动态分离构建器
     */
    public ActorBehavior(ActorContext<T> context, Function<JobProps<T>, Behavior<T>> creator) {
        super(context);
        this.creator = creator;
    }


    /**
     * 暴露给底层的消息拦截处理
     *
     * @param builder 消息拦截器
     * @return 消息拦截器
     */
    protected ReceiveBuilder<T> onCreateReceive(ReceiveBuilder<T> builder) {
        return builder;
    }


    /**
     * Actor 拦截回调
     */
    @Override
    public Receive<T> createReceive() {
        builder.onSignal(JobSpawn.class, this::onJobSpawn);
        builder.onSignal(JobStop.class, this::onJobStop);
        return onCreateReceive(builder).build();
    }


    /**
     * 动态构建 Actor 进程分离
     *
     * @param signal 信号
     * @return Behavior
     */
    private Behavior<T> onJobSpawn(JobSpawn signal) {
        final var context = getContext();
        final var log = context.getLog();
        final var behavior = creator.apply(new JobProps<>(signal.name, this));
        final var runtime = context.spawn(behavior, signal.name);
        context.watch(runtime);
        log.info("job spawned: {}", signal.name);
        return this;
    }


    /**
     * 分离的 Actor 进程停止
     *
     * @param signal 信号
     * @return Behavior
     */
    private Behavior<T> onJobStop(JobStop<T> signal) {
        final var context = getContext();
        final var log = context.getLog();
        context.unwatch(signal.address);
        context.stop(signal.address);
        log.info("job stoped: {}", signal.address);
        return this;
    }
}

// ------------------------------------------------------------
// 后续如果就只需要实现抽象类就能处理
// ------------------------------------------------------------


/**
 * 应用入口
 */
public class FusionGameApplication {

    /**
     * Actor父类管理器
     * <p>
     * 这里就是抽象出来的 Actor实现
     */
    public static final class EchoSupervisor extends ActorBehavior<String> {

        /**
         * 构造方法
         * <p>
         * 把构造方法私有避免外部自己去初始化, 让他采用静态实例化调用方便控制实例化的流程
         */
        private EchoSupervisor(ActorContext<String> context, Function<JobProps<String>, Behavior<String>> creator) {
            super(context, creator);
        }

        /**
         * 静态实例化方法, 用于避免外部来进行自定义实例化
         */
        public static Behavior<String> create(Function<JobProps<String>, Behavior<String>> creator) {
            return Behaviors.setup(ctx -> new EchoSupervisor(ctx, creator));
        }
    }


    /**
     * Actor子类处理器
     */
    public static final class EchoWorker extends AbstractBehavior<String> {

        /**
         * Actor进程传入的信息
         * <p>
         * 内部带有名称和父级Actor地址
         */
        final ActorBehavior.JobProps<String> job;

        /**
         * 构造方法
         */
        private EchoWorker(ActorContext<String> context, ActorBehavior.JobProps<String> job) {
            super(context);
            this.job = job;
        }


        /**
         * 消息拦截回调
         */
        @Override
        public Receive<String> createReceive() {
            return newReceiveBuilder()
                    .onAnyMessage((msg) -> {
                        getContext().getLog().info("fork job({}): {}", job.name(), msg);
                        return Behaviors.same();
                    })
                    .build();
        }


        /**
         * 静态实例化
         */
        public static Behavior<String> create(ActorBehavior.JobProps<String> job) {
            return Behaviors.setup(ctx -> new EchoWorker(ctx, job));
        }
    }

    /**
     * 方法入口
     *
     * @param args 参数
     */
    public static void main(String[] args) {
        // 生成配置
        Config defaultReference = ConfigFactory.defaultReference();
        Config config = ConfigFactory.load(defaultReference);
        config = ConfigFactory
                .parseString("pekko.loglevel = DEBUG")
                .withFallback(config);
        config = ConfigFactory
                .parseString("pekko.actor.allow-java-serialization = on")
                .withFallback(config);

        // 生成系统
        ActorSystem<?> system = ActorSystem.create(Behaviors.empty(), "sys", config);
        ActorRef<?> echoRef = system.systemActorOf(EchoSupervisor.create(EchoWorker::create), "echo", Props.empty());
        echoRef.unsafeUpcast().tell(new ActorBehavior.JobSpawn(UUID.randomUUID().toString()));
    }
}

这样的功能抽象小包装其实就差不多可以了, 主要 动态构建 Actor 的情况不多见, 只在 网络会话|关卡创建|多人联机级 的情况比较多看到, 其他时间都是 针对 Actor 的自娱自乐 .

Actor 自娱自乐 概念需要留意, 大部分不涉及多人联机都是基于策划玩法游玩属于自己的关卡并修改自己 存档 数据

但是一般来说除非项目需要引入集群来分散游戏业务负载, 比如需要把大规模计算任务需求移交给另外服务器来计算, 否则依靠 Local Actor 足够承担简单的业务就行了.

在实践中发现集群其实也是一把双刃剑, 虽然把计算压力分散到其他的服务器, 但别忘了服务器之间还有数据转发的损失; 如果拆分业务太细且服务器分布过多, 会导致业务瓶颈全在转发过程, 所以尽可能不要把集群拆分到2个以上.

实际上单个 Actor 挂载在进程内存然后直接做内存的数据交换时候效率性能最好, 这样可以避免多余的网络传输交换

另外还有个问题: 为什么 websocket 网络库不采用 pekko-http|akka-http 集成的功能, 而是要引入外部网络库(netty|vert-x)集成

这是有原因的, 大部分游戏初期先上到 h5 平台后续项目需求强度上来之后, 游戏需求变动有时候就开始塞垃圾需求; 比如 大世界和多人SLG经营 这种, 这是否多人联机转发就要开始死扣性能开销, 需要采用 tcp|udp 甚至于 kcp; 如果采用默认集成 http 库后面扩展起来也比较麻烦, 不如前期就把网络层单独提取出来单独通用集成.

所以在最开始一定要封装自己的网络库 actor 对象, 并且自己|第三方编写网络库要嵌入在 actor 框架之中.

vert-x 嵌入

网络库封装其实比较简单, 只要是需要动态 spawn子Actor 那部分需要处理, 我习惯采用 vert-x 第三方工具库:

<!-- 引入网络库和Actor准备集成 -->
<dependencies>
    <!-- vertx 组件依赖,只需要核心库的网络支持  -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>${vertx.version}</version>
    </dependency>

    <!-- Actor依赖 -->
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
    </dependency>

    <!-- logback依赖 -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>

</dependencies>

如果最多再加个单元测试库 junit 就完全可以网络封装的 actor 业务开发, 但是后续纠结的问题就有下面这些:

  1. 是提取封装成通用的 AbstractBehavior 还是直接引入之后业务开发, 如果封装通用就不得不涉及到反射的问题
  2. 需要采用 Maven 单项目或者 maven 多项目构建把功能分离出来处理或者合并 parent 项目内, 多个项目分布可能会加大项目复杂性
  3. 需要预留好集群业务转发功能吗? 或者说游戏服务架构是要采用分服还是世界服开发?
  4. GM 外部后台推送服务(奖励推送|邮件发放|玩家管理)应该是集成在一起还是分离出来接口处理?

这是最开始考虑的点, 有的开发者习惯前期考虑到这种问题避免后续扩展的时候需要对项目进行大量该动, 而有的习惯先上手搭建后期微调整体架构, 按照每个人不同习惯就习惯处理.

这里我从头开始构建出对应 Actor, 为了防止 pekko 版本变动导致接口对应不上, 我会尽可能把接口参数暴露给外部:

import io.vertx.core.AsyncResult;
import io.vertx.core.http.HttpServer;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.Signal;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;


/**
 * 抽象的 WebSocket Actor 层
 *
 * @param <S>
 * @param <R>
 */
public abstract class AbstractWebSocketFactory<S, R> extends AbstractBehavior<S> {


    /**
     * 基础服务器句柄
     */
    protected final HttpServer server;


    /**
     * 服务器句柄状态结构
     */
    public enum State implements Signal {
        Start, // 启动服务
        Close // 关闭服务
    }


    /**
     * 构造方法
     *
     * @param context 上下问
     * @param server  服务器句柄
     */
    public AbstractWebSocketFactory(ActorContext<S> context, HttpServer server) {
        super(context);
        this.server = server;
    }


    /**
     * 内部消息拦截回调
     *
     * @return 拦截回调返回
     */
    @Override
    public Receive<S> createReceive() {
        final var builder = onReceive();

        // 封装抽象出来的回调
        if (builder != null) {
            builder.onSignal(State.class, State.Start::equals, this::onStartServer);
            builder.onSignal(State.class, State.Close::equals, this::onCloseServer);
        }

        return builder == null ? null : builder.build();
    }

    /**
     * 要求实现类返回具体的消息拦截器用于实现自定义
     *
     * @return 消息拦截器
     */
    public abstract ReceiveBuilder<S> onReceive();


    /**
     * 启动服务器
     */
    private Behavior<S> onStartServer(State ignore) {
        final var ctx = getContext();
        final var log = ctx.getLog();

        // 服务器消息异步回调
        server.webSocketHandler(socket -> {
            // 这里就是 spawn 相关内容
            log.info("websocket connect by {}", socket);
        });


        // 启动监听
        server.listen()
                .onSuccess(this::onSuccess)
                .onFailure(this::onFailure);
        return this;
    }


    /**
     * 服务器启动时候成功回调
     * <p>
     * 用于底层实现类去捕获对应成功回调处理
     *
     * @param server 服务器信息
     */
    public void onSuccess(HttpServer server) {

    }

    /**
     * 服务器启动时候异常回调
     * <p>
     * 用于底层实现类去捕获对应异常回调处理
     *
     * @param throwable 异常信息
     */
    public void onFailure(Throwable throwable) {

    }


    /**
     * 关闭服务器
     */
    private Behavior<S> onCloseServer(State ignore) {
        server.close(this::onClose);
        return this;
    }

    /**
     * 服务器主动关闭的回调处理
     * <p>
     * 用于底层实现类去捕获对应退出回调处理
     *
     * @param future 异步运行时
     */
    public void onClose(AsyncResult<Void> future) {

    }
}

这里就是我习惯的封装方式, 尽可能保留上层第三方库的扩展方法不去过度封装.

其实这个的 State 枚举实现我还是不满意的, 应该直接定义功能类传入回调方法等参数这样更加灵活

这里可以先不用管会话信息的 Socket 来运行下启动服务确认启动成功:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;

import java.time.Duration;

/**
 * 应用入口
 */
public class FusionGameApplication {


    /**
     * 临时实现的 WebSocket 运行时
     */
    public static class WebSocketRuntime extends AbstractWebSocketFactory<Void, Void> {

        /**
         * 构造方法
         *
         * @param context 上下问
         * @param server  服务器句柄
         */
        private WebSocketRuntime(ActorContext<Void> context, HttpServer server) {
            super(context, server);
        }

        @Override
        public ReceiveBuilder<Void> onReceive() {
            return newReceiveBuilder();
        }


        /**
         * 静态实例化
         */
        public static Behavior<Void> create(HttpServer server) {
            return Behaviors.setup(ctx -> new WebSocketRuntime(ctx, server));
        }
    }

    /**
     * 方法入口
     *
     * @param args 参数
     */
    public static void main(String[] args) {
        // 生成配置
        Config defaultReference = ConfigFactory.defaultReference();
        Config config = ConfigFactory.load(defaultReference);
        config = ConfigFactory
                .parseString("pekko.loglevel = DEBUG")
                .withFallback(config);
        config = ConfigFactory
                .parseString("pekko.actor.allow-java-serialization = on")
                .withFallback(config);

        // 生成系统
        ActorSystem<?> system = ActorSystem.create(Behaviors.empty(), "sys", config);

        // 生成服务器句柄
        Vertx vertx = Vertx.vertx();
        HttpServerOptions options = new HttpServerOptions()
                .setHost("127.0.0.1")
                .setPort(8080)
                .setSsl(false);
        HttpServer server = vertx.createHttpServer(options);


        // 生成 WebSocket Actor 并且推送 启动信号
        ActorRef<?> websocket = system.systemActorOf(WebSocketRuntime.create(server), "websocket", Props.empty());
        websocket.unsafeUpcast().tell(AbstractWebSocketFactory.State.Start);

        // 测试5s之后推送关闭服务器信号
        Scheduler scheduler = system.scheduler();
        scheduler.scheduleOnce(Duration.ofSeconds(5), () -> {
            websocket.unsafeUpcast().tell(AbstractWebSocketFactory.State.Close);
        }, system.executionContext());
    }
}

这里启动之后可以采用 Postman 之类的连接客户端, 在5秒之后将会自动关闭服务器; 对于游戏服务器来说, 客户端连接服务相当于 网关, 必须能够被手动控制开服和关服维护.

接下来按照上面说明的知识点, 我们需要将每个 WebSocket 会话都作为 子Actor 挂载在其中, 也就是需要重新构建另外的 AbstractBehavior 来让 WebSocket 句柄移交给他做处理:

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;

/**
 * 抽象的Websocket会话Actor
 *
 * @param <S>
 * @param <W>
 */
public abstract class AbstractWebSocketSession<S, W> extends AbstractBehavior<W> {

    /**
     * Actor地址名
     */
    protected final String name;

    protected final ActorRef<S> bootstrap;

    protected final ServerWebSocket socket;

    public AbstractWebSocketSession(ActorContext<W> context, String name, ActorRef<S> bootstrap, ServerWebSocket socket) {
        super(context);
        this.name = name;
        this.bootstrap = bootstrap;
        this.socket = socket;

        // WebSocket异步回调处理
        if (socket != null) {

            // todo: 注意以下方法是有陷阱的

            // 连接的时候调用回调, 让底层实现拦截
            this.onConnected(socket);

            // 数据帧分包回调
            socket.frameHandler(handler -> {
                switch (handler.type()) {
                    // Ping消息
                    case PING -> this.onPingMessage(socket);

                    // Pong消息
                    case PONG -> this.onPongMessage(socket);


                    // 字符串回调
                    case TEXT -> this.onTextMessage(socket, handler.textData());

                    // 二进制回调
                    case BINARY -> this.onBinaryMessage(socket, handler.binaryData());

                    // 关闭回调
                    case CLOSE -> {
                        final Short status = handler.closeStatusCode();
                        final String reason = handler.closeReason();
                        this.onDisconnected(socket, status, reason);
                    }
                }
            });

            // 异常回调
            socket.exceptionHandler(handler -> {
                this.onThrowable(socket, handler);

                // 关闭Actor
            });
        }
    }

    /**
     * 消息拦截器
     */
    @Override
    public Receive<W> createReceive() {
        final ReceiveBuilder<W> builder = onReceive();
        return builder == null ? null : builder.build();
    }

    /**
     * 要求实现类继承构建消息拦截器回调
     */
    public abstract ReceiveBuilder<W> onReceive();

    /**
     * 要求实现类继承构建会话连接的回调
     */
    public abstract void onConnected(ServerWebSocket socket);

    /**
     * 要求实现类继承构建会话关闭回调
     */
    public abstract void onDisconnected(ServerWebSocket socket, Short status, String reason);

    /**
     * 要求实现类继承构建会话异常回调
     */
    public abstract void onThrowable(ServerWebSocket socket, Throwable throwable);


    // ----------------------------
    // 以下是一些消息回调
    public void onPingMessage(ServerWebSocket socket) {

    }

    public void onPongMessage(ServerWebSocket socket) {
    }

    public void onTextMessage(ServerWebSocket socket, String text) {

    }

    public void onBinaryMessage(ServerWebSocket socket, Buffer binary) {

    }
}

代码看起来流程也是正确的, 我实现这些抽象发现启动没问题, 但是当你直接准备消息处理之后就发现 Actor 系统异常; 这些异常都是集中在底层实现的 onXXX 方法子中, 具体表现就是 getContext() 完全失效了, 可以停下来思考为什么会出现这种问题?

这里结合 vert-x 是异步任务库和 pekko 的任务安全性来思考为什么回调方法内部调用 actor 工具会异常

其实上面已经揭示了某些问题, 那就是 vert-x 的异步运行时和 pekko 内部运行时冲突了, 同时还有以下问题:

// vert-x 内部传入的回调方法并不是在该线程执行
// 当被唤起 frameHandler 任务的时候内部线程池分配调用 
socket.frameHandler(..function..)

// 这里看起来流程是正确的, 但是当他集成在 pekko 之中就出现问题, 内部无论调用 getContent() 最终都是无法捕获
// pekko 默认不允许跨线程做复杂操作(如动态构建 actor 等, pekko尽可能采用tell消息投递让内部运行)
// 实际操作下来这也是正确的: `对于actor来说, 不要在异步线程中进行任何操作而是要把他包装成消息发送到内部处理`
// vert-x 这种问题会频繁出现包装第三方用到异步架构, 因为没办法保证不同线程异步处理的数据安全性
// 所以涉及到异步调用 this.XXX 的功能都是违背 actor 规则

基于这种方式, 集成第三方库的功能都不应该直接通过 this.xxxx 功能调用到 actor 自身任何属性和方法.

请记住: 无论怎么更替第三方的库, 消息传递方法必须采用 actor 内部投递而非直接调用内部方法和修改内部属性

还有一点需要说明, 初始化功能(onConnected)其实不需要处理, 因为当初始化的时候就代表 socket 已经成功初始化成功.

不过外部可能需要在 spawn 启动时候做初始化, 比如需要实例化加载些专属服务功能服务等, 可以查询下Behaviors.aroundXXX功能

处理完之后的抽象会话功能类:

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Signal;
import org.apache.pekko.actor.typed.javadsl.*;

/**
 * 抽象的Websocket会话Actor
 *
 * @param <S>
 * @param <W>
 */
public abstract class AbstractWebSocketSession<S, W> extends AbstractBehavior<W> {


    /**
     * Actor地址名
     */
    protected final String name;

    /**
     * 上级Actor地址
     */
    protected final ActorRef<S> bootstrap;

    /**
     * 上级监听让出的 Socket 对象
     */
    protected final ServerWebSocket socket;

    /**
     * 数据帧回调
     */
    protected static class FrameWrapped {

        /**
         * 文本回调
         *
         * @param socket
         * @param text
         */
        record Text(
                ServerWebSocket socket,
                String text
        ) implements Signal {
        }

        /**
         * 二进制回调
         *
         * @param socket
         * @param binary
         */
        record Binary(
                ServerWebSocket socket,
                Buffer binary
        ) implements Signal {

        }

        /**
         * 关闭回调
         *
         * @param socket
         * @param status
         * @param reason
         */
        record Close(
                ServerWebSocket socket,
                Short status,
                String reason
        ) implements Signal {

        }

        /**
         * 异常回调
         *
         * @param socket
         * @param throwable
         */
        record Throwable(
                ServerWebSocket socket,
                java.lang.Throwable throwable
        ) implements Signal {
        }
    }


    /**
     * 构造方法
     */
    public AbstractWebSocketSession(ActorContext<W> context, String name, ActorRef<S> bootstrap, ServerWebSocket socket) {
        super(context);
        this.name = name;
        this.bootstrap = bootstrap;
        this.socket = socket;

        // WebSocket异步回调处理
        if (socket != null) {


            // 数据帧分包回调
            socket.frameHandler(handler -> {
                switch (handler.type()) {

                    // 字符串回调
                    case TEXT ->
                            context.getSelf().unsafeUpcast().tell(new FrameWrapped.Text(socket, handler.textData()));

                    // 二进制回调
                    case BINARY ->
                            context.getSelf().unsafeUpcast().tell(new FrameWrapped.Binary(socket, handler.binaryData()));

                    // 关闭回调
                    case CLOSE -> {
                        final Short status = handler.closeStatusCode();
                        final String reason = handler.closeReason();
                        context.getSelf().unsafeUpcast().tell(new FrameWrapped.Close(socket, status, reason));
                    }
                }
            });

            // 异常回调
            socket.exceptionHandler(handler -> {
                context.getSelf().unsafeUpcast().tell(new FrameWrapped.Throwable(socket, handler));
            });
        }
    }


    /**
     * 消息拦截器
     */
    @Override
    public Receive<W> createReceive() {
        final ReceiveBuilder<W> builder = onReceive();

        if (builder != null) {
            builder.onSignal(FrameWrapped.Text.class, (handler) -> {
                onTextMessage(handler.socket, handler.text);
                return this;
            });

            builder.onSignal(FrameWrapped.Binary.class, (handler) -> {
                onBinaryMessage(handler.socket, handler.binary);
                return this;
            });

            builder.onSignal(FrameWrapped.Throwable.class, (handler) -> {
                onThrowable(handler.socket, handler.throwable);
                return this;
            });

            builder.onSignal(FrameWrapped.Close.class, (handler) -> {
                onDisconnected(handler.socket, handler.status, handler.reason);

                // 停止这里并没有释放出 actor, 可以思考下这里怎么处理当作额外扩展知识
                return this;
            });
        }

        return builder == null ? null : builder.build();
    }

    /**
     * 要求实现类继承构建消息拦截器回调
     */
    public abstract ReceiveBuilder<W> onReceive();


    /**
     * 要求实现类继承构建会话关闭回调
     */
    public abstract void onDisconnected(ServerWebSocket socket, Short status, String reason);

    /**
     * 要求实现类继承构建会话异常回调
     */
    public abstract void onThrowable(ServerWebSocket socket, Throwable throwable);


    // ----------------------------
    // 以下是一些消息回调

    public void onTextMessage(ServerWebSocket socket, String text) {

    }

    public void onBinaryMessage(ServerWebSocket socket, Buffer binary) {

    }
}

最后的测试单元实例化如下:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.*;

import java.util.Arrays;
import java.util.function.Function;

/**
 * 应用入口
 */
public class FusionGameApplication {


    /**
     * 临时实现的 WebSocket 运行时
     */
    public static class WebSocketRuntime extends AbstractWebSocketFactory<Void, Void> {

        /**
         * 构造方法
         *
         * @param context 上下问
         * @param server  服务器句柄
         * @param creator 动态构建器
         */
        private WebSocketRuntime(ActorContext<Void> context, HttpServer server, Function<Props<Void>, Behavior<Void>> creator) {
            super(context, server, creator);
        }

        @Override
        public ReceiveBuilder<Void> onReceive() {
            return newReceiveBuilder();
        }


        /**
         * 静态实例化
         */
        public static Behavior<Void> create(HttpServer server, Function<Props<Void>, Behavior<Void>> creator) {
            return Behaviors.setup(ctx -> new WebSocketRuntime(ctx, server, creator));
        }
    }

    /**
     * WebSocket会话对象
     */
    public static class WebSocketWorker extends AbstractWebSocketSession<Void, Void> {


        public WebSocketWorker(ActorContext<Void> context, String name, ActorRef<Void> bootstrap, ServerWebSocket socket) {
            super(context, name, bootstrap, socket);
        }

        @Override
        public ReceiveBuilder<Void> onReceive() {
            return newReceiveBuilder();
        }


        /**
         * 关闭回调
         */
        @Override
        public void onDisconnected(ServerWebSocket socket, Short status, String reason) {
            getContext().getLog().info("close:{} - {}", status, reason);
        }

        @Override
        public void onThrowable(ServerWebSocket socket, Throwable throwable) {

        }

        @Override
        public void onTextMessage(ServerWebSocket socket, String text) {
            getContext().getLog().info("text:{}", text);
        }

        @Override
        public void onBinaryMessage(ServerWebSocket socket, Buffer binary) {
            getContext().getLog().info("binary:{}", Arrays.toString(binary.getBytes()));
        }
    }

    /**
     * 方法入口
     *
     * @param args 参数
     */
    public static void main(String[] args) {
        // 生成配置
        Config defaultReference = ConfigFactory.defaultReference();
        Config config = ConfigFactory.load(defaultReference);
        config = ConfigFactory
                .parseString("pekko.loglevel = DEBUG")
                .withFallback(config);
        config = ConfigFactory
                .parseString("pekko.actor.allow-java-serialization = on")
                .withFallback(config);

        // 生成系统
        ActorSystem<?> system = ActorSystem.create(Behaviors.empty(), "sys", config);

        // 生成服务器句柄
        Vertx vertx = Vertx.vertx();
        HttpServerOptions options = new HttpServerOptions()
                .setHost("127.0.0.1")
                .setPort(8080)
                .setSsl(false);
        HttpServer server = vertx.createHttpServer(options);


        // 生成 WebSocket Actor 并且推送 启动信号
        ActorRef<?> websocket = system.systemActorOf(WebSocketRuntime.create(server, (props) -> {
            return Behaviors.setup(ctx -> new WebSocketWorker(
                    ctx, props.name(),
                    props.bootstrap(),
                    props.socket()
            ));
        }), "websocket", Props.empty());
        websocket.unsafeUpcast().tell(AbstractWebSocketFactory.State.Start);
    }
}

这里其实简化有些需要说明的功能点, 比如父信号关闭处理回收(关闭型号必须要父类去关闭)等; 不过基本功能都已经完备只差一些周边系统功能开发, 而且这里仅仅作为参考实例说明, 现实当中的功能处理往往更加复杂.