序列化处理
现在已经可以通过配置来生成 ActorSystem, 但是现在需要引入 Protobuf 来做序列化传输.
默认 pekko 传输都是依赖 java-serialization 默认序列化做传输, 如果你成功启动服务传输的时候会出现以下提示:
[ERROR] [08/28/2025 11:29:54.811] [fortress-cluster-pekko.remote.default-remote-dispatcher-5] [Encoder(pekko://fortress-cluster)] Failed to serialize message [ActorSelectionMessage(io.fortress.common.event.ConnectedEvent)].
java.io.NotSerializableException: No configured serialization-bindings for class [io.fortress.common.event.ConnectedEvent]
看起来传输的消息结构只需要实现 java.io.Serializable 并且追加以下配置即可:
# Actor 序列化允许采用 Java 默认处理
fortress.actor.settings.pekko.actor.allow-java-serialization=on
这里虽然传输成功, 但是默认是会提示警告:
[WARN] [SECURITY][08/28/2025 12:01:31.345] [fortress-cluster-pekko.remote.default-remote-dispatcher-14] [org.apache.pekko.serialization.Serialization(pekko://fortress-cluster)] Using the Java serializer for class [io.fortress.common.event.ConnectedEvent] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'pekko.actor.warn-about-java-serializer-usage'
其实主要是因为 Java 的 java.io.Serializable 序列化消息性能很差, 且附带一堆无用的结构数据,
如果你是对速度和效率要求不是那么严格可以采用这种方式, 但是作为高并发的集群那么问题就很大了.
Java 内置序列化通过 ObjectOutputStream 和 ObjectInputStream 实现, 其底层依赖反射机制解析类结构, 带来以下问题:
- 产生大量临时对象, 增加 GC 压力
- 序列化过程包含完整的类元数据(类名、字段描述等), 导致字节流体积庞大
这里按照 pekko 官方说明来看, 性能方面选择只有 protobuf 或者手动实现序列化方案.
官方序列化文档: serialization
这里直接引入相关依赖就行了, 需要注意如果采用 ProtobufV3 的序列化需要额外引入 pekko-remote:
<dependencies>
<!-- Pekko 序列化依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-remote_${pekko.platform.scala-version}</artifactId>
</dependency>
<!-- Protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- 以上就是序列化工具核心 -->
</dependencies>
之前的 pom.xml 已经配置好默认打包生成功能, 默认在 src/main/proto 目录之中;
这里添加系统事件消息 events.proto:
syntax = "proto3";
// Java 相关配置, 声明 Java 生成在 io.fortress.common.protobuf 包之中
// 实际上默认生成到编译目录的 target/generated-sources/protobuf/java 之中
option java_package = "io.fortress.common.protobuf";
// ===========================================================
// 这里需要的消息名定义规则
// - C2S: 客户端(client)->集群端(server)
// - S2C: 集群端(server)->客户端(client)
// - S2S: 集群端内部交换状态
//
// 这里的 sessionId 实际上就是玩家ID, 为什么采用 string 后续会做说明
//
// 以下事件基本囊括网关方面可能应对事件
// - 客户端连接
// - 客户端主动断开
// - 客户端被动断开
// - 客户端请求消息
// - 客户端响应消息
// ===========================================================
// 会话连接事件: 客户端→服务端
message C2SConnectedEvent{
string sessionId = 1; // 会话ID, 授权之后的玩家ID
int64 timestamp = 2; // 发起时间戳
string ip = 3; // 发起的IP地址
}
// 会话断开事件: 客户端→服务端
message C2SDisconnectEvent{
string sessionId = 1; // 会话ID, 授权之后的玩家ID
int64 timestamp = 2; // 发起时间戳
}
// 会话请求的消息内容: 客户端→服务端
message C2SMessageEvent{
string sessionId = 1; // 会话ID, 授权之后的玩家ID
int32 messageId = 2; // 内容ID
bytes messageData = 3; // 会话二进制内容
}
// 会话断开事件: 服务端→客户端
message S2CDisconnectEvent{
string sessionId = 1; // 会话ID, 授权之后的玩家ID
int64 timestamp = 2; // 发起时间戳
int32 status = 3; // 断开状态
string reason = 4; // 断开原因
}
// 会话响应的消息内容: 服务端→客户端
message S2CMessageEvent{
string sessionId = 1; // 会话ID, 授权之后的玩家ID
int64 timestamp = 2; // 发起时间戳
int32 messageId = 3; // 内容ID
bytes messageData = 4; // 会话二进制内容
}
这里需要执行 compile 命令生成下 Java 的消息结构, 另外还需要将 generated-sources 加入源码扫描:
只有这样才能被实时加载到内部代码之中, 现在编写个测试单元用来测试这部分是否成功:
package io.fortress.common;
/**
* Protobuf测试
* <p>
* io.fortress.common.ProtobufTests.java
*/
@QuarkusTest
public class ProtobufTests {
/**
* 日志对象
*/
private static final Logger log = LoggerFactory.getLogger(ProtobufTests.class);
/**
* 检测所有事件
*/
@Test
public void events() {
// 测试 C2SConnectedEvent 事件
Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
Assert.assertNotNull(c2SConnectedEvent);
Assert.assertTrue(c2SConnectedEvent.toByteArray().length > 0);
log.info("C2SConnectedEvent: {}", Arrays.toString(c2SConnectedEvent.toByteArray()));
// 测试 C2SDisconnectEvent 事件
Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.build();
Assert.assertNotNull(c2SDisconnectEvent);
Assert.assertTrue(c2SDisconnectEvent.toByteArray().length > 0);
log.info("C2SDisconnectEvent: {}", Arrays.toString(c2SDisconnectEvent.toByteArray()));
// 测试 C2SMessageEvent 事件
Events.C2SMessageEvent c2SMessageEvent = Events.C2SMessageEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setMessageId(RandomUtils.insecure().randomInt(1, 1000))
.setMessageData(ByteString.copyFrom("hello".getBytes()))
.build();
Assert.assertNotNull(c2SMessageEvent);
Assert.assertTrue(c2SMessageEvent.toByteArray().length > 0);
log.info("C2SMessageEvent: {}", Arrays.toString(c2SMessageEvent.toByteArray()));
// 测试 S2CDisconnectEvent 事件
Events.S2CDisconnectEvent s2cDisconnectEvent = Events.S2CDisconnectEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setStatus(1000)
.setReason("NORMAL CLOSE")
.build();
Assert.assertNotNull(s2cDisconnectEvent);
Assert.assertTrue(s2cDisconnectEvent.toByteArray().length > 0);
log.info("S2CDisconnectEvent: {}", Arrays.toString(s2cDisconnectEvent.toByteArray()));
// 测试 S2CMessageEvent 事件
Events.S2CMessageEvent s2cMessageEvent = Events.S2CMessageEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setMessageData(ByteString.copyFrom("world".getBytes()))
.build();
Assert.assertNotNull(s2cMessageEvent);
Assert.assertTrue(s2cMessageEvent.toByteArray().length > 0);
log.info("S2CMessageEvent: {}", Arrays.toString(s2cMessageEvent.toByteArray()));
}
}
保证这部分事件单元测试成功就行, 一般不会出什么问题, 后面就是怎么将他集成到 pekko 当中的系统输出之中;
这里就是之前关键的测试 test/resources/application.properties 配置:
# 采用 protobuf 做序列化处理
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
# 默认序列化绑定的消息类, 采用Protobuf底层生成继承 com.google.protobuf.Message 的 Java 消息类
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
之后只需要获取 pekko 系统内部的 序列化 工具测试是否能够传入 protobuf 数据类就行了,
在 ActorSystemTests.java 测试类当中追加单元测试:
/**
* io.fortress.common.ActorSystemTests.java
* <p>
* ActorSystem测试单元
*/
@QuarkusTest
public class ActorSystemTests {
/**
* 扩展 Actor 系统配置接口
*/
@ConfigMapping(prefix = "fortress.actor")
interface ActorSystemConfigurationExt extends ActorSystemConfiguration {
// 后续 Actor 参数需要扩展就在这里追加方法
}
/**
* 加载测试单元的 application.properties 配置
*/
@Inject
ActorSystemConfigurationExt configuration;
/**
* 事件测试
*/
@Test
public void events() {
// 创建 ActorSystem
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
Assert.assertTrue(terminatedTry.isSuccess());
return null;
}));
Assert.assertNotNull(system);
// 获取 pekko 的序列化工具
Serialization serialization = SerializationExtension.get(system);
// 测试 C2SConnectedEvent 事件
Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
Try<byte[]> c2SConnectedBuffer = serialization.serialize(c2SConnectedEvent);
Assert.assertTrue(c2SConnectedBuffer.isSuccess());
system.log().info("Serialized C2SConnectedEvent: " + Arrays.toString(c2SConnectedBuffer.get()));
// 测试 C2SDisconnectEvent 事件
Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.build();
Try<byte[]> c2SDisconnectBuffer = serialization.serialize(c2SDisconnectEvent);
Assert.assertTrue(c2SDisconnectBuffer.isSuccess());
system.log().info("Serialized C2SDisconnectEvent: " + Arrays.toString(c2SDisconnectBuffer.get()));
// 测试 C2SMessageEvent 事件
Events.C2SMessageEvent c2SMessageEvent = Events.C2SMessageEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setMessageId(RandomUtils.insecure().randomInt(1, 1000))
.setMessageData(ByteString.copyFrom("hello".getBytes()))
.build();
Try<byte[]> c2SMessageBuffer = serialization.serialize(c2SMessageEvent);
Assert.assertTrue(c2SMessageBuffer.isSuccess());
system.log().info("Serialized C2SMessageEvent: " + Arrays.toString(c2SMessageBuffer.get()));
// 测试 S2CDisconnectEvent 事件
Events.S2CDisconnectEvent s2cDisconnectEvent = Events.S2CDisconnectEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setStatus(1000)
.setReason("NORMAL CLOSE")
.build();
Try<byte[]> s2cDisconnectBuffer = serialization.serialize(s2cDisconnectEvent);
Assert.assertTrue(s2cDisconnectBuffer.isSuccess());
system.log().info("Serialized S2CDisconnectEvent: " + Arrays.toString(s2cDisconnectBuffer.get()));
// 测试 S2CMessageEvent 事件
Events.S2CMessageEvent s2cMessageEvent = Events.S2CMessageEvent
.newBuilder()
.setSessionId(UUID.randomUUID().toString())
.setTimestamp(System.currentTimeMillis())
.setMessageData(ByteString.copyFrom("world".getBytes()))
.build();
Try<byte[]> s2cMessageBuffer = serialization.serialize(s2cMessageEvent);
Assert.assertTrue(s2cMessageBuffer.isSuccess());
system.log().info("Serialized S2CMessageEvent: " + Arrays.toString(s2cMessageBuffer.get()));
}
}
最后生成测试单元结果如下: