Pekko Protobuf 技巧

Pekko 是支持将数据对象作为 ProtobufV2/V3 序列化转发, 而且官方集群数据传输方案实现了这部分功能.

序列化说明: https://pekko.apache.org/docs/pekko/current/serialization.html#serialization-of-pekkos-messages

目前官方有两套 Protobuf 序列化方案支持, 比较推荐采用 pekko-protobuf-v3:

如果没有客户端需求, 一般只有集群转发会引用, 而集群处理默认已经自动引入 protobuf 支持.

而如果客户端需要依赖 Protobuf 传输数据而你的 Actor 服务目前还用不到集群功能的时候就需要手动来引入类库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
<!-- Maven 依赖 -->
<project>
<!-- 全局属性 -->
<properties>
<!-- protobuf 依赖 -->
<protobuf.platform.artifact-id>protobuf-bom</protobuf.platform.artifact-id>
<protobuf.platform.group-id>com.google.protobuf</protobuf.platform.group-id>
<protobuf.platform.version>4.32.0</protobuf.platform.version>

<!-- 平台检测插件 -->
<protobuf.plugin.group-id>org.xolstice.maven.plugins</protobuf.plugin.group-id>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>

<!-- protoc 插件判断平台相关插件 -->
<os.plugin.group-id>kr.motd.maven</os.plugin.group-id>
<os.plugin.version>1.6.2</os.plugin.version>


<!-- protobuf 插件 -->
<protobuf.plugin.group-id>org.xolstice.maven.plugins</protobuf.plugin.group-id>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>

<!-- protoc 编译器 -->
<protobuf.compiler.artifact-id>protoc</protobuf.compiler.artifact-id>
<protobuf.compiler.group-id>com.google.protobuf</protobuf.compiler.group-id>
<protobuf.compiler.version>${protobuf.platform.version}</protobuf.compiler.version>
</properties>

<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>

<!-- protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.platform.version}</version>
</dependency>
</dependencies>
</dependencyManagement>


<!-- 第三方类库引入 -->
<dependencies>
<!-- 这里采用经典方式 Actor -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- Pekko Protobuf 类库依赖, pekko 只是实现了底层接口, 具体实现依赖还需要 google-protobuf -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-protobuf-v3_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- Protobuf 核心依赖, 内部包含有 protbuf 全部接口功能实现 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<!-- SLF4J 桥接依赖, 必须添加 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-slf4j_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- slf4j 日志API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>

<!-- SLF4J 简单实现 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<!-- 扩展打包编译配置 -->
<build>

<!-- 扩展插件 -->
<extensions>
<!-- 检测提供给 Protobuf 提供系统平台编译插件, 也就是提供 ${os.detected.classifier} 这个平台变量 -->
<extension>
<groupId>${os.plugin.group-id}</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os.plugin.version}</version>
</extension>
</extensions>


<plugins>

<!-- Protobuf 打包生成 Java 绑定插件 -->
<!-- https://www.xolstice.org/protobuf-maven-plugin/compile-mojo.html -->
<plugin>
<groupId>${protobuf.plugin.group-id}</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<!-- Protobuf 编译器版本 -->
<protocArtifact>
${protobuf.compiler.group-id}:${protobuf.compiler.artifact-id}:${protobuf.compiler.version}:exe:${os.detected.classifier}
</protocArtifact>

<!-- *.proto 源文件路径, 直接默认即可(默认就是在 src/main/proto 目录), 系统事件不需要其他共享 -->
<!-- <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>-->

<!-- 生成Java文件, 直接默认就行, IDEA会自动识别到这部分代码(IDEA没办法实时检测引入, 所以常常生成之后发现没有引入) -->
<!-- 可以直接生成引入到 main/java自己的项目目录, 这样 IDEA 能够直接检测出来, 不过要小心加层目录避免覆盖清空 -->
<!--<outputDirectory>${project.basedir}/src/main/java</outputDirectory>-->

<!-- 是否生成之前清空目录, 最好不要随便乱动 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>

<!-- 执行时机 -->
<executions>
<!-- 执行 mvn compile 的时候打包生成 Java 文件 -->
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>
</project>

之后生成个 Protobuf 文件来测试下是否能够正确生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 声明采用 proto v3 版本处理
// 只要触发 compile 指令就会动态生成在 target/generated-sources 目录之下
syntax = "proto3";

// 针对 Java 生成放置于那个包目录
// 一般最好生成在 {自己包}.protobuf 之中, 最好避免这个目录在自己包存在
// 一定要避免同名包目录在自己业务存在, 否则可能会出现目录内部文件被覆盖的问题
option java_package = "me.meteorcat.game.protobuf";

// 官方推荐将对应消息全部拆分成不同的类对象, 否则会将单个文件所有的 message 全部合并成在单个类文件
option java_multiple_files = true;

// 回显数据: 二进制数据
message EchoBytesCommand{
int32 tick = 1; // 提交 int32 值, 经过服务端的之后增长1
bytes data = 2; // 提交的二进制数据
}

// 回显数据: 文本数据
message EchoStringCommand{
int32 tick = 1; // 同上
string data = 2; // 文本数据
}

最后跑个测试代码运行是否正确序列化即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import me.meteorcat.game.protobuf.EchoBytesCommand;
import me.meteorcat.game.protobuf.EchoStringCommand;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* Pekko Protobuf 消息
*/
public class PekkoProtobufMessage {


/**
* 服务入口
*/
public static void main(String[] args) {
String message = "Hello.World"; // 文本
byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8); // 二进制

// 打包成文本
EchoStringCommand.Builder stringBuilder = EchoStringCommand.newBuilder();
stringBuilder.setTick(100);
stringBuilder.setData(message);

// 文本构建成二进制数据传输
EchoStringCommand stringCommand = stringBuilder.build();
byte[] stringBytes = stringCommand.toByteArray();

try {
// 二进制转回结构体
EchoStringCommand newStringCommand = EchoStringCommand.parseFrom(stringBytes);

// 打印数据
System.out.printf("文本消息信息: %s, 结构体: {%s}%n", Arrays.toString(stringBytes), newStringCommand);
} catch (InvalidProtocolBufferException e) {
// 解析错误拦截
System.err.println(e.getMessage());
}


// 打包成二进制
EchoBytesCommand.Builder bytesBuilder = EchoBytesCommand.newBuilder();
bytesBuilder.setTick(101);
bytesBuilder.setData(ByteString.copyFrom(messageBytes));

// 二进制打包等待传输
EchoBytesCommand bytesCommand = bytesBuilder.build();
byte[] bytes = bytesCommand.toByteArray();

try {
// 二进制转回结构体
EchoBytesCommand newBytesCommand = EchoBytesCommand.parseFrom(bytes);

// 打印数据
System.out.printf("二进制消息信息: %s, 结构体: {%s}%n", Arrays.toString(bytes), newBytesCommand);
} catch (InvalidProtocolBufferException e) {
// 解析错误拦截
System.err.println(e.getMessage());
}

}
}

测试之后没什么大问题就代表成功引入 Protbuf, 接下来就是引入另外概念: 角色信息需要设置成 Protobuf 吗?

数据体 Protobuf

以前 Erlang 项目当中看到的情况, 以下就是玩家信息定义, 直接把玩家信息 record 定义成全局实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
%%% 定义保护宏, 防止多次引入的重复定义
-ifndef(__ERLANG_PLAYER_RECORD__).
-define(__ERLANG_PLAYER_RECORD__, 1).

%% 玩家实体
-record(player, {
uid = 0, % 用户ID
sid = 0, % 服务器ID
socket = ?null, % 用户连接 Socket 对象
pid = ?null, % 玩家所在进程id
scene = ?null, % 场景进程


nickname = ?null, % 玩家昵称
money = 0, % 玩家充值货币金额, 单位:分
lv = 1, % 玩家等级
exp = 0, % 玩家经验值
attr = ?null, % 角色属性(最终结算数值)

net = ?null % 网络I/O对象
}).


-endif.

Erlang 的 record 相当于 Java 的 record, 也是作为数据载体负责数据管理, 在 Pekko + Protobuf 之中是可以直接声明 Proto:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
syntax = "proto3";
option java_package = "me.meteorcat.game.protobuf";
option java_multiple_files = true;



// 玩家实体对象
// 这里要做好数据序号位置来分配数据的类型
// 比如 1~99 是归属于后续扩展的系统参数
// 而 100~999 是归属于玩家角色信息之类
// 同时 1000~9999+ 是归属于 Actor 当前会话的属性
// 其他还有什么战斗属性和 buffer 属性也是按照区分对应
// 一定字段确定好字段, 那么后续不允许改动, 如果需要修改则是要保持格式重新追加新字段
// 虽然 uint32/uint64 很节省空间, 但是 unsigned 在某些变成语言当中会造成问题, 所以可以的话直接直接采用 int32/int64
message PlayerEntity {
// 基础信息
int64 uid = 1; // 用户 ID
int64 sid = 2; // 服务器 ID
int64 rid = 3; // 角色 ID
string uuid = 4; // ActorUUID 唯一标识, 可用于跨 Actor 推送消息
int64 online = 5; // 在线时间(毫秒数) 计算方式为退出 Actor 的时候, online = (System.currentTimeMillis() - uptime) + online


// 角色对应信息
string nickname = 100; // 昵称信息
int32 money = 101; // 金币货币(也有的叫gold等), 也就是游戏内非充值的货币单位, 也就是游戏能自己产出的货币
int32 currency = 102; // 充值货币(也有的叫gem/diamond), 采用外部现金比例的兑换货币单位, 约等于Q币这种充值货币
int32 stamina = 103; // 疲劳值|耐力值(也有的叫action|health), 用于某些关卡入场的点数, 一般会按照策划配表按时间递增恢复到满值


// 角色等级相关信息最好额外剔除分隔, 因为后续可能会单独有分组(比如记录角色攻击力/防御力/速度等)
int32 level = 200; // 角色等级
int64 experience = 201; // 角色经验值


// 会话属性
int64 uptime = 1000; // Actor 挂载时间, 一般也可以当作记录为登陆时间戳 System.currentTimeMillis()
}

这样设计的玩家实体就注册成 Protobuf 对象, 也就是能直接用于 Actor 传递处理(我这里还是使用经典 Actor 而非 TypedActor):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import me.meteorcat.game.protobuf.PlayerEntity;
import org.apache.pekko.actor.*;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import scala.concurrent.ExecutionContextExecutor;

import java.io.IOException;
import java.time.Duration;
import java.util.Objects;

/**
* Pekko Protobuf 示例
*/
public class PekkoProtobufExample {

/**
* 服务入口
*/
public static void main(String[] args) throws IOException {

// 构建 Actor 系统
ActorSystem system = ActorSystem.create("pekko-protobuf");

// 挂载 Actor 服务
ActorRef actorRef = system.actorOf(PekkoPlayerManager.props(), "player-manager");

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Press RETURN to stop...");
int ignore = System.in.read();
system.terminate();
}


/**
* Pekko 玩家实体挂载
*/
public static class PekkoPlayerManager extends AbstractActor {


/**
* 日志对象
*/
final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);


/**
* 调度器
*/
final ExecutionContextExecutor dispatcher = getContext().getDispatcher();

/**
* 定时器
*/
final Scheduler scheduler = getContext().getSystem().getScheduler();


/**
* 玩家实体
*/
PlayerEntity entity;


/**
* 取消定时落地任务句柄
*/
final Cancellable store;


/**
* 取消体力增长任务句柄
*/
final Cancellable stamina;

/**
* 保存玩家实体的命令
*/
private record StoreCommand() {

private static StoreCommand self;

public static StoreCommand getInstance() {
if (Objects.isNull(self)) self = new StoreCommand();
return self;
}
}

/**
* 体力值|疲劳值增长的命令
*/
private record StaminaCommand() {
private static StaminaCommand self;

public static StaminaCommand getInstance() {
if (Objects.isNull(self)) self = new StaminaCommand();
return self;
}
}


/**
* 私有构建
*/
private PekkoPlayerManager(PlayerEntity entity, Duration storeDuration, Duration staminaDuration) {
this.entity = entity;
this.store = scheduler.scheduleAtFixedRate(
storeDuration, // 初始延迟
storeDuration, // 间隔延迟
getSelf(), // 目标 Actor
StoreCommand.getInstance(), // 发送的消息
dispatcher,
ActorRef.noSender());

this.stamina = scheduler.scheduleAtFixedRate(
staminaDuration, // 初始延迟
staminaDuration, // 间隔延迟
getSelf(), // 目标 Actor
StaminaCommand.getInstance(), // 发送的消息
dispatcher,
ActorRef.noSender());
}

/**
* 静态构建
*/
public static Props props() {

// 一般是数据库加载的上来的, 这里直接写死模拟已经从数据加载
PlayerEntity.Builder builder = PlayerEntity.newBuilder();
builder.setUid(10001);
builder.setSid(1);
builder.setRid(100);
builder.setNickname("MeteorCat");
builder.setUptime(System.currentTimeMillis());


// 构建的时候塞入玩家实体, 之后 Actor 读写就是基于内存上面 PlayerEntity 对象处理
// 这里假设 30s 落地一次数据库实体, 每 10s 增加一点体力值|疲劳值
return Props.create(PekkoPlayerManager.class, () -> new PekkoPlayerManager(
builder.build(),
Duration.ofSeconds(30),
Duration.ofSeconds(10)
));
}


/**
* 初始化 Actor 回调
*/
@Override
public void preStart() {
log.info("玩家挂载: {}", entity);

}

/**
* 退出 Actor 回调
*/
@Override
public void postStop() {
// 取消定时任务
if (Objects.nonNull(store)) store.cancel();
if (Objects.nonNull(stamina)) stamina.cancel();

// 最后触发保存实体对象
if (Objects.nonNull(entity)) storeEntity(StoreCommand.getInstance());

}

/**
* Actor 状态机监控处理, 防止单条消息处理失败导致 Actor 崩溃
*/
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(
3,// 单条消息异常重试次数
Duration.ofSeconds(10), // 重试窗口时间
throwable -> {
log.warning(throwable, "玩家消息处理异常");
// 异常时重启 Actor, 内部的 entity 会保留
return SupervisorStrategy.restart();
}
);
}

/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StoreCommand.class, this::storeEntity)
.match(StaminaCommand.class, this::staminaCommand)
.build();
}

/**
* 增长体力值 +1
*/
private void staminaCommand(StaminaCommand ignore) {
// 这里加个限制, 超过100点直接不需要增长的, 一般策划才会配置
if (this.entity.getStamina() >= 100) return;
log.info("玩家体力增长");

// 注意: Entity 实体只允许直接替换整个对象而不能修改其中某个值, 所以需要重新构建成 Builder
PlayerEntity.Builder builder = entity.toBuilder();
builder.setStamina(this.entity.getStamina() + 1); // 增长
this.entity = builder.build();
}


/**
* 触发保存玩家实体
*/
private void storeEntity(StoreCommand ignore) {
log.info("保存玩家角色实体成功");
long now = System.currentTimeMillis();
long offset = now - entity.getUptime(); // 登陆时间偏差值
long online = offset + entity.getOnline(); // 最终的在线时间


// 更新在线时间和更新时间
// 注意: Entity 实体只允许直接替换整个对象而不能修改其中某个值, 所以需要重新构建成 Builder
PlayerEntity.Builder builder = entity.toBuilder();
builder.setUptime(now);
builder.setOnline(online);

// 其他修改

// 最后重新构建成 entity
this.entity = builder.build();
log.info("保存实体成功: {}", this.entity);
}
}
}

上面启动之后会创建自定义玩家内存实体, 并且实现了简单的游戏体力自增长和模拟实体保存数据库的功能

对于 Pekko 这样做的好处是如果后续升级到集群处理的时候, 直接就能支持集群的玩家角色属性传递(Pekko 集群底层采用 Protobuf)

Protobuf 生成的对象都是不可变的, 也就代表只允许被重新写入数据构建而不能针对某些字段单独修改.

注意: 玩家客户端只允许生成归于自身单独的 Actor, 否则会出现 Actor 数量上限问题, 单个 Actor 也不会只挂载 PlayerEntity 单一实体

而玩家 Actor 在生成时会实例化并挂载日常需要 manager/mgr(管理器) 相关玩家操作集, 其实也就是在 Actor 挂载的时候实例化自己的类功能调用.

比如挂载成就系统/战斗系统/养成系统等多种系统, 其实也就是在构造的时候 new {特定系统}Manager(entity) 挂载到当前 Actor.

这种情况都是将 Actor 视为玩家自己的线程操作, 只要遵循 Actor 调度规则的话不会出现线程数据异常的问题.

当然实际游戏服务端当中不会这么简单, 这里只是大概说明下作为服务端使用 Protobuf 传输消息和执行操作的过程.

如果可以的话, 尽可能把设计数据结构定义成 Protobuf, 这样有利于在客户端和服务端共享并同步数据, 这里提供战斗单位的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 英雄阵营: 攻击方|防御方
enum HeroCamp{
attacker = 0; // 攻击方
defender = 1; // 防御方
}


// 战斗的英雄单位, 一般是应用在放置类卡牌游戏之中
// 序列序号 1~99 是基础信息
// 序列序号 100~999 是攻击战斗Buffer等属性信息
// 序号序号 1000~1999 是时装|模板属性
message HeroInfo{
int64 id = 1; // 英雄单位ID标识

// 英雄位置, 一般来说放置卡牌游戏都是类似于插槽1,2,3,4这样上阵英雄, 所以只需要数值即可
// 但是如果自走棋之类, 那么应该是棋盘的 (X,Y) 位置属性
int32 grid = 2;
int32 level = 3; // 英雄卡牌等级, 如果英雄卡牌需要等级养成系统就需要定义('狗粮'机制), 这种需要和阶级区分(比如绿卡/蓝卡/紫卡)


// 下面两个需要合起来说明
// 属性值集合, 比如攻击速度/最大血量/当前血量/攻击力/暴击率等, 用 <{属性ID},{属性数值}> 做标识
map<int32, int32> initialized = 100; // 初始属性值
map<int32, int32> properties = 101; // 更新属性值
// 注意: 战斗单位的数值会受到大量情况影响, 比如 buffer 增减攻击力等情况, 然后3回合需要回滚数值的情况
// 所以需要记录英雄的初始化值方便恢复默认的数值


// 技能这里简化成 int32 标识行为, 实际上技能涵盖的属性值非常多的, 有时候需要单独设计成类定义
// 注意: 攻击动作其实也是0消耗技能, 也就是普通攻击也可以看作是无消耗且没有增益技能, 其他则是带有消耗和增益属性
int32 attack_normal = 200; // 普通攻击
// int32 skill_active = 201; // 特殊攻击|主动技能, 如果英雄单位简单(只有单个主动技能), 直接可以定义成 int32, 否则就需要定义成 list/map
repeated int32 skills_active = 201; // 主动技能组
repeated int32 skills_passive = 202; // 被动技能组



// 这里还有些 buffer 相关的, 但是篇幅太多跳过


int32 tid = 1000; // 模板ID, 用户客户端调用外观装备的模板 ID
}


// 英雄队伍信息
message HeroTeam{
HeroCamp camp = 1; // 英雄阵营
repeated HeroInfo heros = 2; // 上阵英雄列表
}

类似于这种结构就可以同步给客户端来同步玩家信息, 无论客户端还是服务端采用什么语言都可以按照这种逻辑同步战斗对局信息.

注意: 战斗中只同步变化的 HeroInfo 字段(如血量、Buff 剩余回合), 而非全量同步, 否则会导致大量无意义的数据传输