Pekko 启用 HTTP 和 WebSocket

Pekko 作为 Actor 底层框架, 其实内部自带了 Http/Https/WebSocket 扩展, 涵盖日常所需要的短连接和长连接场景.

这部分涉及到以下相关内容:

这里为什么推荐采用 WebSocket 做长连接服务?

  • 主流支持非常广泛(现代代浏览器、iOS、Android、物联网设备)

  • 抛弃以往的粘包/拆包、心跳检测、消息校验问题, 不用将心力放在数据流的验证上

  • CDN 厂家现在逐步有 WebSocket 的支持, 可以利用 CDN 的边缘节点就近接入客户端

当然也可以外置 WebSocket 库挂载服务, 只是采用 Actor 集成更加高效方便, 无缝贴合 Actor 处理

另外需要说明的 WebSocket 支持 text(文本流) 和 binary(二进制流) 传输模式, 这里强烈建议采用二进制流传输.

如果采用 text 文本流, 大部分数据库解析的时候都会做默认的 UTF8 编码转化成 String, 之后内部又会转换成 byte[] 交换数据.

高并发的情况下很容易直接生成大量的无用的内存垃圾(GC新生代), 直接将 CPU 效率浪费在编码/解码/转化字符串之中.

所以建议直接默认采用 binary(二进制) 来传输, 不要用到 text 的任何相关处理, 并且数据交换则是采用 protobuf 来传输.

HTTP 构建

因为 WebSocket 是基于 HTTP, 所以必须先学习怎么生成 HTTP 服务并分析, 才能为后面的 WebSocket 打好基础.

这里直接官方参考来引入即可, 可以先不引入 protobuf 做数据传输(我这边采用 Maven 做包管理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<!-- 其他略 -->
<properties>
<pekko.version>1.1.5</pekko.version>
<pekko.bom.version>1.3.0</pekko.bom.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http-bom_${scala.binary.version}</artifactId>
<version>${pekko.bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http_${scala.binary.version}</artifactId>
</dependency>
</dependencies>

Pekko 内部已经将 HTTP 模块自动集成到 Actor 管理了, 所以我们可以直接简单构建服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;

import java.util.concurrent.CompletionStage;

/**
* 构建 PekkoHTTP 服务
*/
public class PekkoHttpExample extends AllDirectives {


/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {

// 创建初始化默认的 ActorSystem Boot
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

// 生成 HTTP 功能服务
final Http http = Http.get(system);

// 生成管理的内部类对象实例, 这个类必须继承 AllDirectives 类
PekkoHttpExample app = new PekkoHttpExample();

// 将 HTTP 服务和内部类做好映射, 并且设置监听的地址和端口
final CompletionStage<ServerBinding> binding =
http.newServerAt("localhost", 18880).bind(app.createRoute());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();

// 退出之后回收这部分 HTTP 资源并且直接中断 Actor 系统服务
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done

}

/**
* 构建请求路径的路由
*/
private Route createRoute() {
// 添加 GET /hello 访问路径, 映射到内部函数调用输出
return concat(path("hello", () -> get(() -> complete("Hello.World"))));
}
}

这样处理之后可以直接访问 curl http://localhost:18880/hello 即可显示具体的网页内容.

另外 pekko 内部是支持 jackson 相关的 JSON 序列化的, 直接引入即可:

1
2
3
4
5
6
7
<!-- http-json 支持 -->
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-http-jackson_${scala.binary.version}</artifactId>
</dependency>
</dependencies>

这种方式引入之后就可以支持默认结构体转 JSON 输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// 具体代码参照: https://pekko.apache.org/docs/pekko-http/current/introduction.html

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;

import java.util.concurrent.CompletionStage;

/**
* PekkoHttp JSON 响应
*/
public class PekkoJsonExample extends AllDirectives {
/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {

// 创建初始化默认的 ActorSystem Boot
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

// 生成 HTTP 功能服务
final Http http = Http.get(system);

// 生成管理的内部类对象实例, 这个类必须继承 AllDirectives 类
PekkoJsonExample app = new PekkoJsonExample();

// 将 HTTP 服务和内部类做好映射, 并且设置监听的地址和端口
final CompletionStage<ServerBinding> binding =
http.newServerAt("localhost", 18880).bind(app.createRoute());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();

// 退出之后回收这部分 HTTP 资源并且直接中断 Actor 系统服务
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done

}

/**
* 构建请求路径的路由
*/
private Route createRoute() {
// 添加 GET /hello 访问路径, 映射到内部函数调用输出
return concat(
get(() -> pathPrefix("hello", () -> complete("Hello.World"))),
get(() -> pathPrefix("item", () -> {
Item item = new Item("MeteorCat", 2026L);
return completeOK(item, Jackson.marshaller());
}))
);
}

/**
* 采用 record 结构, 其实和 static class 差不多
*
* @param name
* @param id
*/
private record Item(String name, long id) {

@JsonCreator
private Item(@JsonProperty("name") String name, @JsonProperty("id") long id) {
this.name = name;
this.id = id;
}
}
}

看官方文档更详细所以就不太讲述太多细节, 主要用来做 JSON-API 网关的时候可能用得比较多, 后续需要用到的时候学习下就行了.

需要注意的是 pekko-http 不会为默认每个会话创建 actor 对象, 所以如果长链接需要再动态构建 actor 做消息队列监听

当然其实还有 Http Client 相关 API, 这部分基本用于做远程数据转发或者传输, 使用也是结合 Actor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.server.examples.petstore.Pet;
import org.apache.pekko.stream.SystemMaterializer;

import java.util.concurrent.CompletionStage;

public class ClientSingleRequestExample {

public static void main(String[] args) {
final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SingleRequest");

// Actor 如果内部需要做 HTTP 请求转发可以直接这样使用
// 这里 CompletionStage 是内部的异步运行时, 会将请求数据结果的 HttpResponse 做回调
final CompletionStage<HttpResponse> responseFuture =
Http.get(system).singleRequest(HttpRequest.create("https://pekko.apache.org"));
}

/**
* 构建请求路径的路由, 如果需要内部请求动态返回可以参考下面处理 /bing 的路由处理
*/
private Route createRoute(ActorSystem<Void> system) {
// 添加 GET /hello 访问路径, 映射到内部函数调用输出
return concat(
get(() -> pathPrefix("hello", () -> complete("Hello.World"))),
get(() -> pathPrefix("item", () -> {
Item item = new Item("MeteorCat", 2026L);
return completeOK(item, Jackson.marshaller());
})),
get(() -> pathPrefix("bing", () -> {
// 利用异步包装返回对应请求
// 动态加载远程请求网址信息信息
final CompletionStage<HttpResponse> responseFuture = Http.get(system).singleRequest(HttpRequest.create("https://bing.com"));
return onComplete(responseFuture, resp -> resp.fold(
ex -> complete(StatusCodes.BAD_GATEWAY, "upstream error"), // 如果检测到异常直接返回 BAD_GATEWAY 错误
this::complete // 没有异常就将对应数据返回给客户端
));
}))
);
}
}

至此基本上就已经大概清楚 Pekko 的 HTTP 驱动方式, 后面就是更加高级的 WebSocket 数据交互.

WebSocket 构建

官方文档: https://pekko.apache.org/docs/pekko-http/current/server-side/websocket-support.html

WebSocket 基于 Http, 只需要声明握手升级(Upgrade Handshake)就可以直接使用, 直接声明路由即可:

1
2
3
// 只要返回 handleWebSocketMessages, Pekko-HTTP 会自动回 101 Switching Protocols
Route wsRoute = path("ws", () -> handleWebSocketMessages(flow));
// WebSocket 服务构建方式, flow 其实是 Flow, 后面会说明

其中内部支持以下消息类型:

帧类型 官方类 用途
文本 TextMessage 浏览器默认,UTF-8 解码成 String
二进制 BinaryMessage 推荐,直接 ByteString
控制 PingMessage / PongMessage / CloseMessage 框架自动回 pong,一般不用管

这里提到了 Flow 就必须了解数据请求的 背压(), 也就是 Source/Sink/Flow 是什么? 同时还涉及到数据的 Stream 传输?

Stream 就是构成 Source/Sink/Flow 的基础单元, 把用户发送过来的数据视为 数据流(Data Stream), 这三者的作用如下:

构件 类比 背压角色 WebSocket 里具体表现
Source 水龙头 生产端 把业务对象变成 Message 推向客户端
Sink 下水道 消费端 把客户端发来的 Message 吸走并处理
Flow 过滤器 中转站 收到 Message → 业务逻辑 → 发出 Message(echo、广播、加解密)

背压(Backpressure) 则是内部为了保护消息正确投递的策略:

  • 当客户端消费速度 < 服务端生产速度时(比如客户端突然网络波动卡顿), 就需要让对应生产数据端暂时挂起等待, 避免消息队列内存爆满

  • 如果客户端消费数据流内部的消息过慢, 发送端将会接收到 onBackpressure 代表需要防止内存爆满来主动丢弃老消息填充的策略

WebSocket 里, Source 是发帧, Sink 是收帧, Flow 是改帧; 三者拼成管道, 背压自动从下游往上游拉

比较简单的日常形容, 可以把这个数据交换过程形容为 “自来水系统”:

  1. Source(水龙头)
    服务器想给客户端送水(数据),拧开水龙头就能哗哗流出 Message 帧。

  2. Flow(净水器)
    水流经过滤芯(业务逻辑),比如 echo 就把水再染个色,然后继续往下送。

  3. Sink(下水口)
    客户端那边有个下水口,水喝得有多快,就往下漏多快;喝得慢,下水口立刻“喊停”。

  4. 背压(阀门)
    下水口一喊停,水龙头里的阀门自动关小,水不再涌出,水池(内存)永远满不了
    如果水池太小,可以加装“溢流管”——.buffer(50, dropHead),老水直接倒掉,新水继续流。

  5. 结果
    无论客户端网络卡成什么样,服务器这边既不会淹水(OOM),也不会爆管(阻塞线程),整个厨房干干净净。

现在的 WebSocket 服务不再是传统 请求-响应 单向模型, 而是需要以 数据流(Stream) 方式来理解.

那么需要做的就是构建 Sink 和 Source, 最后拼合成 Flow 对客户端数据加工处理返回:

  • Source: 服务端 → 客户端, 服务端主动推送消息用到

  • Sink: 客户端 → 服务端, 客户端投递消息时候会用到

  • Flow: 将 Sink 和 Source 两者拼合在一起从而实现类似于数据传输管道

最后的代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 打开水龙头: 服务端 → 客户端
Source<Message, NotUsed> faucet = Source
.queue(50, OverflowStrategy.dropHead()) // 业务线程往里倒消息, 如果触发背压信号就会默认抛弃50条最老数据
.mapMaterializedValue(queueRef -> { /* 把 queueRef 存起来别处用 */ });

// 2. 再接下水口: 客户端 → 服务端
Sink<Message, NotUsed> drain = Sink
.foreach(msg -> handle(msg)); // 客户端来的帧在这里处理

// 3. 中间装净水器
Flow<Message, Message, NotUsed> filter =
Flow.of(Message.class)
.filter(Message::isBinary)
.map(this::decrypt) // 你的业务
.map(this::encrypt);

// 4. 把三口拼成一根水管
Flow<Message, Message, NotUsed> pipeline =
Flow.fromSinkAndSource(drain, faucet).via(filter);

// 最后就是设置 WebSocket 的消息路由功能
// return path("ws", () -> handleWebSocketMessages(pipeline));

这里以 echo 数据请求为例, 来做个 WebSocket 回显服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.ws.BinaryMessage;
import org.apache.pekko.http.javadsl.model.ws.Message;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.util.ByteString;

import java.util.concurrent.CompletionStage;

/**
* Pekko WebSocket 服务挂载
*/
public class PekkoWebSocketExample extends AllDirectives {

/**
* Actor 系统
*/
final ActorSystem<Void> system;

/**
* 服务端推送客户端消息队列
*/
private SourceQueueWithComplete<Message> queueRef;

/**
* 构造方法
*/
public PekkoWebSocketExample(ActorSystem<Void> system) {
this.system = system;
}

/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {

// 创建初始化默认的 ActorSystem Boot
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

// 生成 HTTP 功能服务
final Http http = Http.get(system);

// 生成管理的内部类对象实例, 这个类必须继承 AllDirectives 类
PekkoWebSocketExample app = new PekkoWebSocketExample(system);

// 将 HTTP 服务和内部类做好映射, 并且设置监听的地址和端口
final CompletionStage<ServerBinding> binding =
http.newServerAt("localhost", 18880).bind(app.createRoute());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();

// 退出之后回收这部分 HTTP 资源并且直接中断 Actor 系统服务
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done
}

/**
* 构建请求路径的路由
*/
private Route createRoute() {
// 添加 GET /hello 访问路径, 映射到内部函数调用输出
return concat(
get(() -> pathPrefix("hello", () -> complete("Hello.World"))),
get(() -> pathPrefix("echo", () -> handleWebSocketMessages(createEchoFlow())))
);
}

/**
* 创建 echo(回显) WebSocket 消息管道
*/
private Flow<Message, Message, NotUsed> createEchoFlow() {
// 1. 水龙头:服务端→客户端
Source<Message, NotUsed> faucet =
Source.<Message>queue(50, OverflowStrategy.dropHead())
.mapMaterializedValue(queueRef -> {
// 把 queueRef 存起来,别处可以主动推消息
this.queueRef = queueRef;
return NotUsed.getInstance();
});


// 2. 下水口:客户端→服务端
Sink<Message, NotUsed> drain =
Flow.<Message>create()
.filter(BinaryMessage.class::isInstance) // 验证并且拦截二进制消息
.map(BinaryMessage.class::cast) // 转化成二进制消息内容
.flatMapConcat(BinaryMessage::getStreamedData) // Source<ByteString, _>
.to(Sink.foreach(chunk -> queueRef.offer(BinaryMessage.create(chunk)))); // 消费消息队列

// 3. 拼成双向管
return Flow.fromSinkAndSource(drain, faucet);
}
}

这里可以采用更加细致化的处理, 比如计算推送给客户端的掉帧计数器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 更换成大小帧判断
// 这里要定义个 droppedFrames 原子量, 用于当作丢帧计数器
// 在功能类定义 private final AtomicLong droppedFrames = new AtomicLong(); 即可
Sink<Message, NotUsed> drain =
Flow.<Message>create()
.filter(BinaryMessage.class::isInstance) // 验证并且拦截二进制消息
.map(BinaryMessage.class::cast) // 转化成二进制消息内容
.flatMapConcat(BinaryMessage::getStreamedData)
.map(BinaryMessage::create) // 打包成二进制消息结构
.to(Sink.foreach(chunk -> queueRef.offer(chunk).whenComplete((r, e) -> {
// 计算消息抛弃的帧数, 超过一定数值就可以做报警
if (r != QueueOfferResult.enqueued()) {
long drops = droppedFrames.incrementAndGet();
// 每 1000 次写入日志或者报警
if (drops % 1000 == 0) {
System.err.printf("WebSocket dropped {%d} frames%n", drops);
}
}
})
));

不过一般数据都是经可能规避大数据包, 都会严格控制包体大小, 最大消息结构最好控制 65536 字节(64K)左右, pekko 配置可以锁死:

1
2
3
4
5
6
7
8
9
10
11
12
pekko.http.websocket {
max-message-size = 65536 # 字节,超过立即断开连接

# 另外还有心跳设置
# 需要注意, 默认 Pekko 每 30s 自动发 Ping(空 payload), 客户端必须回 Pong(注意: 是必须), 否则主动断开
# 而如果采用 pong 模式则是由服务端主动发 Pong, 客户端收到后无需回复
periodic-keep-alive-mode = pong # 让服务端主动发起 Pong
periodic-keep-alive-max-idle = 30s # 每次间隔 30s

# 如果打算自己做应用层心跳, 则是按照以下方式关闭
# pekko.http.server.websocket.periodic-keep-alive-max-idle = infinite
}

大数据包传输在高并发的过程当中是必须要避免的, 宁愿将业务大数据拆分成多个小包多次传输也不要构建成一个大包直接传输.

单帧的数据过大会导致 TCP 消息挤占缓冲区, 从而影响消息队列入列导致消息被阻塞, 所以一定要严格把控消息单帧大小.

其实构建 Flow 的过程可以视为设计消息管道, 而运行之后就会当作流水线处理客户端和服务端的数据交互

这里请求 ws://localhost:18880/echo 并且发送二进制消息就可以测试, 具体可以采用客户端来做

流处理

官方文档: https://pekko.apache.org/docs/pekko/current/stream/stream-io.html#working-with-streaming-io

上面虽然简略讲述 Source(输出)/Sink(输入)/Flow(流处理) 的使用方法, 但底层很多东西都没说明, 需要边看官方文档来处理.

上面文档官方提供简单的 TCP 服务器数据流以 Framing.delimiter(后面会详细说明) 分隔符切分消息的交换方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
connections.runForeach(
connection -> {
System.out.println("New connection from: " + connection.remoteAddress());

final Flow<ByteString, ByteString, NotUsed> echo =
Flow.of(ByteString.class)
.via(
Framing.delimiter(
ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW))
.map(ByteString::utf8String)
.map(s -> s + "!!!\n")
.map(ByteString::fromString);

connection.handleWith(echo, system);
},
system);

这里的数据流运行方式如下图所示:

Stream

可以看到关键的 Flow 就是将数据设置成具体的 “消息管道” 来调整输入和输出; 这种模式不止应用于网络流, 所有文件流/二进制流等都支持.

所以记住: 如果要构建消息流则必须要有 输入源(Sink)/输出源(Source)/管道处理(Flow) 这三者

这里需要学习下 Source/Sink/Flow 具体的实现思想, 这三者其实就是 pekko 底层的异步驱动方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- Source<Out>: 代表仅有生成者模式, 即只输出数据而不去接收数据
- Sink<In>: 代码仅有消费者模式, 即只接收数据而不做生成数据需求
- Flow<In,Out>: 中间的传输层, 负责协调资源的输入与输出

目前已经知道前面的所需参数, 但是看到声明出现附加参数

- Source<Message, NotUsed>
- Sink<Message, NotUsed>
- Flow<Message, Message, NotUsed>

这里面的 NotUsed 是声明什么的?

NotUsed 其实是 Void 指代, 也就是代表消息管道生成之后并不需要返回结果
有的时候需要在输入输出的过程之中推送/获取结果, 就需要这个参数

这里的 NotUsed 参数可以用以下做替代
NotUsed → 我不关心任何附加结果: 不需要做返回数据处理
CompletionStage<T> → 异步计算结果: 获取异步处理结果
SourceQueue<T> → 可以主动推送数据的队列: 声明需要外部等待执行返回数据
ActorRef → 背后对应的Actor引用: 将数据结果推送到指定 Actor 对象

这里构建个 Slink 响应的消息通道, 用来要求管道执行命令之后返回结果

第三个参数具体应用场景:

类型 准确描述 获取方式示例 典型使用场景
NotUsed 不关心任何附加结果,仅关注流处理过程本身 Keep.left() / Keep.right() / Keep.none() 日志记录、数据转换、简单ETL
CompletionStage 异步计算最终结果的句柄,可获取流的最终状态 Sink.last() / Sink.fold() / Sink.seq() 聚合统计、最终值计算、结果汇总
SourceQueue 可外部主动推送数据的队列接口,提供offer/complete方法 Source.queue() 用户输入、事件驱动、实时数据注入
ActorRef 将流数据路由到指定Actor的引用,实现与Actor系统集成 Sink.actorRef() 命令模式、消息分发、Actor生态整合

这里实现利用 Source/Sink 来计算数值的的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

/**
* PekkoWebSocket 数据帧实现
*/
public class PekkoValuesExample extends AllDirectives {

/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {
// 创建初始化默认的 ActorSystem Boot
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

// 异步流计算器

// 1. 声明数据流异步调度运行时, Materializer 可以复用, 如果是频繁需要异步调用最好实例化句柄持有
Materializer materializer = Materializer.createMaterializer(system);

// 2. 创建带有 SourceQueueWithComplete<Integer> 结果的生成者管道
Source<Integer, SourceQueueWithComplete<Integer>> source = Source.queue(100, OverflowStrategy.backpressure());

// 3. 创建带有 CompletionStage<Integer> 结果的消费者管道
Sink<Integer, CompletionStage<Integer>> sink = Sink.fold(0, Integer::sum);

// 4. 通过 materializer 运行时将构建成异步程序执行, 最后返回队列和结果对象
Pair<SourceQueueWithComplete<Integer>, CompletionStage<Integer>> pair = source
.toMat(sink, Keep.both())
.run(materializer);

SourceQueueWithComplete<Integer> queue = pair.first(); // 数据队列对象
CompletionStage<Integer> future = pair.second(); // 数据异步结果对象

// 5. 将数据投递进去执行
RandomGenerator randomGenerator = RandomGenerator.getDefault();
for (int i = 1; i <= 10; i++) {
int value = randomGenerator.nextInt(10);
System.out.println("Value = " + value);
queue.offer(value);
}

// 6. 投递完成之后通知完成并准备执行
queue.complete();

// 7. 最后异步获取到最终执行结果
future.thenAccept(sum -> {
System.out.println("Total = " + sum);
});

// 这里阻塞等待到最后获取结果
future.toCompletableFuture().join();
// 也可以采用超时, 比如下面的超时5s
// future.toCompletableFuture().get(5, TimeUnit.SECONDS);
}
}

这里就是实现底层异步任务的包装, 如果想灵活使用 pekko 的 actor 功能, 学习包装异步任务是必不可少的任务.

不仅仅是 Stream 数据流需要用到异步任务, 其他很多需要返回结果的任务都是必须要的(比如支付扣款必须要知道结果)

游戏服务端购买都是直接等待异步更新, 对于数据一致性不是那么高, 所以不用实时等待, 也就是购买道具只管下发数值变动即可而不会阻塞等待结果

涉及到需要数据强一致性就需要对异步包装有一定了解, 但是必须要做好取舍: 当包装成异步任务的时候, 执行过程是阻塞等待结果的

高级构建

参考网站: https://pekko.apache.org/japi/pekko/1.0/org/apache/pekko/stream/javadsl/Framing.html

上面已经简单实现 webSocket-echo, 但消息其实不是那么简单, 一般采用 消息长度(int32)+消息ID(int32)+消息主体(byte[]) 格式.

这种就是标准的 header(int64) + body(byte[]) 方式, 还有采用特殊分割符切分的方法

这里的 int32 首位需要拆出来作为消息 id 识别, 次位的 int32 则是用于从流之中提取指定长度内容构建.

现有流处理其实比较抽象且不好理解, 目前很多都会采用专门的消息列表附加合并成然后再执行分帧, 涉及到以下需要思考:

  • 必须在功能类维护 ByteString.emptyByteString() 空消息

  • 读取到客户端消息的时候是将其合并(concat)到 ByteString

  • 合并完成之后再做数据分帧(frame), 跳过不合法的帧来重新切分投递到对应 Actor

  • 这种方式其实也被称为 ‘消息拆帧’, 用来将消息构建成队列化处理

这里其实要用到 pekko 的 Framing.lengthField 来做预定义数据分帧处理, 这里可以看下官方的方法信息:

  • 这里其实内部已经提供两个预定义分包函数:

    • Framing.delimiter: 提取内部的分割符切分成数据帧
    • Framing.lengthField: 提取内部的长度字段获取数据长度构成数据帧
    • Framing.simpleFramingProtocol: 简约版本的 Framing.lengthField 处理
分包方式 适用协议 典型应用 优点 缺点
lengthField 二进制协议 RPC、游戏协议 精准分包 需要预知帧结构
delimiter 文本协议 HTTP、日志 简单直观 分隔符冲突风险

推荐采用 lengthField, 因为 delimiter 可能会出现数据也带有分隔符直接影响分帧

这里提供下具体函数定义:

1
2
3
4
5
6
7
public static Flow<ByteString, ByteString, NotUsed> lengthField(
int fieldLength,
int fieldOffset,
int maximumFrameLength,
java.nio.ByteOrder byteOrder) {
// 内部实现
}
参数 含义 关键说明
fieldLength 长度字段的字节数 告诉解码器用多少字节来表示后续数据的长度(常用值:1, 2, 4, 8)
fieldOffset 长度字段的偏移量 从帧开始到长度字段起始位置的字节数,允许帧头有其他字段
maximumFrameLength 最大帧长度 包含头部在内的总长度(fieldOffset + fieldLength + 数据长度),超过此值流会失败
byteOrder 字节序 解码长度字段时使用的大端或小端字节序

假设一个典型帧格式:

1
2
3
4
5
[帧头其他字段]    [长度字段]      [实际数据]
↑ ↑ ↑
| | |
fieldOffset fieldLength 数据长度(由长度字段决定)
[起始偏移] [字段的长度值] [数据结构体]

如果是应用到游戏 Protobuf, 建议 maximumFrameLength 按照以下场景来做分配:

游戏类型 典型消息大小 推荐 maximumFrameLength
卡牌/回合制 0.5-2KB 16KB - 64KB
MOBA/RTS 1-10KB 64KB - 256KB
MMORPG 2-50KB 128KB - 1MB
开放世界 10-200KB 512KB - 2MB
批量同步 可达数MB 4MB - 16MB

如果想平衡点处理, 可以采用 256KB = 256*1024 来作为最小单帧长度, 可以覆盖大部分场景

分帧原理就是提取出指定位置的数值当作消息包长度, 然后从数据流当中自动分片出来, 可以通过 PekkoStream 结合构建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.ws.BinaryMessage;
import org.apache.pekko.http.javadsl.model.ws.Message;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.util.ByteString;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;
import java.util.concurrent.CompletionStage;

/**
* PekkoWebSocket 数据帧实现
*/
public class PekkoFramedExample extends AllDirectives {
/**
* Actor 系统
*/
final ActorSystem<Void> system;


/**
* 消息长度字段的字节数(int32 = 4字节)
*/
private static final int FIELD_LENGTH = Integer.BYTES;

/**
* 长度字段的偏移量(0,因为第一个字段就是长度)
*/
private static final int FIELD_OFFSET = 0;


/**
* 最大帧长度(256KB,覆盖大部分游戏/业务场景)
*/
private static final int MAX_FRAME_LENGTH = 256 * 1024;


/**
* 字节序(网络传输通常用大端)
*/
private static final ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN;


/**
* 消息ID字段的偏移量(长度字段之后,4字节)
*/
private static final int MSG_ID_OFFSET = FIELD_OFFSET + FIELD_LENGTH;


/**
* 消息主体的起始偏移量(长度+ID之后,8字节)
*/
private static final int MSG_BODY_OFFSET = MSG_ID_OFFSET + 4;


/**
* 构造方法
*/
public PekkoFramedExample(ActorSystem<Void> system) {
this.system = system;
}

/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {
// 创建初始化默认的 ActorSystem Boot
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes");

// 生成测试数据转 Base64, 方便部分 WebSocket 客户端以这部分数据做传输
String message = "Hello.World";
byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + bytes.length);
buffer.putInt(Integer.BYTES + bytes.length); // 注意这里的内容长度是 Body 不含有 Header
buffer.putInt(10001);
buffer.put(bytes);
System.out.printf("Test Bytes: %s%n", Arrays.toString(buffer.array()));
String base64 = Base64.getEncoder().encodeToString(buffer.array());
System.out.printf("Test Base64: %s%n", base64);


// 生成 HTTP 功能服务
final Http http = Http.get(system);

// 生成管理的内部类对象实例, 这个类必须继承 AllDirectives 类
PekkoFramedExample app = new PekkoFramedExample(system);

// 将 HTTP 服务和内部类做好映射, 并且设置监听的地址和端口
final CompletionStage<ServerBinding> binding =
http.newServerAt("localhost", 18880).bind(app.createRoute());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();

// 退出之后回收这部分 HTTP 资源并且直接中断 Actor 系统服务
binding
.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
.thenAccept(unbound -> system.terminate()); // and shutdown when done
}

/**
* 构建请求路径的路由
*/
private Route createRoute() {
// 添加 GET /hello 访问路径, 映射到内部函数调用输出
return concat(get(() -> pathPrefix("frame", () -> handleWebSocketMessages(createFrameFlow().mapMaterializedValue(queueRef -> {
// 提取请求的消息队列保存到内部, 最好在内部维护好具体的请求队列列表
// queues.put(sessionId(), queue);
return queueRef;
})))));
}

/**
* 创建消息分帧管道
*/
private Flow<Message, Message, ?> createFrameFlow() {

// 1. 构建lengthField拆帧流的数据解码器
Flow<ByteString, ByteString, ?> frameDecoder = Framing.lengthField(
FIELD_LENGTH,
FIELD_OFFSET,
MAX_FRAME_LENGTH,
BYTE_ORDER
);


// 2. 完整流处理链路:缓存合并 → 拆帧 → 错误处理 → 解析 → 投递Actor
return Flow.<Message>create()
// 异步处理
.map(msg -> {
if (msg instanceof BinaryMessage bm) {
return bm.getStrictData();
} else {
return ByteString.emptyByteString();
}
})
// 步骤1:过滤空数据
.filter(bs -> !bs.isEmpty())
// 步骤2:执行精准拆帧
.via(frameDecoder)
// 步骤3:错误处理, 多个错误可以这样拦截处理
.recover(Throwable.class, () -> {
system.log().error("[拆帧异常]");
return ByteString.emptyByteString();// 异常直接返回空数据
})
// 步骤4:过滤空字节串(异常帧的返回值)
.filter(byteString -> !byteString.isEmpty())
// 步骤5:解析帧内容(提取ID、主体)
.map(this::parseFrame)
// 步骤6:过滤解析失败的帧
.filter(Objects::nonNull)
// 步骤7:控制台打印解析结果(替代Actor投递)
.map(body -> {
// 打印核心信息到控制台
System.out.println("===== 解析到完整消息帧 ==================================");
System.out.println("消息ID:" + body.id());
System.out.println("实际主体长度:" + body.message.size() + " 字节");
System.out.println("消息主体内容:" + Arrays.toString(body.message.toArray()));
System.out.println("======================================================\n");
return body; // 保留返回值,不影响后续流处理
})
// 步骤8:背压超时保护(5秒)
.backpressureTimeout(Duration.ofSeconds(5))
// 步骤9:超时异常处理
.recover(java.util.concurrent.TimeoutException.class, () -> {
system.log().warn("[流超时] 消息帧处理超时");
return null;
})
// 步骤10:过滤超时的空值
.filter(Objects::nonNull)
// 步骤11:核心 - 不向客户端返回响应,返回空流
// 大部分情况都不是 请求-响应 模型, 而是动态消息推送过来
.flatMapConcat(frame -> Source.empty()); // 空Source,无消息返回客户端
}


/**
* 解析单帧数据:按「长度int32 + IDint32 + 主体」格式提取核心信息
*/
private MessageFrame parseFrame(ByteString frame) {
try {
// 校验帧最小长度(至少包含长度+ID字段,共8字节)
if (frame.size() < MSG_BODY_OFFSET) {
system.log().warn("[帧解析] 无效帧:长度{} < 最小8字节", frame.size());
return null;
}

// 开始解析数据
ByteBuffer buffer = frame
.asByteBuffer()
.order(BYTE_ORDER);

// 1. 提取消息长度(int32,大端字节序)
int msgLength = buffer.getInt(FIELD_OFFSET);

// 校验长度合法性:声明的长度 = 实际帧长度 - 4(长度字段本身)
if (msgLength + FIELD_LENGTH != frame.size()) {
system.log().warn("[帧解析] 长度不匹配:声明{} vs 实际{}",
msgLength, frame.size() - FIELD_LENGTH);
return null;
}

// 2. 提取消息ID(int32,大端字节序)
int msgId = buffer.getInt(MSG_ID_OFFSET);

// 3. 提取消息主体(从8字节偏移开始,长度为msgLength-4)
ByteBuffer msgBody = buffer.slice(MSG_BODY_OFFSET, msgLength - 4);

system.log().info("[帧解析] 成功 - ID:{},总长度:{},主体长度:{}",
msgId, frame.size(), msgBody.remaining());
return new MessageFrame(msgId, ByteString.fromByteBuffer(msgBody));

} catch (Exception e) {
system.log().error("[帧解析] 失败", e);
return null;
}
}

/**
* 消息帧, 也就是客户端提交的数据流单帧
*
* @param id
* @param message
*/
public record MessageFrame(
int id,
ByteString message
) {
}
}

如果更加细致的帧队列维护就需要自己去编写 remainingBytes 不断附加和剔除来手动分帧, 如果不是业务不要操作手动分帧.

手动分帧自己维护 remaining 数据队列见过以前老游戏部分代码用过, 爆内存基本上是家常便饭, 需要靠堆设备硬抗处理.

上面采用 Framing.lengthField 只是做说明, 实际在日常当中如果协议没有特殊要求建议采用 Framing.simpleFramingProtocol:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 创建消息分帧管道
*/
private Flow<Message, Message, ?> createFrameFlow() {
// 采用 pekko 内部封装更加易用的 Framing.simpleFramingProtocol
// 内部其实就是上面提出的 Framing.lengthField(4,0,MAX,ORDER) 简易封装
// 好处就是零手动缓存、低风险、易维护
// 构建双向处理的简易分帧协议
// 设置了最大帧长度(超过抛异常,防OOM)
BidiFlow<ByteString, ByteString, ByteString, ByteString, NotUsed> bidiFraming =
Framing.simpleFramingProtocol(MAX_FRAME_LENGTH);

// BidiFlow 转为单向解码 Flow(仅保留入站解码,忽略出站编码)
Flow<ByteString, ByteString, ?> frameDecoder = bidiFraming.join(Flow.create())
// 拦截分帧异常(超大载荷/非法长度)
.recover(Throwable.class, () -> {
system.log().error("[分帧异常] 载荷超限/格式错误");
return ByteString.emptyByteString();
});

// 之后就是使用 frameDecoder 照常解码
// 其他略
}

这种方式就是比较合理的构建方式, 不过目前都是接收流数据数据, 还没有到处理数据阶段, 这部分篇幅太长需要额外区分开来;
可以说 pekko 就是很值得学习和使用的工具库, 掌握其使用可以构建出很高效的网络 Actor 模型.