Java Pekko JDBC-ORM Database Handling MeteorCat 2026-01-23
Most of the previous posts operated on standalone Actors, but once you reach real production you inevitably need to work with a database, and Pekko already integrates this functionality internally.
In most cases you will see that all the relevant internal modules are labeled as being for object persistence; they exist to save Actor state, which makes them far too complex if you just want simple database access.
Slick integration deserves a separate write-up, because the database-related material is too involved to explain directly here; in any case Slick is mostly used from Scala, while Java developers are more at home with JDBC.
Here I follow the official pekko-projection-jdbc module and its documentation, and at the end I cover the pitfalls hit along the way. First, pull in the third-party libraries via Maven:
```xml
<properties>
    <scala.binary.version>2.13</scala.binary.version>
    <pekko.projection.version>1.1.0</pekko.projection.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-projection-core_${scala.binary.version}</artifactId>
        <version>${pekko.projection.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-projection-jdbc_${scala.binary.version}</artifactId>
        <version>${pekko.projection.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mariadb.jdbc</groupId>
        <artifactId>mariadb-java-client</artifactId>
        <version>3.5.6</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>5.1.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.hibernate.orm</groupId>
        <artifactId>hibernate-core</artifactId>
        <version>7.2.1.Final</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
```
Next, configure the database parameters. I assume the server already has a pekko_game database; everything else keeps the default configuration:
```hocon
pekko {
  projection {
    jdbc {
      # MariaDB is fully compatible with the MySQL dialect, so "mysql-dialect" works
      # (required; startup fails without it)
      dialect = "mysql-dialect"

      # Dedicated blocking JDBC dispatcher (required; startup fails without it)
      use-dispatcher = "pekko.projection.jdbc.blocking-jdbc-dispatcher"
      blocking-jdbc-dispatcher {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          # Key point: keep this equal to HikariCP's maximum-pool-size below
          fixed-pool-size = 10
        }
        throughput = 1
      }

      # Offset store configuration
      offset-store {
        schema = "" # MySQL/MariaDB has no separate schema; leave empty
        table = "pekko_projection_offset_store"
        management-table = "pekko_projection_management"
        use-lowercase-schema = true # no effect on MySQL/MariaDB; keep the default
      }

      # Verbose offset-store logging (disable in production)
      debug.verbose-offset-store-logging = true

      # Database connection pool configuration
      connection-pool {
        url = "jdbc:mariadb://localhost:3306/pekko_game?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=true"
        user = "pekko"
        password = "pekko123"
        driver = "org.mariadb.jdbc.Driver"

        # HikariCP pool settings (the docs strongly recommend a connection pool)
        hikaricp {
          maximum-pool-size = 10     # keep in sync with the dispatcher thread count
          minimum-idle = 5           # >= 5 recommended in production to avoid connection churn
          idle-timeout = 300000      # 5 minutes; idle connections beyond this are released
          connection-timeout = 30000 # 30-second connect timeout
          max-lifetime = 1800000     # 30 minutes; recycle connections before the server drops them
          validation-timeout = 5000  # 5-second connection validation timeout
          connection-test-query = "SELECT 1" # validation SQL (works on MySQL/MariaDB)
        }
      }
    }
  }
}
```
Save this file as application.conf under src/main/resources. On startup you will then see a Pekko warning:
```
[pekko-database-pekko.actor.default-dispatcher-4] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
[pekko-database-pekko.actor.default-dispatcher-5] WARN org.apache.pekko.util.ManifestInfo - You are using version 1.1.5 of Apache Pekko, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 1.1.5 of the [pekko-protobuf-v3, pekko-persistence-query, pekko-stream, pekko-actor-typed, pekko-persistence] artifacts to your project. Here's a complete collection of detected artifacts: (1.1.3, [pekko-actor-typed, pekko-persistence, pekko-persistence-query, pekko-protobuf-v3, pekko-stream]), (1.1.5, [pekko-actor, pekko-slf4j]). See also: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
```
This is currently Pekko's biggest point of fragmentation: I use the latest pekko-actor 1.1.5 for the base functionality, but the other dependencies transitively pull in 1.1.3 components.
The official documentation explains how to handle this: https://pekko.apache.org/docs/pekko/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
You need to explicitly declare the components that pekko-projection depends on, so they override the versions that would otherwise be pulled in transitively:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-persistence_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-persistence-query_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-protobuf-v3_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pekko</groupId>
        <artifactId>pekko-stream_${scala.binary.version}</artifactId>
        <version>${pekko.version}</version>
    </dependency>
</dependencies>
```
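If the build has many modules, an alternative is to pin these versions once in `<dependencyManagement>` instead of redeclaring every artifact. This is standard Maven behavior rather than anything Pekko-specific; a sketch:

```xml
<!-- Sketch: version-manage the Pekko artifacts for the whole build.
     Artifacts listed here are only pulled in when declared elsewhere
     or resolved transitively, but always at ${pekko.version}. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.pekko</groupId>
            <artifactId>pekko-stream_${scala.binary.version}</artifactId>
            <version>${pekko.version}</version>
        </dependency>
        <!-- repeat for pekko-actor-typed, pekko-persistence,
             pekko-persistence-query and pekko-protobuf-v3 -->
    </dependencies>
</dependencyManagement>
```

Either way, `mvn dependency:tree` is the quickest way to confirm that no 1.1.3 artifact survives.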
Also, if a later startup shows the following error (the "xxx is not configured" kind):
```
Config value for 'pekko.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size' is not configured. The thread pool size must be integer and be as large as the JDBC connection pool.
```
Pekko requires a dedicated thread pool for JDBC, and the configuration keys are essentially fixed; when you see this, check that blocking-jdbc-dispatcher is configured correctly.
Pitfall Guide
First, you must know that pekko-jdbc requires you to define the following key objects:
| Keyword | Type | Core role | Plain-language explanation |
| --- | --- | --- | --- |
| `projectionId` | `ProjectionId` | Uniquely identifies this Projection instance | Gives your database-processing task a unique name so different tasks can be told apart |
| `sourceProvider` | `SourceProvider<Offset, Envelope>` | Supplies the source (stream) of data to process | Defines "what data to process": database query results, message-queue data, timer-generated data, and so on |
| `sessionCreator` | `Supplier<S extends JdbcSession>` | Supplies database session instances | Creates a database connection session each time data is processed |
| `handler` | `Supplier<JdbcHandler<Envelope, S>>` | Defines the data-processing logic | Defines "how to process the data": which SQL to run, transaction control, and so on |
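To see how the four objects fit together, here is a wiring sketch following the shape of the `org.apache.pekko.projection.jdbc.javadsl.JdbcProjection` API. Note that `MyEnvelope`, `mySourceProvider()` and `MyHandler` are placeholder names for illustration, not working code from this post:

```java
// Wiring sketch only: assumes a working SourceProvider and JdbcHandler exist.
ProjectionId projectionId = ProjectionId.of("user-projection", "user-1");

Projection<MyEnvelope> projection =
    JdbcProjection.exactlyOnce(
        projectionId,
        mySourceProvider(),  // SourceProvider<Offset, MyEnvelope>, placeholder
        () -> {              // sessionCreator: a fresh JdbcSession per batch
            try {
                return new PekkoJdbcSession(dataSource.getConnection(), true);
            } catch (SQLException e) {
                throw new RuntimeException(e); // Supplier cannot throw checked exceptions
            }
        },
        MyHandler::new,      // handler: JdbcHandler<MyEnvelope, PekkoJdbcSession>, placeholder
        system);
```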
Following the official documentation, you must first create a JdbcSession implementation to act as the session object produced by the factory:
```java
import org.apache.pekko.japi.function.Function; // needed by withConnection
import org.apache.pekko.projection.jdbc.JdbcSession;

import java.sql.Connection;
import java.sql.SQLException;

public static class PekkoJdbcSession implements JdbcSession {
    final Connection connection;

    public PekkoJdbcSession(final Connection connection) {
        this.connection = connection;
    }

    public PekkoJdbcSession(final Connection connection, boolean isExactlyOnce) throws SQLException {
        connection.setAutoCommit(!isExactlyOnce);
        this.connection = connection;
    }

    @Override
    public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
        return func.apply(connection);
    }

    @Override
    public void commit() throws SQLException {
        connection.commit();
    }

    @Override
    public void rollback() throws SQLException {
        connection.rollback();
    }

    @Override
    public void close() throws SQLException {
        connection.close();
    }
}
```
The documentation then shows how to implement Hibernate ORM support, so processing can go through a modern ORM:
```java
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityTransaction;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.hibernate.Session;

import java.sql.Connection;
import java.sql.SQLException;

public static class HibernateJdbcSession implements JdbcSession {
    public final EntityManager entityManager;
    private final EntityTransaction transaction;

    public HibernateJdbcSession(EntityManager entityManager) {
        this.entityManager = entityManager;
        this.transaction = this.entityManager.getTransaction();
        this.transaction.begin();
    }

    @Override
    public <Result> Result withConnection(Function<Connection, Result> func) {
        Session hibernateSession = entityManager.unwrap(Session.class);
        return hibernateSession.doReturningWork(
            connection -> {
                try {
                    return func.apply(connection);
                } catch (SQLException e) {
                    throw e;
                } catch (Exception e) {
                    throw new SQLException(e);
                }
            });
    }

    @Override
    public void commit() {
        if (transaction.isActive()) transaction.commit();
    }

    @Override
    public void rollback() {
        if (transaction.isActive()) transaction.rollback();
    }

    @Override
    public void close() {
        if (this.entityManager.isOpen()) this.entityManager.close();
    }
}
```
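The `doReturningWork` wrapper above is tunneling checked exceptions: the lambda is only allowed to throw `SQLException`, so other checked exceptions get wrapped. The same pattern can be shown in isolation with plain Java (class and method names here are illustrative, no Hibernate involved):

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

public class ExceptionTunnelDemo {
    // A contract that only permits SQLException, mirroring Hibernate's ReturningWork.
    @FunctionalInterface
    interface SqlWork<T> {
        T execute() throws SQLException;
    }

    // Stand-in for doReturningWork: runs the work under the SQL-only contract.
    static <T> T doReturningWork(SqlWork<T> work) throws SQLException {
        return work.execute();
    }

    // Adapt an arbitrary checked-throwing Callable to the SQL-only contract:
    // SQLExceptions pass through unchanged, anything else is wrapped.
    static <T> T tunnel(Callable<T> func) throws SQLException {
        return doReturningWork(() -> {
            try {
                return func.call();
            } catch (SQLException e) {
                throw e; // already the right type
            } catch (Exception e) {
                throw new SQLException(e); // tunnel other checked exceptions
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tunnel(() -> 41 + 1)); // prints 42
        try {
            tunnel(() -> { throw new InterruptedException("boom"); });
        } catch (SQLException e) {
            // the original exception survives as the cause
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```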
You also need a factory that builds and initializes the Hibernate EntityManagerFactory:
```java
import jakarta.persistence.EntityManagerFactory;
import jakarta.persistence.Persistence;

public static class HibernateJdbcSessionFactory {
    private final EntityManagerFactory entityManagerFactory;

    public HibernateJdbcSessionFactory(String name) {
        this.entityManagerFactory = Persistence.createEntityManagerFactory(name);
    }

    public HibernateJdbcSession newInstance() {
        return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
    }

    public void close() {
        if (entityManagerFactory.isOpen()) {
            entityManagerFactory.close();
        }
    }
}
```
Let's write a runnable startup test first; how Pekko JDBC actually parses and processes things will be explained step by step later. Finally, add a Hikari conversion utility:
```java
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;

public static final class HikariUtils {
    public static HikariDataSource create(JdbcSettings settings) {
        Config jdbcConfig = settings.config();
        Config poolConfig = jdbcConfig.getConfig("connection-pool");
        Config hikariConfig = poolConfig.getConfig("hikaricp");

        HikariDataSource props = new HikariDataSource();
        props.setJdbcUrl(poolConfig.getString("url"));
        props.setUsername(poolConfig.getString("user"));
        props.setPassword(poolConfig.getString("password"));
        props.setDriverClassName(poolConfig.getString("driver"));
        props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
        props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
        props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
        props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
        props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
        props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
        props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));
        return props;
    }

    public static <T> HikariDataSource create(ActorSystem<T> system) {
        return create(JdbcSettings.apply(system));
    }
}
```
The official documentation is actually quite vague here, which is why this functionality is so hard to get started with; the modules involved include, but are not limited to:
- The database JDBC driver
- Version differences between Pekko Actor and Pekko Projection
- Conversion between Pekko's classic (untyped) and typed Actor APIs
- Different drivers for different databases (MySQL/MariaDB/Postgres)
- Hibernate ORM entity modeling
- Pekko Streams data-flow processing
- The offset store provided by Pekko Projection
Each of these is a major community solution in its own right; any single one takes a long time to learn properly.
Other frameworks feel easy to use precisely because this plumbing is already extracted and wrapped underneath: modules auto-load once the configuration is written, whereas with Pekko you have to do the wrapping yourself.
There is another big problem: Provider implementations differ between versions, so material found online, and even the documentation, may simply be wrong, and the sample code may not run at all.
Following the official Pekko JDBC documentation, I never got the example running either. The documentation opens with:
```
The source of the envelopes can be events from Apache Pekko Persistence or any other SourceProvider with supported offset types.
```
That is, it also expects the persistence layer to drive things, and you must manually handle unwrapping messages from envelopes; the tutorial on that documentation page cannot be run as-is.
Troubleshooting
After digging through other resources I finally got a JDBC example running end to end; the code is as follows:
```java
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class PekkoJdbcSimpleExample {

    public static final class HikariUtils {
        public static HikariDataSource create(JdbcSettings settings) {
            Config jdbcConfig = settings.config();
            Config poolConfig = jdbcConfig.getConfig("connection-pool");
            Config hikariConfig = poolConfig.getConfig("hikaricp");

            HikariDataSource props = new HikariDataSource();
            props.setJdbcUrl(poolConfig.getString("url"));
            props.setUsername(poolConfig.getString("user"));
            props.setPassword(poolConfig.getString("password"));
            props.setDriverClassName(poolConfig.getString("driver"));
            props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
            props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
            props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
            props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
            props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
            props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
            props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));
            return props;
        }

        public static <T> HikariDataSource create(ActorSystem<T> system) {
            return create(JdbcSettings.apply(system));
        }
    }

    public static class PekkoJdbcSession implements JdbcSession, AutoCloseable {
        final Connection connection;

        public PekkoJdbcSession(final Connection connection) {
            this.connection = connection;
        }

        public PekkoJdbcSession(final Connection connection, boolean isExactlyOnce) throws SQLException {
            connection.setAutoCommit(!isExactlyOnce);
            this.connection = connection;
        }

        @Override
        public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
            return func.apply(connection);
        }

        @Override
        public void commit() throws SQLException {
            connection.commit();
        }

        @Override
        public void rollback() throws SQLException {
            connection.rollback();
        }

        @Override
        public void close() throws SQLException {
            connection.close();
        }
    }

    public static class User {
        private Long id;
        private String name;
        private Integer age;

        public User(Long id, String name, Integer age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "User{id=" + id + ", name='" + name + "', age=" + age + "}";
        }
    }

    private static void createTestTable(HikariDataSource ds) throws Exception {
        String sql = "CREATE TABLE IF NOT EXISTS user_test (id BIGINT PRIMARY KEY, name VARCHAR(50), age INT)";
        try (var conn = ds.getConnection(); var stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    private static void insertUser(HikariDataSource ds, User user) throws Exception {
        String sql = "INSERT INTO user_test (id, name, age) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?";
        try (PekkoJdbcSession session = new PekkoJdbcSession(ds.getConnection())) {
            session.withConnection(conn -> {
                try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                    stmt.setLong(1, user.id);
                    stmt.setString(2, user.name);
                    stmt.setInt(3, user.age);
                    stmt.setString(4, user.name);
                    stmt.setInt(5, user.age);
                    return stmt.executeUpdate();
                }
            });
            session.commit();
        }
    }

    private static List<User> listAllUsers(HikariDataSource ds) throws Exception {
        String sql = "SELECT id, name, age FROM user_test";
        List<User> users = new ArrayList<>();
        try (PekkoJdbcSession session = new PekkoJdbcSession(ds.getConnection())) {
            session.withConnection(conn -> {
                try (PreparedStatement stmt = conn.prepareStatement(sql);
                     ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        users.add(new User(
                            rs.getLong("id"),
                            rs.getString("name"),
                            rs.getInt("age")
                        ));
                    }
                    return null;
                }
            });
        }
        return users;
    }

    public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-database");
        org.apache.pekko.actor.ActorSystem classicSystem = system.classicSystem();

        HikariDataSource hikariDataSource = HikariUtils.create(system);
        createTestTable(hikariDataSource);
        insertUser(hikariDataSource, new User(1L, "MeteorGX", 18));
        insertUser(hikariDataSource, new User(2L, "MeteorCat", 30));

        List<User> users = listAllUsers(hikariDataSource);
        System.out.println("Queried users:");
        users.forEach(System.out::println);

        system.getWhenTerminated().thenAccept(ignore -> {
            hikariDataSource.close();
            System.out.println("Clean Actor Resource");
        });

        System.out.println("Press RETURN to stop...");
        int ignore = System.in.read();
        system.terminate();
    }
}
```
After startup this creates the user_test table in the database, then inserts the rows and queries them back, and you can see HikariPool's startup output in the logs.
Adding Hibernate support was likewise worked out from online resources and experimentation; here is the example I ended up with:
```java
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.persistence.*;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.projection.jdbc.JdbcSession;
import org.apache.pekko.projection.jdbc.internal.JdbcSettings;
import org.hibernate.Session;
import org.hibernate.cfg.HikariCPSettings;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class PekkoJdbcHibernateExample {

    public static final class HikariUtils {
        public static HikariDataSource create(JdbcSettings settings) {
            Config jdbcConfig = settings.config();
            Config poolConfig = jdbcConfig.getConfig("connection-pool");
            Config hikariConfig = poolConfig.getConfig("hikaricp");

            HikariDataSource props = new HikariDataSource();
            props.setJdbcUrl(poolConfig.getString("url"));
            props.setUsername(poolConfig.getString("user"));
            props.setPassword(poolConfig.getString("password"));
            props.setDriverClassName(poolConfig.getString("driver"));
            props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size"));
            props.setMinimumIdle(hikariConfig.getInt("minimum-idle"));
            props.setIdleTimeout(hikariConfig.getLong("idle-timeout"));
            props.setConnectionTimeout(hikariConfig.getLong("connection-timeout"));
            props.setMaxLifetime(hikariConfig.getLong("max-lifetime"));
            props.setValidationTimeout(hikariConfig.getLong("validation-timeout"));
            props.setConnectionTestQuery(hikariConfig.getString("connection-test-query"));
            return props;
        }

        public static <T> HikariDataSource create(ActorSystem<T> system) {
            return create(JdbcSettings.apply(system));
        }
    }

    public static class HibernateJdbcSession implements JdbcSession, AutoCloseable {
        public final EntityManager entityManager;
        private final EntityTransaction transaction;

        public HibernateJdbcSession(EntityManager entityManager) {
            this.entityManager = entityManager;
            this.transaction = this.entityManager.getTransaction();
            this.transaction.begin();
        }

        @Override
        public <Result> Result withConnection(Function<Connection, Result> func) {
            Session hibernateSession = entityManager.unwrap(Session.class);
            return hibernateSession.doReturningWork(
                connection -> {
                    try {
                        return func.apply(connection);
                    } catch (SQLException e) {
                        throw e;
                    } catch (Exception e) {
                        throw new SQLException(e);
                    }
                });
        }

        @Override
        public void commit() {
            if (transaction.isActive()) transaction.commit();
        }

        @Override
        public void rollback() {
            if (transaction.isActive()) transaction.rollback();
        }

        @Override
        public void close() {
            if (this.entityManager.isOpen()) this.entityManager.close();
        }
    }

    public static class HibernateJdbcSessionFactory implements AutoCloseable {
        private final EntityManagerFactory entityManagerFactory;

        public HibernateJdbcSessionFactory(String name) {
            this.entityManagerFactory = Persistence.createEntityManagerFactory(name);
        }

        public HibernateJdbcSessionFactory(String name, HikariDataSource hikariDataSource) {
            Map<String, Object> prop = new HashMap<>();
            prop.put(org.hibernate.cfg.JdbcSettings.CONNECTION_PROVIDER, "hikaricp");
            prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_DRIVER, hikariDataSource.getDriverClassName());
            prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_URL, hikariDataSource.getJdbcUrl());
            prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_USER, hikariDataSource.getUsername());
            prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_PASSWORD, hikariDataSource.getPassword());
            prop.put(org.hibernate.cfg.JdbcSettings.HIGHLIGHT_SQL, true);
            prop.put(org.hibernate.cfg.JdbcSettings.SHOW_SQL, true);
            prop.put(org.hibernate.cfg.JdbcSettings.FORMAT_SQL, true);
            prop.put(HikariCPSettings.HIKARI_MAX_SIZE, hikariDataSource.getMaximumPoolSize());
            prop.put(HikariCPSettings.HIKARI_MIN_IDLE_SIZE, hikariDataSource.getMinimumIdle());
            prop.put(HikariCPSettings.HIKARI_MAX_LIFETIME, hikariDataSource.getMaxLifetime());
            prop.put(HikariCPSettings.HIKARI_LEAK_TIMEOUT, hikariDataSource.getLeakDetectionThreshold());
            prop.put(HikariCPSettings.HIKARI_IDLE_TIMEOUT, hikariDataSource.getIdleTimeout());
            prop.put(HikariCPSettings.HIKARI_ACQUISITION_TIMEOUT, hikariDataSource.getConnectionTimeout());
            prop.put(HikariCPSettings.HIKARI_VALIDATION_TIMEOUT, hikariDataSource.getValidationTimeout());
            prop.put(HikariCPSettings.HIKARI_INITIALIZATION_TIMEOUT, hikariDataSource.getInitializationFailTimeout());
            prop.put(HikariCPSettings.HIKARI_KEEPALIVE_TIME, hikariDataSource.getKeepaliveTime());
            prop.put(HikariCPSettings.HIKARI_INITIAL_SQL, hikariDataSource.getConnectionInitSql());
            prop.put(HikariCPSettings.HIKARI_POOL_NAME, hikariDataSource.getPoolName());
            prop.put(HikariCPSettings.HIKARI_READ_ONLY, hikariDataSource.isReadOnly());
            prop.put(HikariCPSettings.HIKARI_ISOLATE_INTERNAL_QUERIES, hikariDataSource.isIsolateInternalQueries());
            // Drop null values before handing the properties to Hibernate
            prop = prop.entrySet()
                .stream()
                .filter(active -> Objects.nonNull(active.getValue()))
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    Map.Entry::getValue
                ));
            this.entityManagerFactory = new PersistenceConfiguration(name)
                .properties(prop)
                .managedClass(HibernateUser.class)
                .createEntityManagerFactory();
        }

        public HibernateJdbcSession newInstance() {
            return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
        }

        public void close() {
            if (entityManagerFactory.isOpen()) {
                entityManagerFactory.close();
            }
        }
    }

    @Entity
    @Table(name = "user_test")
    public static class HibernateUser {
        @Id
        private Long id;
        private String name;
        private Integer age;

        public HibernateUser() {
        }

        public HibernateUser(Long id, String name, Integer age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }
    }

    private static void createTestTable(HikariDataSource ds) throws Exception {
        String sql = "CREATE TABLE IF NOT EXISTS user_test (id BIGINT PRIMARY KEY, name VARCHAR(50), age INT)";
        try (var conn = ds.getConnection(); var stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-database");
        org.apache.pekko.actor.ActorSystem classicSystem = system.classicSystem();

        HikariDataSource hikariDataSource = PekkoJdbcSimpleExample.HikariUtils.create(system);
        HibernateJdbcSessionFactory hibernateFactory = new HibernateJdbcSessionFactory(
            "pekko-hibernate",
            hikariDataSource
        );
        createTestTable(hikariDataSource);

        try (HibernateJdbcSession session = hibernateFactory.newInstance()) {
            if (Objects.isNull(session.entityManager.find(HibernateUser.class, 3L))) {
                session.entityManager.persist(new HibernateUser(3L, "NewMeteorGX", 28));
            }
            if (Objects.isNull(session.entityManager.find(HibernateUser.class, 4L))) {
                session.entityManager.persist(new HibernateUser(4L, "NewMeteorCat", 56));
            }
            session.commit();
        }

        try (HibernateJdbcSession session = hibernateFactory.newInstance()) {
            HibernateUser user = session.entityManager.find(HibernateUser.class, 4L);
            System.out.println("User queried via Hibernate: " + user.getName());
        }

        system.getWhenTerminated().thenAccept(ignore -> {
            hibernateFactory.close();
            hikariDataSource.close();
            System.out.println("Clean Actor Resource");
        });

        System.out.println("Press RETURN to stop...");
        int ignore = System.in.read();
        system.terminate();
    }
}
```
This uses Hibernate for the database interaction. Note that classes annotated with @Entity have to be registered manually; automatic classpath scanning at startup would be a separate feature to implement.
Combining with Actors
Now we can combine JDBC with Actors: handle authentication, load the player's data from the database, then dynamically spawn a child session Actor for it and persist changes back to the database asynchronously:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 import com.typesafe.config.Config;import com.zaxxer.hikari.HikariDataSource;import jakarta.persistence.*;import org.apache.pekko.actor.Cancellable;import org.apache.pekko.actor.typed.*;import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;import org.apache.pekko.actor.typed.javadsl.ActorContext;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.actor.typed.javadsl.Receive;import org.apache.pekko.japi.function.Function;import org.apache.pekko.projection.jdbc.JdbcSession;import org.apache.pekko.projection.jdbc.internal.JdbcSettings;import org.hibernate.Session;import org.hibernate.cfg.HikariCPSettings;import org.slf4j.Logger;import java.sql.Connection;import java.sql.SQLException;import java.time.Duration;import java.util.*;import java.util.stream.Collectors;public class PekkoJdbcActorExample { public static final class HikariUtils { public static HikariDataSource create (JdbcSettings settings) { Config jdbcConfig = settings.config(); Config poolConfig = jdbcConfig.getConfig("connection-pool" ); Config hikariConfig = poolConfig.getConfig("hikaricp" ); HikariDataSource props = new HikariDataSource (); props.setJdbcUrl(poolConfig.getString("url" )); props.setUsername(poolConfig.getString("user" )); props.setPassword(poolConfig.getString("password" )); props.setDriverClassName(poolConfig.getString("driver" )); 
props.setMaximumPoolSize(hikariConfig.getInt("maximum-pool-size" )); props.setMinimumIdle(hikariConfig.getInt("minimum-idle" )); props.setIdleTimeout(hikariConfig.getLong("idle-timeout" )); props.setConnectionTimeout(hikariConfig.getLong("connection-timeout" )); props.setMaxLifetime(hikariConfig.getLong("max-lifetime" )); props.setValidationTimeout(hikariConfig.getLong("validation-timeout" )); props.setConnectionTestQuery(hikariConfig.getString("connection-test-query" )); return props; } public static <T> HikariDataSource create (ActorSystem<T> system) { return create(JdbcSettings.apply(system)); } } public static class HibernateJdbcSession implements JdbcSession , AutoCloseable { public final EntityManager entityManager; private final EntityTransaction transaction; public HibernateJdbcSession (EntityManager entityManager) { this .entityManager = entityManager; this .transaction = this .entityManager.getTransaction(); this .transaction.begin(); } @Override public <Result> Result withConnection (Function<Connection, Result> func) { Session hibernateSession = entityManager.unwrap(Session.class); return hibernateSession.doReturningWork( connection -> { try { return func.apply(connection); } catch (SQLException e) { throw e; } catch (Exception e) { throw new SQLException (e); } }); } @Override public void commit () { if (transaction.isActive()) transaction.commit(); } @Override public void rollback () { if (transaction.isActive()) transaction.rollback(); } @Override public void close () { if (this .entityManager.isOpen()) this .entityManager.close(); } } public static class HibernateJdbcSessionFactory implements AutoCloseable { private final EntityManagerFactory entityManagerFactory; public HibernateJdbcSessionFactory (final String name) { this .entityManagerFactory = Persistence.createEntityManagerFactory(name); } public HibernateJdbcSessionFactory (final String name, HikariDataSource hikariDataSource, List<Class<?>> clazz) { Map<String, Object> prop = new HashMap 
<>(); prop.put(org.hibernate.cfg.JdbcSettings.CONNECTION_PROVIDER, "hikaricp" ); prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_DRIVER, hikariDataSource.getDriverClassName()); prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_URL, hikariDataSource.getJdbcUrl()); prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_USER, hikariDataSource.getUsername()); prop.put(org.hibernate.cfg.JdbcSettings.JAKARTA_JDBC_PASSWORD, hikariDataSource.getPassword()); prop.put(org.hibernate.cfg.JdbcSettings.HIGHLIGHT_SQL, true ); prop.put(org.hibernate.cfg.JdbcSettings.SHOW_SQL, true ); prop.put(org.hibernate.cfg.JdbcSettings.FORMAT_SQL, true ); prop.put(HikariCPSettings.HIKARI_MAX_SIZE, hikariDataSource.getMaximumPoolSize()); prop.put(HikariCPSettings.HIKARI_MIN_IDLE_SIZE, hikariDataSource.getMinimumIdle()); prop.put(HikariCPSettings.HIKARI_MAX_LIFETIME, hikariDataSource.getMaxLifetime()); prop.put(HikariCPSettings.HIKARI_LEAK_TIMEOUT, hikariDataSource.getLeakDetectionThreshold()); prop.put(HikariCPSettings.HIKARI_IDLE_TIMEOUT, hikariDataSource.getIdleTimeout()); prop.put(HikariCPSettings.HIKARI_ACQUISITION_TIMEOUT, hikariDataSource.getConnectionTimeout()); prop.put(HikariCPSettings.HIKARI_VALIDATION_TIMEOUT, hikariDataSource.getValidationTimeout()); prop.put(HikariCPSettings.HIKARI_INITIALIZATION_TIMEOUT, hikariDataSource.getInitializationFailTimeout()); prop.put(HikariCPSettings.HIKARI_KEEPALIVE_TIME, hikariDataSource.getKeepaliveTime()); prop.put(HikariCPSettings.HIKARI_INITIAL_SQL, hikariDataSource.getConnectionInitSql()); prop.put(HikariCPSettings.HIKARI_POOL_NAME, hikariDataSource.getPoolName()); prop.put(HikariCPSettings.HIKARI_READ_ONLY, hikariDataSource.isReadOnly()); prop.put(HikariCPSettings.HIKARI_ISOLATE_INTERNAL_QUERIES, hikariDataSource.isIsolateInternalQueries()); prop = prop.entrySet() .stream() .filter(active -> Objects.nonNull(active.getValue())) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue )); PersistenceConfiguration configuration = 
                    new PersistenceConfiguration(name);
            configuration.properties(prop);
            clazz.forEach(configuration::managedClass);
            this.entityManagerFactory = configuration.createEntityManagerFactory();
        }

        public HibernateJdbcSession newInstance() {
            return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
        }

        public void close() {
            if (entityManagerFactory.isOpen()) {
                entityManagerFactory.close();
            }
        }
    }

    public static class Commands {
        public interface ICommand {
        }

        public record Join(long id, String token) implements ICommand {
        }

        public record Leave(long id) implements ICommand {
        }
    }

    public static class PekkoActorBehaviour extends AbstractBehavior<Commands.ICommand> {
        final Logger log = getContext().getLog();
        final HibernateJdbcSessionFactory factory;
        final Map<Long, ActorRef<Commands.ICommand>> actors;

        private PekkoActorBehaviour(ActorContext<Commands.ICommand> context,
                                    HibernateJdbcSessionFactory factory,
                                    Map<Long, ActorRef<Commands.ICommand>> actors) {
            super(context);
            this.factory = factory;
            this.actors = actors;
        }

        public static Behavior<Commands.ICommand> create(HibernateJdbcSessionFactory factory,
                                                         final Map<Long, ActorRef<Commands.ICommand>> actors) {
            return Behaviors.setup((ctx) -> new PekkoActorBehaviour(ctx, factory, actors));
        }

        @Override
        public Receive<Commands.ICommand> createReceive() {
            return newReceiveBuilder()
                    .onSignal(PostStop.class, this::onStop)
                    .onMessage(Commands.Join.class, this::onJoin)
                    .onMessage(Commands.Leave.class, this::onLeave)
                    .build();
        }

        private Behavior<Commands.ICommand> onStop(PostStop ignore) {
            log.warn("Actor stopped: {}", getContext().getSelf().path().toSerializationFormat());
            actors.forEach((id, actor) -> getContext().stop(actor));
            return Behaviors.stopped();
        }

        private Behavior<Commands.ICommand> onJoin(Commands.Join command) {
            try (HibernateJdbcSession session = factory.newInstance()) {
                final EntityManager manager = session.entityManager;
                HibernatePlayer player = manager.find(HibernatePlayer.class, command.id);
                if (Objects.isNull(player)) {
                    log.warn("Account does not exist: {}", command.id);
                    return this;
                }
                if (!player.token.equals(command.token)) {
                    log.warn("Invalid token: {}", command.id);
                    return this;
                }
                // demo shortcut: kick the old session and respawn; see the handover caveat below
                ActorRef<Commands.ICommand> online = actors.get(command.id);
                if (Objects.nonNull(online)) {
                    getContext().stop(online);
                    actors.remove(command.id);
                }
                ActorRef<Commands.ICommand> actor = getContext().spawn(
                        PekkoActorSession.create(player, factory),
                        "pekko-jdbc-actor-session-%d".formatted(command.id)
                );
                actors.put(command.id, actor);
            } catch (Exception e) {
                log.warn("Failed to open database session", e);
            }
            return this;
        }

        private Behavior<Commands.ICommand> onLeave(Commands.Leave command) {
            final ActorRef<Commands.ICommand> active = actors.get(command.id);
            if (Objects.isNull(active)) return Behaviors.same();
            // PostStop is a signal and cannot be delivered via tell();
            // stop the child instead, so its PostStop handler runs and flushes state
            getContext().stop(active);
            actors.remove(command.id);
            return this;
        }
    }

    public static class PekkoActorSession extends AbstractBehavior<Commands.ICommand> {
        final Logger log = getContext().getLog();
        final Scheduler scheduler = getContext().getSystem().scheduler();
        final HibernateJdbcSessionFactory factory;
        HibernatePlayer player;
        Cancellable cancellable;

        record ScheduleUpdatePlayer() implements Commands.ICommand {
        }

        private PekkoActorSession(ActorContext<Commands.ICommand> context, HibernatePlayer player, HibernateJdbcSessionFactory factory) {
            super(context);
            this.player = player;
            this.factory = factory;
            this.player.loginTime = System.currentTimeMillis();
            // flush the player state every 5 seconds
            cancellable = scheduler.scheduleAtFixedRate(
                    Duration.ofSeconds(5),
                    Duration.ofSeconds(5),
                    () -> getContext().getSelf().tell(new ScheduleUpdatePlayer()),
                    getContext().getExecutionContext()
            );
            log.info("Player authorized, accumulated online time: {}", player.onlineTime);
        }

        public static Behavior<Commands.ICommand> create(HibernatePlayer player, HibernateJdbcSessionFactory factory) {
            return Behaviors.setup((ctx) -> new PekkoActorSession(ctx, player, factory));
        }

        @Override
        public Receive<Commands.ICommand> createReceive() {
            return newReceiveBuilder()
                    .onSignal(PostStop.class, this::onStop)
                    .onMessage(ScheduleUpdatePlayer.class, this::onSave)
                    .build();
        }

        private Behavior<Commands.ICommand> onStop(PostStop ignore) {
            log.warn("Session stopped: {}", getContext().getSelf().path().toSerializationFormat());
            if (Objects.nonNull(cancellable)) cancellable.cancel();
            onSave(new ScheduleUpdatePlayer()); // final flush before the actor dies
            return Behaviors.stopped();
        }

        private Behavior<Commands.ICommand> onSave(ScheduleUpdatePlayer ignore) {
            try (HibernateJdbcSession session = factory.newInstance()) {
                long now = System.currentTimeMillis();
                long offset = now - player.loginTime;
                player.onlineTime = offset + player.onlineTime;
                player.loginTime = now; // advance the reference point, otherwise each save re-adds the whole elapsed span
                session.entityManager.merge(player);
                session.commit();
                log.info("Player state persisted: {}", player);
            } catch (Exception e) {
                log.warn("Failed to persist player state", e);
            }
            return this;
        }
    }

    @Entity
    @Table(name = "player")
    public static class HibernatePlayer {
        @Id
        private Long id;
        @Column(nullable = false)
        private String username;
        @Column(nullable = false)
        private String token;
        @Column(nullable = false, name = "login_time")
        private Long loginTime;
        @Column(nullable = false, name = "online_time")
        private Long onlineTime;

        public HibernatePlayer(Long id, String username, String token, Long loginTime, Long onlineTime) {
            this.id = id;
            this.username = username;
            this.token = token;
            this.loginTime = loginTime;
            this.onlineTime = onlineTime;
        }

        public HibernatePlayer() {
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getToken() { return token; }
        public void setToken(String token) { this.token = token; }
        public Long getLoginTime() { return loginTime; }
        public void setLoginTime(Long loginTime) { this.loginTime = loginTime; }
        public Long getOnlineTime() { return onlineTime; }
        public void setOnlineTime(Long onlineTime) { this.onlineTime = onlineTime; }
    }

    private static void createPlayerTable(HikariDataSource ds) throws Exception {
        String sql = "CREATE TABLE IF NOT EXISTS player (" +
                "id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL, " +
                "username VARCHAR(64) NOT NULL, " +
                "token VARCHAR(64) NOT NULL, " +
                "login_time BIGINT NOT NULL, " +
                "online_time BIGINT NOT NULL" +
                ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
        try (var conn = ds.getConnection(); var stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    public static void main(String[] args) throws Exception {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-jdbc-actor");
        final Scheduler scheduler = system.scheduler();
        try (HikariDataSource hikariDataSource = HikariUtils.create(system)) {
            createPlayerTable(hikariDataSource);
            HibernateJdbcSessionFactory hibernate = new HibernateJdbcSessionFactory(
                    "pekko-jdbc-actor-hibernate",
                    hikariDataSource,
                    List.of(HibernatePlayer.class)
            );
            // seed a test account if absent
            try (HibernateJdbcSession session = hibernate.newInstance()) {
                if (Objects.isNull(session.entityManager.find(HibernatePlayer.class, 10001L))) {
                    session.entityManager.persist(new HibernatePlayer(10001L, "MeteorCat", "empty-token", System.currentTimeMillis(), 0L));
                    session.entityManager.flush();
                    session.commit();
                }
            }
            final ActorRef<Commands.ICommand> behaviour = system.systemActorOf(
                    PekkoActorBehaviour.create(hibernate, new HashMap<>()),
                    "pekko-jdbc-actor-behaviour",
                    Props.empty()
            );
            // this join must fail: no such account
            scheduler.scheduleOnce(Duration.ofSeconds(3),
                    () -> behaviour.tell(new Commands.Join(1000L, "ignore-token")),
                    system.executionContext());
            // this join succeeds with the seeded token
            scheduler.scheduleOnce(Duration.ofSeconds(5),
                    () -> behaviour.tell(new Commands.Join(10001L, "empty-token")),
                    system.executionContext());
            system.getWhenTerminated().thenAccept(done -> hibernate.close());
            System.out.println("Press RETURN to stop...");
            int ignore = System.in.read();
            system.terminate();
            // keep the pool open until all actors have stopped and flushed their final state
            system.getWhenTerminated().toCompletableFuture().join();
        }
    }
}
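One subtlety worth calling out in the periodic save: onSave adds now - player.loginTime to onlineTime, so unless loginTime is advanced to now after every flush, each tick re-adds the entire span since login and the counter inflates. A minimal plain-Java sketch of the two accounting schemes (the class and method names here are illustrative, not part of the example above):

```java
public class OnlineTimeAccounting {
    // correct scheme: add only the delta since the previous save, then advance the reference point
    static long accumulate(long loginTime, long[] saveTimestamps) {
        long online = 0;
        long last = loginTime;
        for (long now : saveTimestamps) {
            online += now - last;
            last = now; // the crucial step
        }
        return online;
    }

    // naive scheme: never advances loginTime, so each save re-adds the full elapsed span
    static long accumulateBuggy(long loginTime, long[] saveTimestamps) {
        long online = 0;
        for (long now : saveTimestamps) {
            online += now - loginTime;
        }
        return online;
    }

    public static void main(String[] args) {
        long[] ticks = {5000, 10000, 15000}; // saves at 5 s, 10 s, 15 s after login at t = 0
        System.out.println(accumulate(0, ticks));      // 15000
        System.out.println(accumulateBuggy(0, ticks)); // 30000
    }
}
```

With saves at 5, 10 and 15 seconds after login, the advancing scheme reports 15 seconds of online time while the naive one reports 30 — the error compounds every tick.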
This is admittedly a rough design: it leans on the Pekko ecosystem to implement a simple authorization check followed by asynchronous persistence of player state to the database. Run the code and observe the execution flow.
Production-grade code requires considerably more refinement, but the overall flow is the same as above; the main difference is abstracting the low-level pieces into dedicated, easier-to-use utilities.
Much of the example above is not covered by the official documentation, so piecing it together took a fair amount of digging — and the pitfalls described so far are only the beginning.
Note: the "logout handling" in the example only sketches the idea. In a proper login flow, the new Socket is handed over to the existing Actor and the old Socket is disconnected; you should not simply tear down and recreate the Actor.
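The handover idea can be sketched without any Pekko machinery. Everything below (Session, attach, login) is a hypothetical illustration of the bookkeeping, not Pekko or socket API: on re-login the session object survives and only its transport reference is swapped.

```java
import java.util.HashMap;
import java.util.Map;

public class HandoverSketch {
    public static class Session {
        public String socketId; // stand-in for the real network connection
        public int handovers = 0; // in-memory state that a stop-and-respawn would lose

        Session(String socketId) {
            this.socketId = socketId;
        }

        void attach(String newSocketId) {
            // real code would close the old socket here before replacing it
            this.socketId = newSocketId;
            this.handovers++;
        }
    }

    static final Map<Long, Session> sessions = new HashMap<>();

    // login keeps the live session and only swaps the socket
    public static Session login(long playerId, String socketId) {
        Session s = sessions.get(playerId);
        if (s == null) {
            s = new Session(socketId);
            sessions.put(playerId, s);
        } else {
            s.attach(socketId); // handover: no restart, no lost state
        }
        return s;
    }

    public static void main(String[] args) {
        Session a = login(1L, "sock-1");
        Session b = login(1L, "sock-2"); // re-login from a new connection
        System.out.println(a == b);     // true: the same session survives
        System.out.println(b.socketId); // sock-2
    }
}
```

In the Actor version, attach would instead be a message handled by the session Actor: close the old Socket, store the new one, and let timers and in-memory state keep running untouched.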
That said, since Pekko positions itself as a "toolkit" rather than a full-fledged "framework", this is an acceptable trade-off: it leaves room to tailor and trim the functionality by hand.
One more caveat: the official documentation's Stream integration is very involved. Do not try to follow the official Stream docs as a starting point; the concepts there will only add to the confusion.
The official docs even use Envelope delivery combined with Streams as their showcase example, without spelling out the required dependencies or the concrete implementation classes, so it is easy to get more lost the further you read.