MeteorCat / Pekko和VertX互相集成(二)

Created Sun, 01 Jun 2025 18:29:00 +0800 Modified Wed, 29 Oct 2025 23:25:05 +0800
3148 Words

之前已经展示封装成功能 Actor, 可以看到我们封装类和 skynet 类似, 在官方 vert-x 文档可以看到比较简单的 WebSocket 集成设置:

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.WebSocket;

public class WebSocketServerExample {

    public static void main(String[] args) {
        // 创建Vertx实例
        Vertx vertx = Vertx.vertx();

        // 创建Http服务器选项
        HttpServerOptions options = new HttpServerOptions()
                .setPort(8080);

        // 创建Http服务器
        HttpServer server = vertx.createHttpServer(options);

        // 监听HTTP请求并升级为WebSocket连接
        server.webSocketHandler(webSocket -> {
            System.out.println("WebSocket连接已建立");

            // 处理接收到的WebSocket消息
            webSocket.handler(buffer -> {
                String message = buffer.toString();
                System.out.println("收到消息: " + message);

                // 发送响应消息
                webSocket.writeTextMessage("你发送的消息是: " + message);
            });

            // 处理WebSocket连接关闭事件
            webSocket.closeHandler(v -> {
                System.out.println("WebSocket连接已关闭");
            });
        });

        // 启动Http服务器
        server.listen();
    }
}

所以看起来对于基本的 WebSocket 服务器只需要传入两个参数:

  • Vertx: 运行时
  • HttpServerOptions: Http服务器配置

那么很简单集成抽象实现:


/**
 * WebSocket的Actor管理器
 */
public class WebSocketSupervisor extends AbstractSupervisor<IEvent<?>> {

    /**
     * VertX运行时
     */
    final Vertx vertx;

    /**
     * Http启动配置
     * <p>
     * 用于重启服务器的时候释放掉 httpServer 句柄并重新构建
     */
    final HttpServerOptions options;

    /**
     * 构造方法,不允许外部实例化
     *
     * @param context  Actor运行时
     * @param vertx    VertX运行时
     * @param options  Http配置信息
     * @param capacity 子Actor容量
     */
    private WebSocketSupervisor(ActorContext<IEvent<?>> context, Vertx vertx, HttpServerOptions options, int capacity) {
        super(context, capacity);
        this.vertx = vertx;
        this.options = options;
    }

    /**
     * 构造方法, 不允许外部实例化
     *
     * @param context Actor运行时
     * @param vertx   VertX运行时
     * @param options Http配置信息
     */
    private WebSocketSupervisor(ActorContext<IEvent<?>> context, Vertx vertx, HttpServerOptions options) {
        super(context);
        this.vertx = vertx;
        this.options = options;
    }

    /**
     * 静态构建
     */
    public static Behavior<IEvent<?>> create(Vertx vertx, HttpServerOptions options) {
        return Behaviors.setup(ctx -> new WebSocketSupervisor(ctx, vertx, options));
    }

    /**
     * 静态构建
     */
    public static Behavior<IEvent<?>> create(Vertx vertx, HttpServerOptions options, Consumer<WebSocketSupervisor> creator) {
        return Behaviors.setup(ctx -> {
            final var owner = new WebSocketSupervisor(ctx, vertx, options);
            creator.accept(owner);
            return owner;
        });
    }

    // todo:其他扩展功能
}

以上暂时仅搭建空白的服务框架, 但是外部只需要直接传入 VertX 相关配置就可以运行:

final var vertx = Vertx.vertx();
final var options = new HttpServerOptions()
        .setHost("127.0.0.1") // 监听hostname
        .setPort(8088) // 监听端口
        .setSsl(false); // 是否启用 https

// 创建 WebSocket运行时, 启动之后这里会提示说消息没有被拦截 `unhandled` , 这是正常的现象
// to Actor[pekko://websocket/user] was unhandled. [1] dead letters encountered
final var runtime = WebSocketSupervisor.create(vertx, options);
final var actor = ActorSystem.create(
        runtime,
        "websocket");

这是否就开始规划功能了, VertX 需要 开启关闭 功能来给 开服|停服 通知:

/**
 * WebSocket的Actor管理器
 */
public class WebSocketSupervisor extends AbstractSupervisor<IEvent<?>> {
    /**
     * Http服务句柄
     */
    HttpServer server = null;


    /**
     * 启动WebSocket服务, 用于开服启动
     */
    public void startHttpServer() {
        final var logger = getContext().getLog();
        if (server != null) {
            logger.warn("websocket server already started");
            return;
        }


        logger.info("websocket server started");


        // 创建HTTP服务,
        server = vertx.createHttpServer(options);
        server.webSocketHandler(webSocket -> {
            // todo:至此就是会话层的关系

            // 消息回调
            webSocket.frameHandler(frameHandler -> {

                // 这里测试下触发关服, 只需要客户端发送 'stop' 关键字就自动关闭
                // 这里其实是错误示范, 因为这种关于 actor 操作应该放入内部的消息队列, 也就是 tell 让其按照顺序去执行
                // 直接不走 Actor 关闭服务器会导致分离出来的子 actor 消息推送强制中断.
                if (frameHandler.isText()) {
                    final var msg = frameHandler.textData();
                    if (msg.equals("stop")) {
                        stopHttpServer();
                    }
                }

            });

            webSocket.closeHandler(closeHandler -> {
                logger.warn("websocket handler closed");
            });
        });


        // 启动监听
        server.listen().onFailure(throwable -> {
            server.close();
            server = null;
            logger.error("websocket start failed", throwable);
        });
    }

    /**
     * 停服推送
     */
    public void stopHttpServer() {
        final var logger = getContext().getLog();
        if (server == null) {
            logger.warn("websocket service not started");
            return;
        }

        // 所有子 Actor 通知关闭
        final var ctx = getContext();
        for (final var worker : workers.values()) {
            ctx.stop(worker);
        }
        server.close();
    }

}

这里初始化构建启动运行时的方式如下:

/**
 * 启动应用入口
 */
public class WebSocketApp {


    /**
     * 消息回调方法
     * 这里面的 OnStart | OnStop 是自定义的事件类
     */
    private static void init(WebSocketSupervisor supervisor) {
        var builder = supervisor.getReceiveBuilder();
        // 启动服务
        builder.onMessage(OnStart.class, (event) -> {
            System.out.println("WebSocket started");
            supervisor.startHttpServer();
            return supervisor;
        });

        // 关闭服务
        builder.onMessage(OnStop.class, (event) -> {
            System.out.println("WebSocket stopped");
            supervisor.stopHttpServer();
            return Behaviors.same();
        });
    }


    /**
     * 核心入口方法
     */
    public static void main(String[] args) {
        final var vertx = Vertx.vertx();
        final var options = new HttpServerOptions()
                .setHost("127.0.0.1") // 监听hostname
                .setPort(8088) // 监听端口
                .setSsl(false); // 是否启用 https

        // 创建 WebSocket运行时
        final var runtime = WebSocketSupervisor.create(vertx, options, WebSocketApp::init);
        final var actor = ActorSystem.create(
                runtime,
                "websocket");


        actor.tell(new OnStart());

    }
}

可以用 Postman 这种客户端直接测试连接, 推送 stop 字符串就会把启动的 websocket 服务关闭; 注意这里的 OnStart | OnStop 是我自己自定义的事件类用来拦截消息的.

这就是我们目前跑起来支持 actorwebsocket 网关程序, 虽然简单但是基本网络传输功能已经设计完成; 接下来就是封装 Session(会话) 层面的子 actor, 用于 代理(Agent) 客户端交互请求.

关键词: 代理者, 用于代理 websocketOpen|Close|Message|Error 推送到 actor 对象中心让其自己拦截处理

这里可以暂停一下思考下后续的会话功能类应该怎么去封装, 另外还需要说明下 Actor 内部的回调拦截 ReceiveBuilder:

// 创建拦截器
builder = ReceiveBuilder.create();

// 拦截器当中比较常用的两个函数:
builder.onMessage(...); 细致化消息拦截回调
builder.onSignal(...); 简单的消息信号处理

// 这两种函数最明显的差别就是类型规则声明
// 在 actor 的 message 之中, 需要去自定义传递消息内容类型, 并且后续消息必须继承或者派生该消息类
// 有时候我们不想要设计通用功能接口适应 message 的时候, 可以采用实现 org.apache.pekko.actor.typed.Signal 类来做消息信号结构
// 之前我们采用自己定义 IEvent 就是需要这样比较复杂的派生, 要求对消息自定义被通用衍生继承实现

Signal 方式其实只需要实现接口就行, 样例如下:

import org.apache.pekko.actor.typed.Signal;

/**
 * record 是 Java11 新特性, 用于作为数据容器类
 */
public record OnStart(Long timestamp) implements Signal {

    // 做些内部自己数据操作功能
}

Signal 机制其实更加符合作为二次封装功能, 因为不需要强制声明传输数据类型:

// 这里看到具体实现的 ReceiveBuilder 构建器, 这里需要注意: 实际类型 T 需要强制声明
ReceiveBuilder<T> builder = ReceiveBuilder.create();

// 类似于 ? 未知声明, 也是 Java 新特性
ReceiveBuilder<?> builder = ReceiveBuilder.create();

// 如果采用 onMessage 编写回调拦截必须强制对 T 类型进行衍生
ReceiveBuilder<?> builder = ReceiveBuilder.create();
builder.onMessage(OnOpenEvent.class,(event)->{
    // 异常, 因为回调传入的 event 没办法匹配出传入声明类型
    // Inferred type 'M' for type parameter 'M' is not within its bound; should extend 'T'
    return null;
});


// 而采用 onSignal 就没有这么多顾虑, 不需要明确指定衍生类型
ReceiveBuilder<?> builder = ReceiveBuilder.create();
builder.onSignal(OnOpenEvent.class,(event)->{
    // 直接能够获取 event 对应类型
    return null;
});

这是内部的 actor 需要注意的点, 有的官方文档没有提到所以需要说明下, 接下来就是继续后面的 子actor 部分.

首先需要先定义个回调抽象接口, 需要涵盖以下接口暴露给下层外部 Actor( 其实就是定义状态机当中的所有状态 ):

  • OnOpen: 开启会话
  • OnClose: 关闭会话
  • OnText: 文本回调
  • OnBinary: 二进制回调
  • OnPing: Ping回调
  • OnPong: Pong回调
  • OnError: 错误回调

其实相对来说接口比较好设计, 但是需要思考怎么和 Actor 工作模型集合在一起:

/**
 * 简单的会话回调, 这里的 T 是为了方便后续底层衍生
 */
public interface WebSocketBehavior<T> {

    /**
     * 会话连接时候的回调
     *
     * @param socket WebSocket 会话句柄
     * @return Actor 状态机的状态
     */
    Behavior<T> onOpen(ServerWebSocket socket, ActorRef<?> supervisor);

    /**
     * 推送Open信号给Actor
     *
     * @param socket     WebSocket 会话句柄
     * @param supervisor 推送的 actor 句柄
     */
    void tellOnOpen(ServerWebSocket socket, ActorRef<?> supervisor);


    /**
     * Text内容时候的回调
     *
     * @param socket  会话对象
     * @param message 数据内容
     * @return Actor 状态机的状态
     */
    Behavior<T> onTextMessage(ServerWebSocket socket, String message);

    /**
     * 推送Text内容给Actor
     *
     * @param socket  会话对象
     * @param message 数据内容
     */
    void tellOnTextMessage(ServerWebSocket socket, String message);

    /**
     * Binary内容时候的回调
     *
     * @param socket 会话对象
     * @param buffer 数据内容
     * @return Actor 状态机的状态
     */
    Behavior<T> onBinaryMessage(ServerWebSocket socket, Buffer buffer);

    /**
     * 推送Binary内容给Actor
     *
     * @param socket 会话对象
     * @param buffer 数据内容
     */
    void tellOnBinaryMessage(ServerWebSocket socket, Buffer buffer);


    /**
     * 关闭连接时候的回调
     *
     * @param socket 会话对象
     * @param code   关闭时候的状态码
     * @param reason 关闭时候的消息
     * @return Actor 状态机的状态
     */
    Behavior<T> onClose(ServerWebSocket socket, Short code, String reason);

    /**
     * 推送关闭连接给 actor
     *
     * @param socket 会话对象
     * @param code   关闭时候的状态码
     * @param reason 关闭时候的消息
     */
    void tellOnClose(ServerWebSocket socket, Short code, String reason);


    /**
     * 会话异常时候的回调
     *
     * @param socket 会话对象
     * @param error  异常错误
     * @return Actor 状态机的状态
     */
    Behavior<T> onError(ServerWebSocket socket, Throwable error);


    /**
     * 推送会话异常给 actor
     *
     * @param socket 会话对象
     * @param error  异常错误
     */
    void tellOnError(ServerWebSocket socket, Throwable error);

    /**
     * 用于给底层实现消息拦截器
     *
     * @param builder 消息拦截器句柄
     * @return 最后再封装的的消息拦截器
     */
    default ReceiveBuilder<T> onInit(ReceiveBuilder<T> builder) {
        return builder;
    }


    /**
     * 默认封装起来的的消息拦截器
     *
     * @return 消息拦截器
     */
    default Receive<T> receive() {
        ReceiveBuilder<T> builder = ReceiveBuilder.create();
        builder.onSignal(OnOpenEvent.class, (signal) -> this.onOpen(signal.socket))
                .onSignal(OnTextMessageEvent.class, (event) -> this.onTextMessage(event.socket, event.message))
                .onSignal(OnBinaryMessageEvent.class, (event) -> this.onBinaryMessage(event.socket, event.buffer))
                .onSignal(OnCloseEvent.class, (event) -> this.onClose(event.socket, event.code, event.reason))
                .onSignal(OnErrorEvent.class, (event) -> this.onError(event.socket, event.error))
        ;
        return onInit(builder).build();
    }

    /**
     * 开启连接信号
     *
     * @param socket
     */
    record OnOpenEvent(
            ServerWebSocket socket
    ) implements Signal {
    }

    /**
     * 文本传输信号
     *
     * @param socket
     * @param message
     */
    record OnTextMessageEvent(
            ServerWebSocket socket,
            String message
    ) implements Signal {

    }


    /**
     * 二进制传输信号
     *
     * @param socket
     * @param buffer
     */
    record OnBinaryMessageEvent(
            ServerWebSocket socket,
            Buffer buffer
    ) implements Signal {

    }

    /**
     * 关闭连接信号
     *
     * @param socket
     * @param code
     * @param reason
     */
    record OnCloseEvent(
            ServerWebSocket socket,
            Short code,
            String reason
    ) implements Signal {

    }

    /**
     * 连接异常信号
     *
     * @param socket
     * @param error
     */
    record OnErrorEvent(
            ServerWebSocket socket,
            Throwable error
    ) implements Signal {

    }
}

这样定义接口的好处就是具体实现类不需要关心底层处理细节, 只需要将实现类转为 Actor 对象并实现该接口, 在构建 ActorSystem 的时候引入即可.

如果学习过 ErlangActor 设计, 后续基本上就能知道该怎么去处理, 基本上就是 Supervisor + Worker 架构设计; 很多时候需要借鉴不同开发语言和思想, 从而实现自己心目当中所需的功能.

可以按照自己思路来编, 但是写的时候会出现异常( 第三方库用到了跨线程操作 ):

No message is currently processed by the actor, but ActorContext was called from Thread

后续需要说明的问题: 第三方 Actor 异步处理与多线程处理