Pekko JDBC-ORM 数据库处理

之前大部分操作都是针对单体 Actor, 但是实际生产之后免不了要操作数据库, 而 Pekko 内部已经集成这部分功能

大部分情况都会看到内部所有模块都标注 用于对象持久化处理, 这些都是用于保存 Actor 状态, 如果想简单使用的话太过复杂

Slink 集成另外说明, 因为数据库相关篇幅很复杂没办法直接讲清楚; 绝大部分情况 Slink 是在 Scala 上用, Java 用 JDBC 熟悉

这里采用 pekko-projection-jdbc 官方模块和文档说明, 最后说下这个过程踩过的坑, 这里通过 Maven 引入第三方库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<!-- 第三方依赖 -->
<properties>
<scala.binary.version>2.13</scala.binary.version>
<!-- 注意: pekko-projection 之类项目归属第三方模块, 所以版本号和 pekko 之类不同 -->
<!-- 不要把 pekko 核心功能的版本与 pekko-projection 版本混淆 -->
<pekko.projection.version>1.1.0</pekko.projection.version>
</properties>
<dependencies>

<!-- Pekko Projection 基础依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-projection-core_${scala.binary.version}</artifactId>
<version>${pekko.projection.version}</version>
</dependency>

<!-- Pekko JDBC 核心依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-projection-jdbc_${scala.binary.version}</artifactId>
<version>${pekko.projection.version}</version>
</dependency>


<!-- 这里目前底层用的 MariaDB 数据库, 也需要引入对应 MariaDB 驱动 -->
<!-- 具体驱动依赖配置参考: https://pekko.apache.org/docs/pekko-persistence-jdbc/current/configuration.html#database-schema -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.5.6</version>
<scope>compile</scope>
</dependency>

<!-- 连接池(HikariCP, Pekko JDBC 底层默认使用)-->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
<scope>compile</scope>
</dependency>

<!-- 如果需要数据支持 hibernate ORM 的话就需要引入, 建议引入来避免裸写 SQL 操作 -->
<dependency>
<groupId>org.hibernate.orm</groupId>
<artifactId>hibernate-core</artifactId>
<version>7.2.1.Final</version>
<scope>compile</scope>
</dependency>

</dependencies>

之后就是配置数据库相关参数, 我这边默认数据库内部已经有 pekko_game 数据库, 其他都是沿用默认数据配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
pekko {
projection {
jdbc {

# MariaDB 完全兼容 MySQL 方言, 因此配置为 mysql-dialect 即可(必须配置,否则运行报错)
dialect = "mysql-dialect"

# 设置阻塞 JDBC 调度器(必须配置,否则运行报错)
use-dispatcher = "pekko.projection.jdbc.blocking-jdbc-dispatcher"
blocking-jdbc-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
# 关键: 与后续的连接池 HikariCP maximum-pool-size 保持一致
fixed-pool-size = 10
}
throughput = 1
}

# 设置偏移存储配置
offset-store {
schema = "" # MySQL/MariaDB 无特殊 schema, 留空
table = "pekko_projection_offset_store"
management-table = "pekko_projection_management"
use-lowercase-schema = true # 对 MySQL/MariaDB 无影响, 保留默认
}

# 设置调试日志(生产环境关闭)
debug.verbose-offset-store-logging = true

# 数据库连接池配置
connection-pool {
url = "jdbc:mariadb://localhost:3306/pekko_game?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "pekko"
password = "pekko123"
driver = "org.mariadb.jdbc.Driver"

# HikariCP 连接池参数(官方强烈建议使用连接池)
hikaricp {
maximum-pool-size = 10 # 与调度器线程数一致
minimum-idle = 5 # 生产环境建议不小于 5, 避免频繁创建连接
idle-timeout = 300000 # 5分钟, 超过则释放空闲连接
connection-timeout = 30000 # 30秒连接超时
max-lifetime = 1800000 # 30分钟, 强制刷新连接(避免数据库断开空闲连接)
validation-timeout = 5000 # 5秒验证连接有效性
connection-test-query = "SELECT 1" # 验证连接的 SQL(MySQL/MariaDB 通用)
}
}
}
}
}

这个文件可以创建在 src/main/resources 声明成 application.conf, 之后启动就会发现 pekko 异常:

1
2
[pekko-database-pekko.actor.default-dispatcher-4] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
[pekko-database-pekko.actor.default-dispatcher-5] WARN org.apache.pekko.util.ManifestInfo - You are using version 1.1.5 of Apache Pekko, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 1.1.5 of the [pekko-protobuf-v3, pekko-persistence-query, pekko-stream, pekko-actor-typed, pekko-persistence] artifacts to your project. Here's a complete collection of detected artifacts: (1.1.3, [pekko-actor-typed, pekko-persistence, pekko-persistence-query, pekko-protobuf-v3, pekko-stream]), (1.1.5, [pekko-actor, pekko-slf4j]). See also: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed

这也是目前 pekko 最大的割裂, 我采用的是 pekko-actor 基础功能是最新版本 1.1.5, 但底层其他依赖内部都是使用 1.1.3 组件

官方文档解释怎么处理: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed

这里就需要提前显式声明 pekko-projection 内部依赖组件, 让其覆盖掉目前内部底层会引入版本信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!-- 可以需要升级底层依赖: (1.1.3, [pekko-actor-typed, pekko-persistence, pekko-persistence-query, pekko-protobuf-v3, pekko-stream]) -->
<dependencies>
<!-- 优先引入高版本模块, 他们会自动覆盖掉内部底层的的版本 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-query_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-protobuf-v3_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- 其他略 -->
</dependencies>

另外如果后续启动现实以下错误(xxx is not configured相关):

1
Config value for 'pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size' is not configured. The thread pool size must be integer and be as large as the JDBC connection pool.

Pekko 使用 JDBC 必须给他们分配单独线程池, 而且配置基本都是写死的, 这里需要检查 blocking-jdbc-dispatcher 是否正确配置成功

踩坑指南

首先必须要知道 pekko-jdbc 需要自定义以下关键字对象:

关键字 功能类型 核心作用 通俗解释
projectionId ProjectionId 唯一标识当前的 Projection 实例 给你的数据库操作任务起一个唯一名字,用于区分不同的数据库处理任务
sourceProvider SourceProvider<Offset, Envelope> 提供待处理的数据来源(数据流) 定义“要处理什么数据”,可以是数据库查询结果、消息队列数据、定时任务生成的数据等
sessionCreator Supplier<S extends JdbcSession> 提供数据库会话实例 每次处理数据时,创建一个数据库连接会话
handler Supplier<JdbcHandler<Envelope, S>> 定义数据处理逻辑 定义“怎么处理数据”,比如执行增删改查 SQL、事务控制等

这里按照官方文档来说明必须首先创建 JdbcSession 作为会话工厂的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.pekko.projection.jdbc.JdbcSession;

import java.sql.Connection;
import java.sql.SQLException;

/**
* 自定义的 PekkoJDBC 数据库会话工厂
*/
public static class PekkoJdbcSession implements JdbcSession {

/**
* 数据库会话连接
*/
final Connection connection;


/**
* 按照依赖注入原则, 数据库会话处理由外部传入
*/
public PekkoJdbcSession(final Connection connection) {
this.connection = connection;
}

/**
* 确认是否采用 Exactly Once 模式, 如果传入 true 自动关闭 auto commit
*/
public PekkoJdbcSession(final Connection connection, boolean isExactlyOnce) throws SQLException {
connection.setAutoCommit(!isExactlyOnce);
this.connection = connection;
}


@Override
public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
return func.apply(connection);
}

@Override
public void commit() throws SQLException {
connection.commit();
}

@Override
public void rollback() throws SQLException {
connection.rollback();
}

@Override
public void close() throws SQLException {
connection.close();
}
}

官方后续提到怎么去实现 Hibernate-ORM 的支持, 方便采用现代化 Hibernate-ORM 处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 自定义的 Hibernate-ORM 支持数据库会话工厂
*/
public static class HibernateJdbcSession implements JdbcSession {

/**
* ORM 管理器
*/
public final EntityManager entityManager;

/**
* ORM 事务
*/
private final EntityTransaction transaction;

/**
* 构造方法
*/
public HibernateJdbcSession(EntityManager entityManager) {
this.entityManager = entityManager;
this.transaction = this.entityManager.getTransaction();
this.transaction.begin();
}

@Override
public <Result> Result withConnection(Function<Connection, Result> func) {
Session hibernateSession = entityManager.unwrap(Session.class);
return hibernateSession.doReturningWork(
connection -> {
try {
return func.apply(connection);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
});
}

@Override
public void commit() {
if (transaction.isActive()) transaction.commit();
}

@Override
public void rollback() {
if (transaction.isActive()) transaction.rollback();
}

@Override
public void close() {
if (this.entityManager.isOpen()) this.entityManager.close();
}
}

这里还需要声明 Hibernate 的数据构建工厂, 用于创建初始化 Hibernate 工厂对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 自定义的 Hibernate-ORM 支持数据库会话生产工厂句柄
*/
public static class HibernateJdbcSessionFactory {
private final EntityManagerFactory entityManagerFactory;


/**
* 构建方法
*/
public HibernateJdbcSessionFactory(String name) {
this.entityManagerFactory = Persistence.createEntityManagerFactory(name);
}

/**
* 生产 Hibernate JDBC 的处理会话
*/
public HibernateJdbcSession newInstance() {
return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
}

/**
* 关闭工厂
*/
public void close() {
if (entityManagerFactory.isOpen()) {
entityManagerFactory.close();
}
}
}

这里确认编写启动测试例子, 后面才会慢慢讲解怎么去 PekkoJDBC 是怎么去解析处理的, 最后添加 Hikari 转化工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;

/**
* Hikari 数据库连接池工具
*/
public static final class HikariUtils {

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static HikariDataSource create(JdbcSettings settings) {
Config jdbcConfig = settings.config(); // 内部映射的配置为 pekko.projection.jdbc
Config poolConfig = jdbcConfig.getConfig("connection-pool");// 加载 pekko.projection.jdbc.connection-pool 配置
Config hikariConfig = poolConfig.getConfig("hikaricp");// 加载 pekko.projection.jdbc.connection-pool.hikaricp 配置

// 初始化 HikariCP
HikariDataSource props = new HikariDataSource();
props.setJdbcUrl(poolConfig.getString("url"));
props.setUsername(poolConfig.getString("user"));
props.setPassword(poolConfig.getString("password"));
props.setDriverClassName(poolConfig.getString("driver"));


// 设置连接池 HikariCP 参数
props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));

return props;
}

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static <T> HikariDataSource create(ActorSystem<T> system) {
return create(JdbcSettings.apply(system));
}
}

官方稳定其实说得很模糊, 所以造就这部分功能很难入手处理; 这里面所有功能涉及的模块功能, 包含且不止有:

  • 数据库 JDBC 驱动

  • PekkoActor 和 PekkoProjection 版本差异

  • PekkoActor 弱类型与强类型的转换

  • 不同数据库的不同驱动(MySQL/MariaDB/Postgres)

  • Hibernate-ORM 的数据实体建模

  • PekkoStream 数据流操作处理

  • Pekko Projection 提供的偏移存储(Offset Store)

这些模块都是社区当中的大型解决方案, 单独某一项了解上手都要很久时间来学习.

其他框架易用性这么高, 其实就是底层已经提取封装好了, 基本上在配完系统配置就完成模块自动装载, 而 Pekko 就需要自己动手来包装

另外还有个很大问题: 不同版本的 Provider 实现是不同的, 导致网上和文档相关内容可能写出来是错误的, 甚至没办法跑通测试例子

按照 PekkoJDBC 的官方文档配置我最终也没跑通运行, 官方最开始的说明:

1
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.

也是要求采用持久化功能来处理, 需要手动处理 信封(envelopes) 封装提取出消息的流程, 按照官方文档那个页面教程是没办法直接跑通.

问题排查

这里我查询资料之后最终测试运行成功跑通最后的 JDBC 例子, 这里代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
* PekkoJDBC 简单示范样例
*/
public class PekkoJdbcSimpleExample {

/**
* Hikari 数据库连接池工具
*/
public static final class HikariUtils {

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static HikariDataSource create(JdbcSettings settings) {
Config jdbcConfig = settings.config(); // 内部映射的配置为 pekko.projection.jdbc
Config poolConfig = jdbcConfig.getConfig("connection-pool");// 加载 pekko.projection.jdbc.connection-pool 配置
Config hikariConfig = poolConfig.getConfig("hikaricp");// 加载 pekko.projection.jdbc.connection-pool.hikaricp 配置

// 初始化 HikariCP
HikariDataSource props = new HikariDataSource();
props.setJdbcUrl(poolConfig.getString("url"));
props.setUsername(poolConfig.getString("user"));
props.setPassword(poolConfig.getString("password"));
props.setDriverClassName(poolConfig.getString("driver"));


// 设置连接池 HikariCP 参数
props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));

return props;
}

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static <T> HikariDataSource create(ActorSystem<T> system) {
return create(JdbcSettings.apply(system));
}
}

/**
* 自定义的 PekkoJDBC 数据库会话工厂
*/
public static class PekkoJdbcSession implements JdbcSession, AutoCloseable {

/**
* 数据库会话连接
*/
final Connection connection;


/**
* 按照依赖注入原则, 数据库会话处理由外部传入
*/
public PekkoJdbcSession(final Connection connection) {
this.connection = connection;
}

/**
* 确认是否采用 Exactly Once 模式, 如果传入 true 自动关闭 auto commit
*/
public PekkoJdbcSession(final Connection connection, boolean isExactlyOnce) throws SQLException {
connection.setAutoCommit(!isExactlyOnce);
this.connection = connection;
}


@Override
public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
return func.apply(connection);
}

@Override
public void commit() throws SQLException {
connection.commit();
}

@Override
public void rollback() throws SQLException {
connection.rollback();
}

@Override
public void close() throws SQLException {
connection.close();
}
}


/**
* 测试用的用户实体
*/
public static class User {
private Long id;
private String name;
private Integer age;

public User(Long id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}

// getter/setter 省略
@Override
public String toString() {
return "User{id=" + id + ", name='" + name + "', age=" + age + "}";
}
}


/**
* 创建测试表
*/
private static void createTestTable(HikariDataSource ds) throws Exception {
String sql = "CREATE TABLE IF NOT EXISTS user_test (id BIGINT PRIMARY KEY, name VARCHAR(50), age INT)";
try (var conn = ds.getConnection();
var stmt = conn.createStatement()) {
stmt.execute(sql);
}
}


// 插入用户(复用你的 JdbcSession)
private static void insertUser(HikariDataSource ds, User user) throws Exception {
String sql = "INSERT INTO user_test (id, name, age) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?";

// 使用自定义的 JdbcSession 封装连接
try (PekkoJdbcSession session = new PekkoJdbcSession(ds.getConnection())) {
session.withConnection(conn -> {
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, user.id);
stmt.setString(2, user.name);
stmt.setInt(3, user.age);
stmt.setString(4, user.name);
stmt.setInt(5, user.age);
return stmt.executeUpdate();
}
});
session.commit();
}
}


// 查询所有用户
private static List<User> listAllUsers(HikariDataSource ds) throws Exception {
String sql = "SELECT id, name, age FROM user_test";
List<User> users = new ArrayList<>();

try (PekkoJdbcSession session = new PekkoJdbcSession(ds.getConnection())) {
session.withConnection(conn -> {
try (PreparedStatement stmt = conn.prepareStatement(sql);
ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
users.add(new User(
rs.getLong("id"),
rs.getString("name"),
rs.getInt("age")
));
}
return null;
}
});
}
return users;
}


/**
* 项目启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) throws Exception {

// 获取实例化 Actor 系统, JDBC 底层用了强类型相关组件
// 如果想要采用经典版本 Actor 则必须手动降级处理
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-database");
org.apache.pekko.actor.ActorSystem classicSystem = system.classicSystem(); // 降为无强类型


// 获取 hikariDataSource 数据库连接池配置
HikariDataSource hikariDataSource = HikariUtils.create(system);


// 创建测试表
createTestTable(hikariDataSource);

// 插入数据
insertUser(hikariDataSource, new User(1L, "MeteorGX", 18));
insertUser(hikariDataSource, new User(2L, "MeteorCat", 30));

// 查询数据
List<User> users = listAllUsers(hikariDataSource);
System.out.println("查询到的用户列表:");
users.forEach(System.out::println);

// 注册 Actor 退出之后的回调
system.getWhenTerminated().thenAccept(ignore -> {
hikariDataSource.close();// 关闭 Hikari 数据驱动
System.out.println("Clean Actor Resource");
});


// 任意键按下退出
System.out.println("Press RETURN to stop...");
int ignore = System.in.read();

// 退出功能
system.terminate();
}
}

这里启动之后会在数据库当中构建出 user_test 表, 后续就是插入数据最后查询处理, 而且能够看到具体的 HikariPool 启动打印内容.

后面追加 Hibernate 支持也是网上资料查询之后测试出来的, 这里补充个后续采用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.persistence.*;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;
import org.hibernate.Session;
import org.hibernate.cfg.HikariCPSettings;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* PekkoJDBC Hibernate 示范样例
*/
public class PekkoJdbcHibernateExample {

/**
* Hikari 数据库连接池工具
*/
public static final class HikariUtils {

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static HikariDataSource create(JdbcSettings settings) {
Config jdbcConfig = settings.config(); // 内部映射的配置为 pekko.projection.jdbc
Config poolConfig = jdbcConfig.getConfig("connection-pool");// 加载 pekko.projection.jdbc.connection-pool 配置
Config hikariConfig = poolConfig.getConfig("hikaricp");// 加载 pekko.projection.jdbc.connection-pool.hikaricp 配置

// 初始化 HikariCP
HikariDataSource props = new HikariDataSource();
props.setJdbcUrl(poolConfig.getString("url"));
props.setUsername(poolConfig.getString("user"));
props.setPassword(poolConfig.getString("password"));
props.setDriverClassName(poolConfig.getString("driver"));


// 设置连接池 HikariCP 参数
props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));

return props;
}

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static <T> HikariDataSource create(ActorSystem<T> system) {
return create(JdbcSettings.apply(system));
}
}


/**
* 自定义的 Hibernate-ORM 支持数据库会话工厂
*/
public static class HibernateJdbcSession implements JdbcSession, AutoCloseable {

/**
* ORM 管理器
*/
public final EntityManager entityManager;

/**
* ORM 事务
*/
private final EntityTransaction transaction;

/**
* 构造方法
*/
public HibernateJdbcSession(EntityManager entityManager) {
this.entityManager = entityManager;
this.transaction = this.entityManager.getTransaction();
this.transaction.begin();
}

@Override
public <Result> Result withConnection(Function<Connection, Result> func) {
Session hibernateSession = entityManager.unwrap(Session.class);
return hibernateSession.doReturningWork(
connection -> {
try {
return func.apply(connection);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
});
}

@Override
public void commit() {
if (transaction.isActive()) transaction.commit();
}

@Override
public void rollback() {
if (transaction.isActive()) transaction.rollback();
}

@Override
public void close() {
if (this.entityManager.isOpen()) this.entityManager.close();
}
}


/**
* 自定义的 Hibernate-ORM 支持数据库会话生产工厂句柄
*/
public static class HibernateJdbcSessionFactory implements AutoCloseable {
private final EntityManagerFactory entityManagerFactory;


/**
* 构建方法
*/
public HibernateJdbcSessionFactory(String name) {
this.entityManagerFactory = Persistence.createEntityManagerFactory(name);
}

/**
* 新追加的复用 HikariDataSource 内部配置生成 entityManagerFactory
*/
public HibernateJdbcSessionFactory(String name, HikariDataSource hikariDataSource /*, String dialect*/) {
Map<String, Object> prop = new HashMap<>();

// 加载外部连接池设置 hibernate.connection.provider_class 项
prop.put(org.hibernate.cfg.JdbcSettings.CONNECTION_PROVIDER, "hikaricp");

// 加载 JDBC 的参数
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_DRIVER, hikariDataSource.getDriverClassName());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_URL, hikariDataSource.getJdbcUrl());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_USER, hikariDataSource.getUsername());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_PASSWORD, hikariDataSource.getPassword());

// Hibernate 专属常量: 数据库方言, 用常量替代手写 "hibernate.dialect"
// Hibernate 7.x 的之后无需配置, 会被自动识别
//prop.put(AvailableSettings.DIALECT, dialect);

// SQL 日志配置
prop.put(org.hibernate.cfg.JdbcSettings.HIGHLIGHT_SQL, true);
prop.put(org.hibernate.cfg.JdbcSettings.SHOW_SQL, true);
prop.put(org.hibernate.cfg.JdbcSettings.FORMAT_SQL, true);

// HikariCP 连接池配置
prop.put(HikariCPSettings.HIKARI_MAX_SIZE, hikariDataSource.getMaximumPoolSize());
prop.put(HikariCPSettings.HIKARI_MIN_IDLE_SIZE, hikariDataSource.getMinimumIdle());
prop.put(HikariCPSettings.HIKARI_MAX_LIFETIME, hikariDataSource.getMaxLifetime());
prop.put(HikariCPSettings.HIKARI_LEAK_TIMEOUT, hikariDataSource.getLeakDetectionThreshold());
prop.put(HikariCPSettings.HIKARI_IDLE_TIMEOUT, hikariDataSource.getIdleTimeout());
prop.put(HikariCPSettings.HIKARI_ACQUISITION_TIMEOUT, hikariDataSource.getConnectionTimeout());
prop.put(HikariCPSettings.HIKARI_VALIDATION_TIMEOUT, hikariDataSource.getValidationTimeout());
prop.put(HikariCPSettings.HIKARI_INITIALIZATION_TIMEOUT, hikariDataSource.getInitializationFailTimeout());
prop.put(HikariCPSettings.HIKARI_KEEPALIVE_TIME, hikariDataSource.getKeepaliveTime());
prop.put(HikariCPSettings.HIKARI_INITIAL_SQL, hikariDataSource.getConnectionInitSql());
prop.put(HikariCPSettings.HIKARI_POOL_NAME, hikariDataSource.getPoolName());
prop.put(HikariCPSettings.HIKARI_READ_ONLY, hikariDataSource.isReadOnly());
prop.put(HikariCPSettings.HIKARI_ISOLATE_INTERNAL_QUERIES, hikariDataSource.isIsolateInternalQueries());


// 剔除掉 null 的值
prop = prop.entrySet()
.stream()
.filter(active -> Objects.nonNull(active.getValue()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));

this.entityManagerFactory = new PersistenceConfiguration(name)
.properties(prop)
.managedClass(HibernateUser.class) // 注入识别实体对象, 很多高级框架是自动扫描自己包内的 Entity 对象之后自动装配
.createEntityManagerFactory();
}

/**
* 生产 Hibernate JDBC 的处理会话
*/
public HibernateJdbcSession newInstance() {
return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
}

/**
* 关闭工厂
*/
public void close() {
if (entityManagerFactory.isOpen()) {
entityManagerFactory.close();
}
}
}


/**
* 定义数据实体
*/
@Entity
@Table(name = "user_test")
public static class HibernateUser {
@Id
private Long id;
private String name;
private Integer age;

// 无参构造(Hibernate 必需)
public HibernateUser() {
}

public HibernateUser(Long id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}
}


/**
* 创建测试表
*/
private static void createTestTable(HikariDataSource ds) throws Exception {
String sql = "CREATE TABLE IF NOT EXISTS user_test (id BIGINT PRIMARY KEY, name VARCHAR(50), age INT)";
try (var conn = ds.getConnection();
var stmt = conn.createStatement()) {
stmt.execute(sql);
}
}


/**
* 项目启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) throws Exception {

// 获取实例化 Actor 系统, JDBC 底层用了强类型相关组件
// 如果想要采用经典版本 Actor 则必须手动降级处理
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-database");
org.apache.pekko.actor.ActorSystem classicSystem = system.classicSystem(); // 降为无强类型


// 获取 hikariDataSource 数据库连接池配置
HikariDataSource hikariDataSource = PekkoJdbcSimpleExample.HikariUtils.create(system);

// 初始化 Hibernate 工厂
// 如果没有关联 HikariDataSource 就需要编写 persistence.xml 具体的配置文件
// HibernateJdbcSessionFactory hibernateFactory = new HibernateJdbcSessionFactory("pekko-hibernate");
// 这里采用复用 hikariDataSource
HibernateJdbcSessionFactory hibernateFactory = new HibernateJdbcSessionFactory(
"pekko-hibernate",
hikariDataSource
// "org.hibernate.dialect.MariaDBDialect" // 对应 hibernate 数据库方言, Hibernate 7.x 后续版本无需处理自动识别
);

// 创建测试表
createTestTable(hikariDataSource);


// 插入数据
try (HibernateJdbcSession session = hibernateFactory.newInstance()) {
if (Objects.isNull(session.entityManager.find(HibernateUser.class, 3L))) {
session.entityManager.persist(new HibernateUser(3L, "NewMeteorGX", 28));
}

if (Objects.isNull(session.entityManager.find(HibernateUser.class, 4L))) {
session.entityManager.persist(new HibernateUser(4L, "NewMeteorCat", 56));
}
session.commit();
}

// 查询数据
try (HibernateJdbcSession session = hibernateFactory.newInstance()) {
HibernateUser user = session.entityManager.find(HibernateUser.class, 4L);
System.out.println("Hibernate 查询到的用户:" + user.getName());
}


// 注册 Actor 退出之后的回调
system.getWhenTerminated().thenAccept(ignore -> {
hibernateFactory.close(); // 关闭 Hibernate 工厂对象
hikariDataSource.close();// 关闭 Hikari 数据驱动
System.out.println("Clean Actor Resource");
});


// 任意键按下退出
System.out.println("Press RETURN to stop...");
int ignore = System.in.read();

// 退出功能
system.terminate();
}
}

这里就是采用 hibernate 的数据库交互功能, 其中需要手动加载 @Entity 注解的类对象, 想要自动启动扫描这是另外实现的功能.

Actor 结合

现在就可以让 JDBC 结合 Actor 挂载来处理授权验证并加载数据库玩家信息, 最后动态创建子 Actor 挂载运行并且异步落地到数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.persistence.*;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;
import org.hibernate.Session;
import org.hibernate.cfg.HikariCPSettings;
import org.slf4j.Logger;

import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

/**
* 模拟 Actor 授权挂载, 并且在线的时候异步落地数据库
*/
public class PekkoJdbcActorExample {


/**
* Hikari 数据库连接池工具
*/
public static final class HikariUtils {

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static HikariDataSource create(JdbcSettings settings) {
Config jdbcConfig = settings.config(); // 内部映射的配置为 pekko.projection.jdbc
Config poolConfig = jdbcConfig.getConfig("connection-pool");// 加载 pekko.projection.jdbc.connection-pool 配置
Config hikariConfig = poolConfig.getConfig("hikaricp");// 加载 pekko.projection.jdbc.connection-pool.hikaricp 配置

// 初始化 HikariCP
HikariDataSource props = new HikariDataSource();
props.setJdbcUrl(poolConfig.getString("url"));
props.setUsername(poolConfig.getString("user"));
props.setPassword(poolConfig.getString("password"));
props.setDriverClassName(poolConfig.getString("driver"));


// 设置连接池 HikariCP 参数
props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));

return props;
}

/**
* 加载 ActorSystem 之中的 HikariDataSource 配置
*/
public static <T> HikariDataSource create(ActorSystem<T> system) {
return create(JdbcSettings.apply(system));
}
}


/**
* 自定义的 Hibernate-ORM 支持数据库会话工厂
*/
public static class HibernateJdbcSession implements JdbcSession, AutoCloseable {

/**
* ORM 管理器
*/
public final EntityManager entityManager;

/**
* ORM 事务
*/
private final EntityTransaction transaction;

/**
* 构造方法
*/
public HibernateJdbcSession(EntityManager entityManager) {
this.entityManager = entityManager;
this.transaction = this.entityManager.getTransaction();
this.transaction.begin();
}

@Override
public <Result> Result withConnection(Function<Connection, Result> func) {
Session hibernateSession = entityManager.unwrap(Session.class);
return hibernateSession.doReturningWork(
connection -> {
try {
return func.apply(connection);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
});
}

@Override
public void commit() {
if (transaction.isActive()) transaction.commit();
}

@Override
public void rollback() {
if (transaction.isActive()) transaction.rollback();
}

@Override
public void close() {
if (this.entityManager.isOpen()) this.entityManager.close();
}
}


/**
* 自定义的 Hibernate-ORM 支持数据库会话生产工厂句柄
*/
public static class HibernateJdbcSessionFactory implements AutoCloseable {
private final EntityManagerFactory entityManagerFactory;


/**
* 构建方法
*/
public HibernateJdbcSessionFactory(final String name) {
this.entityManagerFactory = Persistence.createEntityManagerFactory(name);
}

/**
* 加载复用 HikariDataSource 内部配置
*/
public HibernateJdbcSessionFactory(final String name, HikariDataSource hikariDataSource, List<Class<?>> clazz) {
Map<String, Object> prop = new HashMap<>();

// 加载外部连接池设置 hibernate.connection.provider_class 项
prop.put(org.hibernate.cfg.JdbcSettings.CONNECTION_PROVIDER, "hikaricp");

// 加载 JDBC 的参数
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_DRIVER, hikariDataSource.getDriverClassName());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_URL, hikariDataSource.getJdbcUrl());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_USER, hikariDataSource.getUsername());
prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_PASSWORD, hikariDataSource.getPassword());

// Hibernate 专属常量: 数据库方言, 用常量替代手写 "hibernate.dialect"
// Hibernate 7.x 的之后无需配置, 会被自动识别
//prop.put(AvailableSettings.DIALECT, dialect);

// SQL 日志配置
prop.put(org.hibernate.cfg.JdbcSettings.HIGHLIGHT_SQL, true);
prop.put(org.hibernate.cfg.JdbcSettings.SHOW_SQL, true);
prop.put(org.hibernate.cfg.JdbcSettings.FORMAT_SQL, true);

// HikariCP 连接池配置
prop.put(HikariCPSettings.HIKARI_MAX_SIZE, hikariDataSource.getMaximumPoolSize());
prop.put(HikariCPSettings.HIKARI_MIN_IDLE_SIZE, hikariDataSource.getMinimumIdle());
prop.put(HikariCPSettings.HIKARI_MAX_LIFETIME, hikariDataSource.getMaxLifetime());
prop.put(HikariCPSettings.HIKARI_LEAK_TIMEOUT, hikariDataSource.getLeakDetectionThreshold());
prop.put(HikariCPSettings.HIKARI_IDLE_TIMEOUT, hikariDataSource.getIdleTimeout());
prop.put(HikariCPSettings.HIKARI_ACQUISITION_TIMEOUT, hikariDataSource.getConnectionTimeout());
prop.put(HikariCPSettings.HIKARI_VALIDATION_TIMEOUT, hikariDataSource.getValidationTimeout());
prop.put(HikariCPSettings.HIKARI_INITIALIZATION_TIMEOUT, hikariDataSource.getInitializationFailTimeout());
prop.put(HikariCPSettings.HIKARI_KEEPALIVE_TIME, hikariDataSource.getKeepaliveTime());
prop.put(HikariCPSettings.HIKARI_INITIAL_SQL, hikariDataSource.getConnectionInitSql());
prop.put(HikariCPSettings.HIKARI_POOL_NAME, hikariDataSource.getPoolName());
prop.put(HikariCPSettings.HIKARI_READ_ONLY, hikariDataSource.isReadOnly());
prop.put(HikariCPSettings.HIKARI_ISOLATE_INTERNAL_QUERIES, hikariDataSource.isIsolateInternalQueries());


// 剔除掉 null 的值
prop = prop.entrySet()
.stream()
.filter(active -> Objects.nonNull(active.getValue()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));

// 加载基础参数
PersistenceConfiguration configuration = new PersistenceConfiguration(name);
configuration.properties(prop);

// 挂载支持的 Entity 实体类
clazz.forEach(configuration::managedClass);

// 挂载实例化对象
this.entityManagerFactory = configuration.createEntityManagerFactory();
}

/**
* 生产 Hibernate JDBC 的处理会话
*/
public HibernateJdbcSession newInstance() {
return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
}

/**
* 关闭工厂
*/
public void close() {
if (entityManagerFactory.isOpen()) {
entityManagerFactory.close();
}
}
}


/**
* 指令合集
*/
public static class Commands {

/**
* 指令基础接口
*/
public interface ICommand {
}


/**
* 会话加入命令
*
* @param id 角色 ID
* @param token 授权 Token
*/
public record Join(long id, String token) implements ICommand {
}

/**
* 会话离开命令
*
* @param id 角色 ID
*/
public record Leave(long id) implements ICommand {

}
}


/**
* Actor 管理器
*/
public static class PekkoActorBehaviour extends AbstractBehavior<Commands.ICommand> {

/**
* 日志对象
*/
final Logger log = getContext().getLog();

/**
* Hibernate 会话工厂
*/
final HibernateJdbcSessionFactory factory;

/**
* 登陆授权的 Actor 列表
*/
final Map<Long, ActorRef<Commands.ICommand>> actors;

/**
* 私有化构建, Map 由外部定义是否采用线程安全
*/
private PekkoActorBehaviour(ActorContext<Commands.ICommand> context, HibernateJdbcSessionFactory factory, Map<Long, ActorRef<Commands.ICommand>> actors) {
super(context);
this.factory = factory;
this.actors = actors;
}

/**
* 静态构建
*/
public static Behavior<Commands.ICommand> create(HibernateJdbcSessionFactory factory, final Map<Long, ActorRef<Commands.ICommand>> actors) {
return Behaviors.setup((ctx) -> new PekkoActorBehaviour(ctx, factory, actors));
}

/**
* 消息拦截
*/
@Override
public Receive<Commands.ICommand> createReceive() {
return newReceiveBuilder()
.onSignal(PostStop.class, this::onStop)
.onMessage(Commands.Join.class, this::onJoin)
.onMessage(Commands.Leave.class, this::onLeave)
.build();
}


/**
* 关闭 Actor 管理器
*/
private Behavior<Commands.ICommand> onStop(PostStop ignore) {
log.warn("Actor stopped: {}", getContext().getSelf().path().toSerializationFormat());
actors.forEach((id, actor) -> getContext().stop(actor));
return Behaviors.stopped();
}


/**
* 加入会话
*/
private Behavior<Commands.ICommand> onJoin(Commands.Join command) {
try (HibernateJdbcSession session = factory.newInstance()) {
final EntityManager manager = session.entityManager;

// 确认实体
HibernatePlayer player = manager.find(HibernatePlayer.class, command.id);

// 检索数据库玩家是否存在
if (Objects.isNull(player)) {
log.warn("账号不存在: {}", command.id);
return this;
}

// 判断授权是否正确
if (!player.token.equals(command.token)) {
log.warn("授权不正确: {}", command.id);
return this;
}

// 判断目前是否已经登陆在线, 是的话需要下线处理
ActorRef<Commands.ICommand> online = actors.get(command.id);
if (Objects.nonNull(online)) {
getContext().stop(online);
actors.remove(command.id);
}

// 如果正确就动态创建 Actor 会话管理
ActorRef<Commands.ICommand> actor = getContext().spawn(
PekkoActorSession.create(player, factory),
"pekko-jdbc-actor-session-%d".formatted(command.id)
);
actors.put(command.id, actor);
} catch (Exception e) {
log.warn("无法打开数据库", e);
}

return this;
}

/**
* 离开会话
*/
private Behavior<Commands.ICommand> onLeave(Commands.Leave command) {
// 确认是否存在登陆账户, 如果不存在不需要处理
final ActorRef<Commands.ICommand> active = actors.get(command.id);
if (Objects.isNull(active)) return Behaviors.same();

// 直接通知会话 Actor 发出 PostStop 停止信号
active.unsafeUpcast().tell(PostStop.instance());
return this;
}
}


/**
* Actor 会话对象
*/
public static class PekkoActorSession extends AbstractBehavior<Commands.ICommand> {

/**
* 日志对象
*/
final Logger log = getContext().getLog();

/**
* 定时器
*/
final Scheduler scheduler = getContext().getSystem().scheduler();


/**
* Hibernate 会话工厂
*/
final HibernateJdbcSessionFactory factory;

/**
* 登陆角色信息实体
*/
HibernatePlayer player;


/**
* 数据落地定时任务
*/
Cancellable cancellable;


/**
* 定时落地数据库
*/
record ScheduleUpdatePlayer() implements Commands.ICommand {
}

/**
* 私有构建
*/
private PekkoActorSession(ActorContext<Commands.ICommand> context, HibernatePlayer player, HibernateJdbcSessionFactory factory) {
super(context);
this.player = player;
this.factory = factory;

// 更新登陆时间
this.player.loginTime = System.currentTimeMillis();


// 启动定时器, 异步执行数据库落地任务: 每 5s 落地一次数据库
cancellable = scheduler.scheduleAtFixedRate(
Duration.ofSeconds(5),
Duration.ofSeconds(5),
() -> getContext().getSelf().tell(new ScheduleUpdatePlayer()),
getContext().getExecutionContext()
);

log.info("玩家数据授权成功: {}", player.onlineTime);
}

/**
* 静态实例化
*/
public static Behavior<Commands.ICommand> create(HibernatePlayer player, HibernateJdbcSessionFactory factory) {
return Behaviors.setup((ctx) -> new PekkoActorSession(ctx, player, factory));
}


/**
* 消息拦截
*/
@Override
public Receive<Commands.ICommand> createReceive() {
return newReceiveBuilder()
.onSignal(PostStop.class, this::onStop)
.onMessage(ScheduleUpdatePlayer.class, this::onSave)
.build();
}


/**
* 关闭 Actor 管理器
*/
private Behavior<Commands.ICommand> onStop(PostStop ignore) {
log.warn("Session stopped: {}", getContext().getSelf().path().toSerializationFormat());

// 停止定时任务
if (Objects.nonNull(cancellable)) cancellable.cancel();

// 退出的时候最后更新一次玩家信息
onSave(new ScheduleUpdatePlayer());
return Behaviors.stopped();
}


/**
* 数据库落地
*/
private Behavior<Commands.ICommand> onSave(ScheduleUpdatePlayer ignore) {
// 将 player 的最新信息写入到数据库
try (HibernateJdbcSession session = factory.newInstance()) {


// 目前这里只需要计算在线时间
long now = System.currentTimeMillis();
long offset = now - player.loginTime; // 得出误差值
player.onlineTime = offset + player.onlineTime; // 合并上次在线时间

// 保存数据库
session.entityManager.merge(player);
session.commit();

log.info("玩家数据落地成功: {}", player);
} catch (Exception e) {
log.warn("玩家数据无法落地", e);
}
return this;
}
}


/**
* 玩家的数据实体
*/
@Entity
@Table(name = "player")
public static class HibernatePlayer {

/**
* 自增长 ID
*/
@Id
private Long id;

/**
* 账号名
*/
@Column(nullable = false)
private String username;


/***
* 授权 Token
*/
@Column(nullable = false)
private String token;


/**
* 登陆时间戳: 毫秒级
*/
@Column(nullable = false, name = "login_time")
private Long loginTime;


/**
* 在线时长: 毫秒级
*/
@Column(nullable = false, name = "online_time")
private Long onlineTime;

/**
* 全参数构造方法
*/
public HibernatePlayer(Long id, String username, String token, Long loginTime, Long onlineTime) {
this.id = id;
this.username = username;
this.token = token;
this.loginTime = loginTime;
this.onlineTime = onlineTime;
}

/**
* 无参数构造方法
*/
public HibernatePlayer() {

}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getToken() {
return token;
}

public void setToken(String token) {
this.token = token;
}

public Long getLoginTime() {
return loginTime;
}

public void setLoginTime(Long loginTime) {
this.loginTime = loginTime;
}

public Long getOnlineTime() {
return onlineTime;
}

public void setOnlineTime(Long onlineTime) {
this.onlineTime = onlineTime;
}
}


/**
* 创建玩家表
*/
private static void createPlayerTable(HikariDataSource ds) throws Exception {
String sql = "CREATE TABLE IF NOT EXISTS player (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL, username VARCHAR(64) NOT NULL, token VARCHAR(64) NOT NULL,login_time BIGINT NOT NULL, online_time BIGINT NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
try (var conn = ds.getConnection();
var stmt = conn.createStatement()) {
stmt.execute(sql);
}
}


/**
* 项目启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) throws Exception {

// 构建 Actor 系统
final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-jdbc-actor");
final Scheduler scheduler = system.scheduler(); // 定时器


// 获取 hikariDataSource 数据库连接池配置
try (HikariDataSource hikariDataSource = HikariUtils.create(system)) {

// 创建初始化玩家表
createPlayerTable(hikariDataSource);


// 初始化 Hibernate 工厂
HibernateJdbcSessionFactory hibernate = new HibernateJdbcSessionFactory(
"pekko-jdbc-actor-hibernate",
hikariDataSource,
List.of(HibernatePlayer.class)
);


// 创建测试账号
try (HibernateJdbcSession session = hibernate.newInstance()) {
if (Objects.isNull(session.entityManager.find(HibernatePlayer.class, 10001L))) {
session.entityManager.persist(new HibernatePlayer(10001L, "MeteorCat", "empty-token", System.currentTimeMillis(), 0L));
session.entityManager.flush();
session.commit();
}
}

// 创建管理器对象
final ActorRef<Commands.ICommand> behaviour = system.systemActorOf(
PekkoActorBehaviour.create(hibernate, new HashMap<>()),
"pekko-jdbc-actor-behaviour",
Props.empty()
);


// 模拟 3s 有个账号登陆, 提交不存在的 id 和授权 token
scheduler.scheduleOnce(Duration.ofSeconds(3), () -> {
behaviour.tell(new Commands.Join(1000L, "ignore-token"));
}, system.executionContext());

// 模拟 5s 正确的账号登陆,
scheduler.scheduleOnce(Duration.ofSeconds(5), () -> {
behaviour.tell(new Commands.Join(10001L, "empty-token"));
}, system.executionContext());


// 写入 ActorSystem 退出回调
system.getWhenTerminated().thenAccept(done -> {
behaviour.unsafeUpcast().tell(PostStop.instance()); // 退出管理器
hibernate.close(); // 关闭会话工厂
hikariDataSource.close(); // 关闭连接池
});
}


// 任意键按下退出
System.out.println("Press RETURN to stop...");
int ignore = System.in.read();
system.terminate(); // 关闭 ActorSystem
}
}

这里就是比较粗糙的设计, 依托 Pekko 生态系统编写简单授权验证之后异步落地数据库的功能, 具体代码运行之后可以观察下执行流程.

实际的正式代码考虑精细度比上面还多, 不过整体流程就是和上面差不多, 只是要考虑底层更加易用从而抽象成专门的工具

上面的示例很多都是官方没有说明的, 所以在这过程当中耗费比较多时间去查资料, 实际上后续踩坑的地方比起刚刚那个仅仅是开端

注意: 上面例子 “下线处理” 仅提供思路, 正常登陆是将 Actor 的 Socket 更新并让老 Socket 断开, 不能直接断开重建 Actor

不过考虑到 Pekko 本身作为 “工具集” 而非高级 “框架”, 这点问题也可以接受, 方便自己手动定制裁减相关功能

另外需要说明的官方文档集成 Stream(流处理) 太过复杂了, 不建议直接上手就按照官方文档集成, 里面涉及的概念会让你更加迷糊.

官方文档甚至采用 Envelope(信箱投递) + Stream(流处理) 交叉使用做案例, 其中也没有说明相关依赖和具体实现类, 很容易越看越不懂