MeteorCat / Pekko和VertX互相集成(一)

Created Wed, 07 May 2025 13:51:05 +0800 Modified Wed, 29 Oct 2025 23:25:05 +0800
2718 Words

最近需要部署搭建个 WebSocket 的游戏网关服务端, 考虑之后从 Java 之中选取设计方案; 而这里首先排除 Spring 全家桶系列, 因为内部集成太过冗余且对底层操作也比较麻烦, 最后敲定方案采用仅仅对网络工具方法抽象的 Vert-X.

这里 Actor 最开始考虑 akka, 但是目前最新版本已经从开源协议转为商业协议, 为了后续规避潜在的商业纠纷最后采用同源的开源替代 pekko.

而消息交换协议, 最开始在单纯二进制和 protbuf 之间考虑, 主要问题是 protobuf 协议生成的类文件对于项目侵入太严重; 所以考虑在三之后选择性能更好且通用型更强的 msgpack.

建议对 WebSocket 协议规范 rfc6455 进行学习, 因为相对于 TCP 来说底层已经做好 帧(Frame) 概念处理, 所以直接只用处理消息接收和推送即可.

对于 WebSocket 来说基本上传递数据格式已经内置数据类型:

  • 0x1: 文本内容
  • 0x2: 二进制内容
  • 0x8: 关闭连接
  • 0x9: Ping
  • 0xA: Pong

这主要是区分好消息格式发送什么, 用来推送给 Actor 内部消息来处理不同的业务逻辑.

另外需要设计好 actor 拓扑图, 需要对 actor 功能做基本设计分布:

[ActorSystem,最基础的系统]
           |
           |
[Supervisor,核心管理器负责监控所有 Worker] --- [Worker:WebSocket,网络层负责协调客户端和Actor关联,同时负责动态会话创建]
                                        |           |
                                        |           |
                                        |           |- [Worker * N:(UUID) Session, 每个会话都动态创建 actor]
                                        |           |
                                        |           |- [会话Actor是不断动态创建的]......
                                        |
                                        |- [Proxy:Cluster,集群业务代理转发]

我这里是按照以前项目 ErlangMMO 框架设计出来的基本网络架构, 每个启动游戏进程其实就是单服, 比如 微信1区 对应 192.168.1.10 单台服务器的 进程A, 微信2区 对应 192.168.1.10 单台服务器的 进程B, 以此类推出扩展手动选服的游戏服务器.

为什么要每个会话动态创建 Actor , 而不是构建单个 Actor 来读写?

官方 akka/pekko 对于 actor 要求十分明确: 状态隔离任务分解.

这里可以设想下, 如果采用单 actor 作为 Socket 管理会出现什么情况:

  • 崩溃连锁: 假设某个原因某个 Session 异常导致线程崩溃的时候, 所有 Socket 需要一起崩溃吗?
  • 请求频繁: Actor 模型单个对象本身负载能力有限, 基于消息队列转发 Socket 消息会造成大量消息堆积和产生死信

日常可能出问题就上面这些, 其中频繁请求导致队列堆积就是造成单个 Actor 占用异常的最大问题

上面就是必须思考的问题, 包括后续做业务的时候需要考虑 怎么把 Actor 功能分流缓解请求压力.

另外还有个依赖问题, 也就是比较经典的: 是 VertX 依赖 Pekko ? 还是 Pekko 依赖 VertX ?

  • Vert-X 在类内部再启动 Pekko 初始化 ActorSystem
  • Pekko 初始化之后在 ActorSystem 挂载 Vert-X 单独服务

其实从上面就已经看出来了, 当引入 Actor-FSM 概念的时候就要明确: 所有的服务都要抽象单个或多个Actor.

Actor 内部其实就是 有限状态机(FSM,Finite State Machine)

还有一点就是 必须要为外部控制关闭|停止服务器做好预留接口, 这样才能直接手动在即时执行游戏维护的关闭服务器操作.

那么这样抽象出来的话, Vert-X-Actor 需要负担以下系统功能:

  • OnStart: 启动 WebSocket 服务
  • OnStop: 关闭 WebSocket 服务
  • OnRestart: 可有可无的的重启功能, 实际上就是 Stop->Start 启动服务
  • OnConnect: 客户端会话连接加入时候动态创建 Actor
  • OnClose: 客户端会话连接关闭时候需要通知 Actor 关闭和退出
  • OnRequest: 消息请求事件, 需要区分 WebSocketTextBinary 等类型
  • OnResponse: 消息响应事件, 由 Actor 内部发起通知客户端

为了降低理解难度, 这里都是基于 本地Actor 来做开发, 而不涉及 集群Actor 等业务拆分

这里都是提供抽象成 事件(Event) 或者 信号(Signal) 处理, 但是需要先学习概念: 依赖反转.

之所以要学习 依赖反转 概念是要学习怎么把 Vert-XPekko 集成在一起, 不要像下面一样编写出来的功能:

class WebSocketActor {
    private Vertx vertx;// Vert运行时
    private HttpServer server; // WebSocket服务

    /**
     * 构造方法
     */
    public WebSocketActor() {
        this.vertx = Vertx.vert();
        this.server = vertx.createHttpServer();
    }
}

// 直接外部启用初始化
// var listen = new WebSocketActor();

像这种功能类实例化看起来没有问题, 但其实内部扩展性是有问题的: 如果外部需要 Vertx 重用的是否该怎么办?

// 单个运行时
var vertx = Vertx.vert();

// 需要创建WebSocket应用A
var server = vertx.createHttpServer();

// 内部集成的功能类该怎么办? 没办法引用到同个 vertx 运行时
var listen = new WebSocketActor();
// 难道每个服务都创建独立不共享的 Vert 运行时吗?
// 如果把功能过于集中内部, 那么以后做功能迭代维护的时候就会很麻烦
// 而且这个功能后续可能会被频繁用到, 所以更加需要构建成通用组件来处理

所以在后续应该将 Vert-X 运行时作为组件引入, 这里需要采用 依赖反转 :

/**
 * 泛型的通用事件抽象
 * <p>
 * 所有的事件对象都是需要继承这个抽象通用类, 就像以下示例:
 * <pre>
 * class StopEvent implements IEvent<Long>{
 *  private Long timestamp;
 *  public Long message(){
 *      return timestamp;
 *  }
 * }
 * </pre>
 *
 * @param <T> 消息传递类型
 */
public interface IEvent<T> {

    /**
     * 获取消息主体内容
     */
    T message();
}

以上就是通用消息结构, 用于对消息进行统一规范化传递; 之后就是抽象出来的 Actor 管理器:

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.Props;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * 抽象的 ActorSupervisor
 * <p>
 * 内部采用 Map 做映射管理多个 actorRef 做 worker 容器,
 * 如果想要使用必须继承该抽象类从而去实现底层方法
 *
 * @param <T> 消息类型
 */
public abstract class AbstractSupervisor<T> extends AbstractBehavior<T> {

    /**
     * 工作任务 Actor 池
     */
    protected final Map<String, ActorRef<T>> workers;

    /**
     * 拦截消息监听器
     */
    protected final ReceiveBuilder<T> builder = ReceiveBuilder.create();


    /**
     * 默认构造方法
     *
     * @param context Actor上下文
     */
    public AbstractSupervisor(ActorContext<T> context) {
        super(context);
        // 采用系统默认MAP的容量
        this.workers = new HashMap<>();
    }

    /**
     * 默认构造方法
     *
     * @param context  Actor上下文
     * @param capacity Map的元素容量
     */
    public AbstractSupervisor(ActorContext<T> context, int capacity) {
        super(context);
        this.workers = new HashMap<>(capacity);
    }


    /**
     * 添加Actor节点
     *
     * @param behavior 子Actor运行状态
     * @param name     子Actor的节点名称
     * @return ActorRef
     */
    public ActorRef<T> spawn(Behavior<T> behavior, String name) {
        final var ctx = getContext();
        final var ref = ctx.spawn(behavior, name);
        return workers.put(name, ref);
    }


    /**
     * 添加Actor节点
     *
     * @param behavior 子Actor运行状态
     * @param name     子Actor的节点名称
     * @param props    子Actor的属性
     * @return ActorRef
     */
    public ActorRef<T> spawn(Behavior<T> behavior, String name, Props props) {
        final var ctx = getContext();
        final var ref = ctx.spawn(behavior, name, props);
        return workers.put(name, ref);
    }

    /**
     * 回调方法做初始化子Actor
     *
     * @param creator 子Actor构建器
     */
    public void spawn(BiConsumer<ActorContext<T>, Map<String, ActorRef<T>>> creator) {
        creator.accept(getContext(), workers);
    }

    /**
     * 停止Actor节点
     *
     * @param name 子Actor的节点名称
     * @return ActorRef|null
     */
    public ActorRef<T> stop(String name) {
        final var ctx = getContext();
        final var ref = workers.get(name);
        if (ref != null) {
            ctx.stop(ref);
            return workers.remove(name);
        }
        return null;
    }


    /**
     * 获取消息拦截器
     *
     * @return ReceiveBuilder
     */
    public ReceiveBuilder<T> getReceiveBuilder() {
        return builder;
    }


    /**
     * 默认消息拦截器构建方法
     *
     * @return Receive
     */
    @Override
    public Receive<T> createReceive() {
        return builder.build();
    }
}

后续就是去实现 Actor 管理器, 从而具体集成搭建第三方包功能, 这里实现个简单的 回写(echo) 服务:

import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;

import java.util.function.Consumer;

/**
 * 回写Actor管理器
 */
public final class EchoSupervisor extends AbstractSupervisor<IEvent<String>> {

    /**
     * 内部定义的消息传递包
     *
     * @param msg
     */
    public record Message(
            String msg
    ) implements IEvent<String> {

        @Override
        public String message() {
            return msg;
        }
    }

    /**
     * 不允许外部实例化
     *
     * @param context Actor上下文
     */
    private EchoSupervisor(ActorContext<IEvent<String>> context) {
        super(context);
    }

    /**
     * 不允许外部实例化
     *
     * @param context  Actor上下文
     * @param capacity 子Actor容器大小
     */
    private EchoSupervisor(ActorContext<IEvent<String>> context, int capacity) {
        super(context, capacity);
    }


    /**
     * 静态构建
     */
    public static Behavior<IEvent<String>> create() {
        return Behaviors.setup(EchoSupervisor::new);
    }

    /**
     * 静态构建
     */
    public static Behavior<IEvent<String>> create(Consumer<EchoSupervisor> creator) {
        return Behaviors.setup(ctx -> {
            final var owner = new EchoSupervisor(ctx);
            creator.accept(owner);
            return owner;
        });
    }

}

之后就是简单入口调用实现:

/**
 * 启动应用入口
 */
public class WebSocketApp {

    /**
     * 入口方法
     */
    public static void main(String[] args) {
        // 创建 echo 管理器, 并且移交 WebSocketApp::echo 静态函数来对内部消息进行 hook
        final var echoSupervisor = EchoSupervisor.create(WebSocketApp::echo);

        // 创建 echo 运行时的系统
        final var echoActor = ActorSystem.create(
                echoSupervisor,
                "echo");

        // 发送给 echo actor 消息让其调用事件
        echoActor.tell(new EchoSupervisor.Message("Hello.World"));
    }

    /**
     * Actor管理器回调函数
     *
     * @param supervisor Actor 运行管理器
     */
    public static void echo(EchoSupervisor supervisor) {
        final var builder = supervisor.getReceiveBuilder();
        builder.onMessage(EchoSupervisor.Message.class, (event) -> {
            System.out.println("Actor Message: " + event.message());
            return supervisor;
        });
    }
}

可以看出来内部只对功能进行简单的封装, 主要业务全部是在外部拦截并且转发从而实现 高内聚低耦合 的状态, 启动执行之后就能看到具体 Actor 消息转发执行显示内容了.