MeteorCat / Fortress 3 - 序列化处理

Created Thu, 28 Aug 2025 19:34:47 +0800 Modified Wed, 29 Oct 2025 23:24:54 +0800

序列化处理

现在已经可以通过配置来生成 ActorSystem, 但是现在需要引入 Protobuf 来做序列化传输.

默认 pekko 传输都是依赖 java-serialization 默认序列化做传输, 如果你成功启动服务传输的时候会出现以下提示:

[ERROR] [08/28/2025 11:29:54.811] [fortress-cluster-pekko.remote.default-remote-dispatcher-5] [Encoder(pekko://fortress-cluster)] Failed to serialize message [ActorSelectionMessage(io.fortress.common.event.ConnectedEvent)].
java.io.NotSerializableException: No configured serialization-bindings for class [io.fortress.common.event.ConnectedEvent]

看起来传输的消息结构只需要实现 java.io.Serializable 并且追加以下配置即可:

# Actor 序列化允许采用 Java 默认处理
fortress.actor.settings.pekko.actor.allow-java-serialization=on

这里虽然传输成功, 但是默认是会提示警告:

[WARN] [SECURITY][08/28/2025 12:01:31.345] [fortress-cluster-pekko.remote.default-remote-dispatcher-14] [org.apache.pekko.serialization.Serialization(pekko://fortress-cluster)] Using the Java serializer for class [io.fortress.common.event.ConnectedEvent] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'pekko.actor.warn-about-java-serializer-usage'

其实主要是因为 Javajava.io.Serializable 序列化消息性能很差, 且附带一堆无用的结构数据, 如果你是对速度和效率要求不是那么严格可以采用这种方式, 但是作为高并发的集群那么问题就很大了.

Java 内置序列化通过 ObjectOutputStreamObjectInputStream 实现, 其底层依赖反射机制解析类结构, 带来以下问题:

  • 产生大量临时对象, 增加 GC 压力
  • 序列化过程包含完整的类元数据(类名、字段描述等), 导致字节流体积庞大

这里按照 pekko 官方说明来看, 性能方面选择只有 protobuf 或者手动实现序列化方案.

官方序列化文档: serialization

这里直接引入相关依赖就行了, 需要注意如果采用 ProtobufV3 的序列化需要额外引入 pekko-remote:


<dependencies>

    <!-- Pekko 序列化依赖 -->
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-remote_${pekko.platform.scala-version}</artifactId>
    </dependency>

    <!-- Protobuf 核心依赖 -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
    </dependency>

    <!-- 以上就是序列化工具核心 -->
</dependencies>

之前的 pom.xml 已经配置好默认打包生成功能, 默认在 src/main/proto 目录之中; 这里添加系统事件消息 events.proto:

syntax = "proto3";

// Java 相关配置, 声明 Java 生成在 io.fortress.common.protobuf 包之中
// 实际上默认生成到编译目录的 target/generated-sources/protobuf/java 之中
option java_package = "io.fortress.common.protobuf";

// ===========================================================
// 这里需要的消息名定义规则
//  - C2S: 客户端(client)->集群端(server)
//  - S2C: 集群端(server)->客户端(client)
//  - S2S: 集群端内部交换状态
//
// 这里的 sessionId 实际上就是玩家ID, 为什么采用 string 后续会做说明
//
// 以下事件基本囊括网关方面可能应对事件
//  - 客户端连接
//  - 客户端主动断开
//  - 客户端被动断开
//  - 客户端请求消息
//  - 客户端响应消息
// ===========================================================


// 会话连接事件: 客户端→服务端
message C2SConnectedEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int64 timestamp = 2; // 发起时间戳
  string ip = 3; // 发起的IP地址
}

// 会话断开事件: 客户端→服务端
message C2SDisconnectEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int64 timestamp = 2; // 发起时间戳
}

// 会话请求的消息内容: 客户端→服务端
message C2SMessageEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int32 messageId = 2; // 内容ID
  bytes messageData = 3; // 会话二进制内容
}


// 会话断开事件: 服务端→客户端
message S2CDisconnectEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int64 timestamp = 2; // 发起时间戳
  int32 status = 3; // 断开状态
  string reason = 4; // 断开原因
}


// 会话响应的消息内容: 服务端→客户端
message S2CMessageEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int64 timestamp = 2; // 发起时间戳
  int32 messageId = 3; // 内容ID
  bytes messageData = 4; // 会话二进制内容
}

这里需要执行 compile 命令生成下 Java 的消息结构, 另外还需要将 generated-sources 加入源码扫描:

protobuf

只有这样才能被实时加载到内部代码之中, 现在编写个测试单元用来测试这部分是否成功:

package io.fortress.common;

/**
 * Protobuf测试
 * <p>
 * io.fortress.common.ProtobufTests.java
 */
@QuarkusTest
public class ProtobufTests {


    /**
     * 日志对象
     */
    private static final Logger log = LoggerFactory.getLogger(ProtobufTests.class);

    /**
     * 检测所有事件
     */
    @Test
    public void events() {
        // 测试 C2SConnectedEvent 事件
        Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setIp("127.0.0.1")
                .build();
        Assert.assertNotNull(c2SConnectedEvent);
        Assert.assertTrue(c2SConnectedEvent.toByteArray().length > 0);
        log.info("C2SConnectedEvent: {}", Arrays.toString(c2SConnectedEvent.toByteArray()));

        // 测试 C2SDisconnectEvent 事件
        Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .build();
        Assert.assertNotNull(c2SDisconnectEvent);
        Assert.assertTrue(c2SDisconnectEvent.toByteArray().length > 0);
        log.info("C2SDisconnectEvent: {}", Arrays.toString(c2SDisconnectEvent.toByteArray()));


        // 测试 C2SMessageEvent 事件
        Events.C2SMessageEvent c2SMessageEvent = Events.C2SMessageEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setMessageId(RandomUtils.insecure().randomInt(1, 1000))
                .setMessageData(ByteString.copyFrom("hello".getBytes()))
                .build();
        Assert.assertNotNull(c2SMessageEvent);
        Assert.assertTrue(c2SMessageEvent.toByteArray().length > 0);
        log.info("C2SMessageEvent: {}", Arrays.toString(c2SMessageEvent.toByteArray()));


        // 测试 S2CDisconnectEvent 事件
        Events.S2CDisconnectEvent s2cDisconnectEvent = Events.S2CDisconnectEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setStatus(1000)
                .setReason("NORMAL CLOSE")
                .build();
        Assert.assertNotNull(s2cDisconnectEvent);
        Assert.assertTrue(s2cDisconnectEvent.toByteArray().length > 0);
        log.info("S2CDisconnectEvent: {}", Arrays.toString(s2cDisconnectEvent.toByteArray()));


        // 测试 S2CMessageEvent 事件
        Events.S2CMessageEvent s2cMessageEvent = Events.S2CMessageEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setMessageData(ByteString.copyFrom("world".getBytes()))
                .build();
        Assert.assertNotNull(s2cMessageEvent);
        Assert.assertTrue(s2cMessageEvent.toByteArray().length > 0);
        log.info("S2CMessageEvent: {}", Arrays.toString(s2cMessageEvent.toByteArray()));

    }
}

保证这部分事件单元测试成功就行, 一般不会出什么问题, 后面就是怎么将他集成到 pekko 当中的系统输出之中; 这里就是之前关键的测试 test/resources/application.properties 配置:

# 采用 protobuf 做序列化处理
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
# 默认序列化绑定的消息类, 采用Protobuf底层生成继承 com.google.protobuf.Message 的 Java 消息类
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto

之后只需要获取 pekko 系统内部的 序列化 工具测试是否能够传入 protobuf 数据类就行了, 在 ActorSystemTests.java 测试类当中追加单元测试:

/**
 * io.fortress.common.ActorSystemTests.java
 * <p>
 * ActorSystem测试单元
 */
@QuarkusTest
public class ActorSystemTests {

    /**
     * 扩展 Actor 系统配置接口
     */
    @ConfigMapping(prefix = "fortress.actor")
    interface ActorSystemConfigurationExt extends ActorSystemConfiguration {
        // 后续 Actor 参数需要扩展就在这里追加方法
    }


    /**
     * 加载测试单元的 application.properties 配置
     */
    @Inject
    ActorSystemConfigurationExt configuration;


    /**
     * 事件测试
     */
    @Test
    public void events() {
        // 创建 ActorSystem
        ActorSystem system = configuration.createActorSystem((terminatedTry -> {
            Assert.assertTrue(terminatedTry.isSuccess());
            return null;
        }));
        Assert.assertNotNull(system);

        // 获取 pekko 的序列化工具
        Serialization serialization = SerializationExtension.get(system);


        // 测试 C2SConnectedEvent 事件
        Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setIp("127.0.0.1")
                .build();
        Try<byte[]> c2SConnectedBuffer = serialization.serialize(c2SConnectedEvent);
        Assert.assertTrue(c2SConnectedBuffer.isSuccess());
        system.log().info("Serialized C2SConnectedEvent: " + Arrays.toString(c2SConnectedBuffer.get()));


        // 测试 C2SDisconnectEvent 事件
        Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .build();
        Try<byte[]> c2SDisconnectBuffer = serialization.serialize(c2SDisconnectEvent);
        Assert.assertTrue(c2SDisconnectBuffer.isSuccess());
        system.log().info("Serialized C2SDisconnectEvent: " + Arrays.toString(c2SDisconnectBuffer.get()));

        // 测试 C2SMessageEvent 事件
        Events.C2SMessageEvent c2SMessageEvent = Events.C2SMessageEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setMessageId(RandomUtils.insecure().randomInt(1, 1000))
                .setMessageData(ByteString.copyFrom("hello".getBytes()))
                .build();
        Try<byte[]> c2SMessageBuffer = serialization.serialize(c2SMessageEvent);
        Assert.assertTrue(c2SMessageBuffer.isSuccess());
        system.log().info("Serialized C2SMessageEvent: " + Arrays.toString(c2SMessageBuffer.get()));


        // 测试 S2CDisconnectEvent 事件
        Events.S2CDisconnectEvent s2cDisconnectEvent = Events.S2CDisconnectEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setStatus(1000)
                .setReason("NORMAL CLOSE")
                .build();
        Try<byte[]> s2cDisconnectBuffer = serialization.serialize(s2cDisconnectEvent);
        Assert.assertTrue(s2cDisconnectBuffer.isSuccess());
        system.log().info("Serialized S2CDisconnectEvent: " + Arrays.toString(s2cDisconnectBuffer.get()));


        // 测试 S2CMessageEvent 事件
        Events.S2CMessageEvent s2cMessageEvent = Events.S2CMessageEvent
                .newBuilder()
                .setSessionId(UUID.randomUUID().toString())
                .setTimestamp(System.currentTimeMillis())
                .setMessageData(ByteString.copyFrom("world".getBytes()))
                .build();
        Try<byte[]> s2cMessageBuffer = serialization.serialize(s2cMessageEvent);
        Assert.assertTrue(s2cMessageBuffer.isSuccess());
        system.log().info("Serialized S2CMessageEvent: " + Arrays.toString(s2cMessageBuffer.get()));
    }
}

最后生成测试单元结果如下:

tests