Pekko 异步运行时

Pekko 异步运行时
MeteorCat早期 Java 的并没有统一的异步运行时, 所以很多都需要去自行实现处理, 而 Pekko 则是自行设计通用的异步运行时.
Pekko 异步运行时是由 Actor 模型 + 调度器 + 流处理 + 未来式(Future) 构成的完整异步运行体系, 包含有以下组件:
-
Scheduler/Dispatcher: 任务定时器/调度器, 底层执行核心, 负责线程分配和任务调度, 隔离不同类型的任务 -
Future/CompletionStage: 异步任务结果包装, 处理异步任务的返回值,避免同步阻塞等待 -
Stream: 支持运行时的数据流处理, 解决批量数据的异步生产 - 消费问题, 内置背压避免流量失控 -
Actor: 最上层的任务载体, 用 “消息队列 + 单线程执行” 的模式将并发问题收敛到"消息顺序"而非"锁"
这些组件都是能够独立使用
在 Java8 之后可以依靠 Runnable/Callable/ThreadPool 实现简单的异步运行时, 但是对比 Pekko 纯正 Actor 设计差距很大:
| 特性 | Pekko 异步运行时 | Java 原生异步(Java 8+) |
|---|---|---|
| 核心模型 | Actor + 消息驱动 + 流处理 | CompletableFuture + 线程池 + NIO |
| 分布式支持 | 天然支持(Pekko Cluster) | 无原生支持,需手动实现 |
| 背压机制 | 内置(Stream 模块) | 无,需手动实现(如 BlockingQueue 阻塞) |
| 线程安全 | 单 Actor 单线程,无需锁 | 需手动加锁/使用并发容器 |
| 异常隔离 | Actor 故障域隔离,单个 Actor 异常不扩散 | 线程池任务异常可能导致线程终止 |
| 编程复杂度 | 较高(需理解 Actor/流) | 较低,但复杂场景(如分布式)需大量封装 |
手写异步任务对于锁的处理一定要小心, 否则死锁阻塞任务的问题会很频繁
这里需要说明 Pekko 异步运行时相关的函数方法, 这些都是需要被调度器来唤醒执行:
-
AskPattern.ask(): Actor 异步交互的核心方法,替代同步调用 + 锁, 需指定超时时间避免无限等待 -
thenApply()/thenAccept()/exceptionally(): 异步结果的转换、消费、异常处理,全程非阻塞 -
CompletableFuture.allOf()/anyOf():多 Future 并行组合, 适合运行批量异步任务
需要去参阅官方文档说明:
-
CompletableFuture: Java 内部的已经内置任务处理结果, 位于 java/util/concurrent/CompletableFuture.java 内置工具库之中
-
Patterns: https://nightlies.apache.org/pekko/docs/pekko/1.4/japi/org/apache/pekko/pattern/Patterns.html
Patterns 针对经典 Actor 设计, Typed Actor 需用 AskPattern, 这是个很重要的接口, 涵盖以下功能和场景:
| 方法 | 核心作用 | 典型场景 |
|---|---|---|
ask |
向 Actor 异步发请求并获取结果 | 查询数据、调用需要返回值的 Actor 方法 |
pipe |
将 Future 结果自动转发给 Actor | 异步任务结果投递到其他 Actor 处理 |
after |
延迟执行异步操作 | 超时兜底、定时触发 |
retry |
自动重试失败的异步操作 | 不稳定服务的重试(网络/数据库故障) |
要驱动 Pekko Actor 去执行任务则是依赖 Scheduler 来进行调度:
1 | /** |
定时调度器一般提供给类似游戏服务端挂载定时调用功能, 比如 角色|怪物每秒移动 / 金币|钻石等资源的同步增长 / 每日跨天更新通知
而 Patterns 不需要直接延迟调度, 只是需要做类似于 请求 - 响应 这种强状态相关的调度, 类似下面:
1 | /** |
这种方式是最常用的, 涉及到强关联的数据就必须要做到 要么直接成功获取结果, 要么直接失败抛出外层异常, 绝对不要有任何中间意外因素.
需要注意: Patterns 会占用 Promise 等资源等待响应, 所以才需要超时处理功能, 应该避免使用 Patterns 防止 Actor 处理缓慢
如果没有数据强制上下文一致性的情况, 将 Patterns 改为自定义回调消息然后拦截更好点, 而内置的 CompletionStage 功能基本如下:
1. 单阶段任务后续操作(触发条件:当前阶段正常完成)
这类方法以 then 为前缀,用于当前异步阶段完成后执行下一个操作,又细分为三类:
| 方法前缀 | 核心逻辑 | 代表方法 | 作用 |
|---|---|---|---|
| thenApply | 转换结果(有入参有返回) | thenApply/thenApplyAsync | 接收当前阶段结果,通过 Function 转换为新结果,返回新的 CompletionStage |
| thenAccept | 消费结果(有入参无返回) | thenAccept/thenAcceptAsync | 接收当前阶段结果,通过 Consumer 消费(如打印、存储),返回 Void 类型阶段 |
| thenRun | 执行动作(无入参无返回) | thenRun/thenRunAsync | 不接收结果,仅执行 Runnable 动作,返回 Void 类型阶段 |
异步变体: 每个方法都有 Async 后缀版本(默认线程池)和 Async(Executor) 版本(自定义线程池),用于指定异步执行方式
2. 双阶段任务并联操作(触发条件:两个阶段都完成)
这类方法用于等待当前阶段 + 另一个阶段都正常完成后执行操作:
| 方法前缀 | 核心逻辑 | 代表方法 | 作用 |
|---|---|---|---|
| thenCombine | 合并结果(有入参有返回) | thenCombine/thenCombineAsync | 接收两个阶段的结果,通过 BiFunction 合并为新结果 |
| thenAcceptBoth | 消费双结果(有入参无返回) | thenAcceptBoth/thenAcceptBothAsync | 接收两个阶段的结果,通过 BiConsumer 消费 |
| runAfterBoth | 执行动作(无入参无返回) | runAfterBoth/runAfterBothAsync | 两个阶段都完成后,执行 Runnable 动作 |
3. 双阶段任务选路操作(触发条件:任一阶段完成)
这类方法用于等待当前阶段 或 另一个阶段任一正常完成后执行操作:
| 方法前缀 | 核心逻辑 | 代表方法 | 作用 |
|---|---|---|---|
| applyToEither | 转换结果(有入参有返回) | applyToEither/applyToEitherAsync | 接收先完成阶段的结果,通过 Function 转换为新结果 |
| acceptEither | 消费结果(有入参无返回) | acceptEither/acceptEitherAsync | 接收先完成阶段的结果,通过 Consumer 消费 |
| runAfterEither | 执行动作(无入参无返回) | runAfterEither/runAfterEitherAsync | 任一阶段完成后,执行 Runnable 动作 |
4. 阶段嵌套(组合多个异步流程)
| 方法名 | 核心逻辑 | 作用 |
|---|---|---|
| thenCompose | 接收当前阶段结果,返回一个新的 CompletionStage(而非直接返回结果) | 解决 thenApply 嵌套 CompletionStage 导致的「嵌套异步」问题,实现异步流程的扁平化串联 |
| thenComposeAsync | thenCompose 的异步版本 | 同上,异步执行 Function(返回 CompletionStage 的函数) |
5. 异常处理与全量结果处理
这类方法不区分阶段是否正常完成, 专门处理异常或全量结果:
| 方法名 | 核心逻辑 | 作用 |
|---|---|---|
| handle | 接收「结果 + 异常」(任一为 null),通过 BiFunction 处理并返回新结果 | 统一处理正常/异常场景,可转换异常为正常结果 |
| whenComplete | 接收「结果 + 异常」,通过 BiConsumer 消费(不改变结果) | 仅监听完成状态(正常/异常),不修改原结果,适合日志、监控等场景 |
| exceptionally | 仅当阶段异常完成时执行,通过 Function 将异常转换为正常结果 | 类似 try-catch 中的 catch 块,专门处理异常 |
| exceptionallyCompose | 异常时返回新的 CompletionStage(异步版异常处理) | Java 12 新增,支持异常场景下的异步流程编排 |
6. 工具方法
| 方法名 | 核心逻辑 | 作用 |
|---|---|---|
| toCompletableFuture | 转换为 CompletableFuture |
实现不同 CompletionStage 实现类的互通,对接 CompletableFuture 的扩展功能 |
日常只需要按照数据处理流程来选择不同的异步任务处理, 而常规的函数方法任务也可以直接包装成异步任务让渡给系统调用:
1 | /** |
自行采用 ExecutionContextExecutor 来创建调度任务会打乱 Pekko 内部的线程调度引发线程安全的问题, 所以不推荐采用这样直接调度处理.
如果需采取这种任务调度方法, 需要单独创建线程池加上消息管理传递, 而非使用采用 Actor 任务线程池, 具体使用如下:
1 | /** |
默认 Actor 是基于单线程来运行逻辑, 通过 Executors + Patterns.pipe 可以让其拥有更强的线程任务调度调度能力.
这种就是异步流程就是官方推荐的 “结果管道化(CompletableFuture Pipe)”
比如计算比较复杂且耗时的任务可以利用 allOf/anyOf 批量提交给指定线程池运行, 之后通过 Patterns.pipe 投递消息结果即可.



