Java Pekko Pekko 启用 HTTP 和 WebSocket MeteorCat 2026-01-11 2026-01-11 Pekko 作为 Actor 底层框架, 其实内部自带了 Http/Https/WebSocket 扩展, 涵盖日常所需要的短连接和长连接场景.
这部分涉及到以下相关内容:
这里为什么推荐采用 WebSocket 做长连接服务?
主流支持非常广泛(现代代浏览器、iOS、Android、物联网设备)
抛弃以往的粘包/拆包、心跳检测、消息校验问题, 不用将心力放在数据流的验证上
CDN 厂家现在逐步有 WebSocket 的支持, 可以利用 CDN 的边缘节点就近接入客户端
当然也可以外置 WebSocket 库挂载服务, 只是采用 Actor 集成更加高效方便, 无缝贴合 Actor 处理
另外需要说明的 WebSocket 支持 text(文本流) 和 binary(二进制流) 传输模式, 这里强烈建议采用二进制流传输.
如果采用 text 文本流, 大部分数据库解析的时候都会做默认的 UTF8 编码转化成 String, 之后内部又会转换成 byte[] 交换数据.
高并发的情况下很容易直接生成大量的无用的内存垃圾(GC新生代), 直接将 CPU 效率浪费在编码/解码/转化字符串之中.
所以建议直接默认采用 binary(二进制) 来传输, 不要用到 text 的任何相关处理, 并且数据交换则是采用 protobuf 来传输.
HTTP 构建
因为 WebSocket 是基于 HTTP, 所以必须先学习怎么生成 HTTP 服务并分析, 才能为后面的 WebSocket 打好基础.
这里直接官方参考来引入即可, 可以先不引入 protobuf 做数据传输(我这边采用 Maven 做包管理):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 <properties > <pekko.version > 1.1.5</pekko.version > <pekko.bom.version > 1.3.0</pekko.bom.version > <scala.binary.version > 2.13</scala.binary.version > </properties > <dependencyManagement > <dependencies > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-http-bom_${scala.binary.version}</artifactId > <version > ${pekko.bom.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <dependencies > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-actor-typed_${scala.binary.version}</artifactId > <version > ${pekko.version}</version > </dependency > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-stream_${scala.binary.version}</artifactId > <version > ${pekko.version}</version > </dependency > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-http_${scala.binary.version}</artifactId > </dependency > </dependencies >
Pekko 内部已经将 HTTP 模块自动集成到 Actor 管理了, 所以我们可以直接简单构建服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.http.javadsl.Http;import org.apache.pekko.http.javadsl.ServerBinding;import org.apache.pekko.http.javadsl.server.AllDirectives;import org.apache.pekko.http.javadsl.server.Route;import java.util.concurrent.CompletionStage;public class PekkoHttpExample extends AllDirectives { public static void main (String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes" ); final Http http = Http.get(system); PekkoHttpExample app = new PekkoHttpExample (); final CompletionStage<ServerBinding> binding = http.newServerAt("localhost" , 18880 ).bind(app.createRoute()); System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop..." ); int ignore = System.in.read(); binding .thenCompose(ServerBinding::unbind) .thenAccept(unbound -> system.terminate()); } private Route createRoute () { return concat(path("hello" , () -> get(() -> complete("Hello.World" )))); } }
这样处理之后可以直接访问 curl http://localhost:18880/hello 即可显示具体的网页内容.
另外 pekko 内部是支持 jackson 相关的 JSON 序列化的, 直接引入即可:
1 2 3 4 5 6 7 <dependencies > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-http-jackson_${scala.binary.version}</artifactId > </dependency > </dependencies >
这种方式引入之后就可以支持默认结构体转 JSON 输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import com.fasterxml.jackson.annotation.JsonCreator;import com.fasterxml.jackson.annotation.JsonProperty;import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.http.javadsl.Http;import org.apache.pekko.http.javadsl.ServerBinding;import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;import org.apache.pekko.http.javadsl.server.AllDirectives;import org.apache.pekko.http.javadsl.server.Route;import java.util.concurrent.CompletionStage;public class PekkoJsonExample extends AllDirectives { public static void main (String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes" ); final Http http = Http.get(system); PekkoJsonExample app = new PekkoJsonExample (); final CompletionStage<ServerBinding> binding = http.newServerAt("localhost" , 18880 ).bind(app.createRoute()); System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop..." ); int ignore = System.in.read(); binding .thenCompose(ServerBinding::unbind) .thenAccept(unbound -> system.terminate()); } private Route createRoute () { return concat( get(() -> pathPrefix("hello" , () -> complete("Hello.World" ))), get(() -> pathPrefix("item" , () -> { Item item = new Item ("MeteorCat" , 2026L ); return completeOK(item, Jackson.marshaller()); })) ); } private record Item (String name, long id) { @JsonCreator private Item (@JsonProperty("name") String name, @JsonProperty("id") long id) { this .name = name; this .id = id; } } }
看官方文档更详细所以就不太讲述太多细节, 主要用来做 JSON-API 网关的时候可能用得比较多, 后续需要用到的时候学习下就行了.
需要注意的是 pekko-http 不会为默认每个会话创建 actor 对象, 所以如果长链接需要再动态构建 actor 做消息队列监听
当然其实还有 Http Client 相关 API, 这部分基本用于做远程数据转发或者传输, 使用也是结合 Actor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.http.javadsl.Http;import org.apache.pekko.http.javadsl.model.*;import org.apache.pekko.http.javadsl.server.examples.petstore.Pet;import org.apache.pekko.stream.SystemMaterializer;import java.util.concurrent.CompletionStage;public class ClientSingleRequestExample { public static void main (String[] args) { final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SingleRequest" ); final CompletionStage<HttpResponse> responseFuture = Http.get(system).singleRequest(HttpRequest.create("https://pekko.apache.org" )); } private Route createRoute (ActorSystem<Void> system) { return concat( get(() -> pathPrefix("hello" , () -> complete("Hello.World" ))), get(() -> pathPrefix("item" , () -> { Item item = new Item ("MeteorCat" , 2026L ); return completeOK(item, Jackson.marshaller()); })), get(() -> pathPrefix("bing" , () -> { final CompletionStage<HttpResponse> responseFuture = Http.get(system).singleRequest(HttpRequest.create("https://bing.com" )); return onComplete(responseFuture, resp -> resp.fold( ex -> complete(StatusCodes.BAD_GATEWAY, "upstream error" ), this ::complete )); })) ); } }
至此基本上就已经大概清楚 Pekko 的 HTTP 驱动方式, 后面就是更加高级的 WebSocket 数据交互.
WebSocket 构建
官方文档: https://pekko.apache.org/docs/pekko-http/current/server-side/websocket-support.html
WebSocket 基于 Http, 只需要声明握手升级(Upgrade Handshake)就可以直接使用, 直接声明路由即可:
1 2 3 Route wsRoute = path("ws" , () -> handleWebSocketMessages(flow));
其中内部支持以下消息类型:
帧类型
官方类
用途
文本
TextMessage
浏览器默认,UTF-8 解码成 String
二进制
BinaryMessage
推荐,直接 ByteString
控制
PingMessage / PongMessage / CloseMessage
框架自动回 pong,一般不用管
这里提到了 Flow 就必须了解数据请求的 背压(), 也就是 Source/Sink/Flow 是什么? 同时还涉及到数据的 Stream 传输?
Stream 就是构成 Source/Sink/Flow 的基础单元, 把用户发送过来的数据视为 数据流(Data Stream), 这三者的作用如下:
构件
类比
背压角色
WebSocket 里具体表现
Source
水龙头
生产端
把业务对象变成 Message 推向客户端
Sink
下水道
消费端
把客户端发来的 Message 吸走并处理
Flow
过滤器
中转站
收到 Message → 业务逻辑 → 发出 Message(echo、广播、加解密)
而 背压(Backpressure) 则是内部为了保护消息正确投递的策略:
当客户端消费速度 < 服务端生产速度时(比如客户端突然网络波动卡顿), 就需要让对应生产数据端暂时挂起等待, 避免消息队列内存爆满
如果客户端消费数据流内部的消息过慢, 发送端将会接收到 onBackpressure 代表需要防止内存爆满来主动丢弃老消息填充的策略
WebSocket 里, Source 是发帧, Sink 是收帧, Flow 是改帧; 三者拼成管道, 背压自动从下游往上游拉
比较简单的日常形容, 可以把这个数据交换过程形容为 “自来水系统”:
Source(水龙头)
服务器想给客户端送水(数据),拧开水龙头就能哗哗流出 Message 帧。
Flow(净水器)
水流经过滤芯(业务逻辑),比如 echo 就把水再染个色,然后继续往下送。
Sink(下水口)
客户端那边有个下水口,水喝得有多快,就往下漏多快;喝得慢,下水口立刻“喊停”。
背压(阀门)
下水口一喊停,水龙头里的阀门自动关小 ,水不再涌出,水池(内存)永远满不了 。
如果水池太小,可以加装“溢流管”——.buffer(50, dropHead),老水直接倒掉,新水继续流。
结果
无论客户端网络卡成什么样,服务器这边既不会淹水(OOM),也不会爆管(阻塞线程) ,整个厨房干干净净。
现在的 WebSocket 服务不再是传统 请求-响应 单向模型, 而是需要以 数据流(Stream) 方式来理解.
那么需要做的就是构建 Sink 和 Source, 最后拼合成 Flow 对客户端数据加工处理返回:
Source: 服务端 → 客户端, 服务端主动推送消息用到
Sink: 客户端 → 服务端, 客户端投递消息时候会用到
Flow: 将 Sink 和 Source 两者拼合在一起从而实现类似于数据传输管道
最后的代码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Source<Message, NotUsed> faucet = Source .queue(50 , OverflowStrategy.dropHead()) .mapMaterializedValue(queueRef -> { }); Sink<Message, NotUsed> drain = Sink .foreach(msg -> handle(msg)); Flow<Message, Message, NotUsed> filter = Flow.of(Message.class) .filter(Message::isBinary) .map(this ::decrypt) .map(this ::encrypt); Flow<Message, Message, NotUsed> pipeline = Flow.fromSinkAndSource(drain, faucet).via(filter);
这里以 echo 数据请求为例, 来做个 WebSocket 回显服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 import org.apache.pekko.NotUsed;import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.http.javadsl.Http;import org.apache.pekko.http.javadsl.ServerBinding;import org.apache.pekko.http.javadsl.model.ws.BinaryMessage;import org.apache.pekko.http.javadsl.model.ws.Message;import org.apache.pekko.http.javadsl.server.AllDirectives;import org.apache.pekko.http.javadsl.server.Route;import org.apache.pekko.stream.OverflowStrategy;import org.apache.pekko.stream.javadsl.Flow;import org.apache.pekko.stream.javadsl.Sink;import org.apache.pekko.stream.javadsl.Source;import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;import org.apache.pekko.util.ByteString;import java.util.concurrent.CompletionStage;public class PekkoWebSocketExample extends AllDirectives { final ActorSystem<Void> system; private SourceQueueWithComplete<Message> queueRef; public PekkoWebSocketExample (ActorSystem<Void> system) { this .system = system; } public static void main (String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes" ); final Http http = Http.get(system); PekkoWebSocketExample app = new PekkoWebSocketExample (system); final CompletionStage<ServerBinding> binding = http.newServerAt("localhost" , 18880 ).bind(app.createRoute()); System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop..." ); int ignore = System.in.read(); binding .thenCompose(ServerBinding::unbind) .thenAccept(unbound -> system.terminate()); } private Route createRoute () { return concat( get(() -> pathPrefix("hello" , () -> complete("Hello.World" ))), get(() -> pathPrefix("echo" , () -> handleWebSocketMessages(createEchoFlow()))) ); } private Flow<Message, Message, NotUsed> createEchoFlow () { Source<Message, NotUsed> faucet = Source.<Message>queue(50 , OverflowStrategy.dropHead()) .mapMaterializedValue(queueRef -> { this .queueRef = queueRef; return NotUsed.getInstance(); }); Sink<Message, NotUsed> drain = Flow.<Message>create() .filter(BinaryMessage.class::isInstance) .map(BinaryMessage.class::cast) .flatMapConcat(BinaryMessage::getStreamedData) .to(Sink.foreach(chunk -> queueRef.offer(BinaryMessage.create(chunk)))); return Flow.fromSinkAndSource(drain, faucet); } }
这里可以采用更加细致化的处理, 比如计算推送给客户端的掉帧计数器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Sink<Message, NotUsed> drain = Flow.<Message>create() .filter(BinaryMessage.class::isInstance) .map(BinaryMessage.class::cast) .flatMapConcat(BinaryMessage::getStreamedData) .map(BinaryMessage::create) .to(Sink.foreach(chunk -> queueRef.offer(chunk).whenComplete((r, e) -> { if (r != QueueOfferResult.enqueued()) { long drops = droppedFrames.incrementAndGet(); if (drops % 1000 == 0 ) { System.err.printf("WebSocket dropped {%d} frames%n" , drops); } } }) ));
不过一般数据都是经可能规避大数据包, 都会严格控制包体大小, 最大消息结构最好控制 65536 字节(64K)左右, pekko 配置可以锁死:
1 2 3 4 5 6 7 8 9 10 11 12 pekko.http.websocket { max-message-size = 65536 # 字节,超过立即断开连接 # 另外还有心跳设置 # 需要注意, 默认 Pekko 每 30s 自动发 Ping(空 payload), 客户端必须回 Pong(注意: 是必须), 否则主动断开 # 而如果采用 pong 模式则是由服务端主动发 Pong, 客户端收到后无需回复 periodic-keep-alive-mode = pong # 让服务端主动发起 Pong periodic-keep-alive-max-idle = 30s # 每次间隔 30s # 如果打算自己做应用层心跳, 则是按照以下方式关闭 # pekko.http.server.websocket.periodic-keep-alive-max-idle = infinite }
大数据包传输在高并发的过程当中是必须要避免的, 宁愿将业务大数据拆分成多个小包多次传输也不要构建成一个大包直接传输.
单帧的数据过大会导致 TCP 消息挤占缓冲区, 从而影响消息队列入列导致消息被阻塞, 所以一定要严格把控消息单帧大小.
其实构建 Flow 的过程可以视为设计消息管道, 而运行之后就会当作流水线处理客户端和服务端的数据交互
这里请求 ws://localhost:18880/echo 并且发送二进制消息就可以测试, 具体可以采用客户端来做
流处理
官方文档: https://pekko.apache.org/docs/pekko/current/stream/stream-io.html#working-with-streaming-io
上面虽然简略讲述 Source(输出)/Sink(输入)/Flow(流处理) 的使用方法, 但底层很多东西都没说明, 需要边看官方文档来处理.
上面文档官方提供简单的 TCP 服务器数据流以 Framing.delimiter(后面会详细说明) 分隔符切分消息的交换方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 connections.runForeach( connection -> { System.out.println("New connection from: " + connection.remoteAddress()); final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class) .via( Framing.delimiter( ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) .map(ByteString::utf8String) .map(s -> s + "!!!\n") .map(ByteString::fromString); connection.handleWith(echo, system); }, system);
这里的数据流运行方式如下图所示:
可以看到关键的 Flow 就是将数据设置成具体的 “消息管道” 来调整输入和输出; 这种模式不止应用于网络流, 所有文件流/二进制流等都支持.
所以记住: 如果要构建消息流则必须要有 输入源(Sink)/输出源(Source)/管道处理(Flow) 这三者
这里需要学习下 Source/Sink/Flow 具体的实现思想, 这三者其实就是 pekko 底层的异步驱动方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - Source<Out>: 代表仅有生成者模式, 即只输出数据而不去接收数据 - Sink<In>: 代码仅有消费者模式, 即只接收数据而不做生成数据需求 - Flow<In,Out>: 中间的传输层, 负责协调资源的输入与输出 目前已经知道前面的所需参数, 但是看到声明出现附加参数 - Source<Message, NotUsed> - Sink<Message, NotUsed> - Flow<Message, Message, NotUsed> 这里面的 NotUsed 是声明什么的? NotUsed 其实是 Void 指代, 也就是代表消息管道生成之后并不需要返回结果 有的时候需要在输入输出的过程之中推送/获取结果, 就需要这个参数 这里的 NotUsed 参数可以用以下做替代 NotUsed → 我不关心任何附加结果: 不需要做返回数据处理 CompletionStage<T> → 异步计算结果: 获取异步处理结果 SourceQueue<T> → 可以主动推送数据的队列: 声明需要外部等待执行返回数据 ActorRef → 背后对应的Actor引用: 将数据结果推送到指定 Actor 对象 这里构建个 Slink 响应的消息通道, 用来要求管道执行命令之后返回结果
第三个参数具体应用场景:
类型
准确描述
获取方式示例
典型使用场景
NotUsed
不关心任何附加结果,仅关注流处理过程本身
Keep.left() / Keep.right() / Keep.none()
日志记录、数据转换、简单ETL
CompletionStage
异步计算最终结果的句柄,可获取流的最终状态
Sink.last() / Sink.fold() / Sink.seq()
聚合统计、最终值计算、结果汇总
SourceQueue
可外部主动推送数据的队列接口,提供offer/complete方法
Source.queue()
用户输入、事件驱动、实时数据注入
ActorRef
将流数据路由到指定Actor的引用,实现与Actor系统集成
Sink.actorRef()
命令模式、消息分发、Actor生态整合
这里实现利用 Source/Sink 来计算数值的的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class PekkoValuesExample extends AllDirectives { public static void main (String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes" ); Materializer materializer = Materializer.createMaterializer(system); Source<Integer, SourceQueueWithComplete<Integer>> source = Source.queue(100 , OverflowStrategy.backpressure()); Sink<Integer, CompletionStage<Integer>> sink = Sink.fold(0 , Integer::sum); Pair<SourceQueueWithComplete<Integer>, CompletionStage<Integer>> pair = source .toMat(sink, Keep.both()) .run(materializer); SourceQueueWithComplete<Integer> queue = pair.first(); CompletionStage<Integer> future = pair.second(); RandomGenerator randomGenerator = RandomGenerator.getDefault(); for (int i = 1 ; i <= 10 ; i++) { int value = randomGenerator.nextInt(10 ); System.out.println("Value = " + value); queue.offer(value); } queue.complete(); future.thenAccept(sum -> { System.out.println("Total = " + sum); }); future.toCompletableFuture().join(); } }
这里就是实现底层异步任务的包装, 如果想灵活使用 pekko 的 actor 功能, 学习包装异步任务是必不可少的任务.
不仅仅是 Stream 数据流需要用到异步任务, 其他很多需要返回结果的任务都是必须要的(比如支付扣款必须要知道结果)
游戏服务端购买都是直接等待异步更新, 对于数据一致性不是那么高, 所以不用实时等待, 也就是购买道具只管下发数值变动即可而不会阻塞等待结果
涉及到需要数据强一致性就需要对异步包装有一定了解, 但是必须要做好取舍: 当包装成异步任务的时候, 执行过程是阻塞等待结果的
高级构建
参考网站: https://pekko.apache.org/japi/pekko/1.0/org/apache/pekko/stream/javadsl/Framing.html
上面已经简单实现 webSocket-echo, 但消息其实不是那么简单, 一般采用 消息长度(int32)+消息ID(int32)+消息主体(byte[]) 格式.
这种就是标准的 header(int64) + body(byte[]) 方式, 还有采用特殊分割符切分的方法
这里的 int32 首位需要拆出来作为消息 id 识别, 次位的 int32 则是用于从流之中提取指定长度内容构建.
现有流处理其实比较抽象且不好理解, 目前很多都会采用专门的消息列表附加合并成然后再执行分帧, 涉及到以下需要思考:
必须在功能类维护 ByteString.emptyByteString() 空消息
读取到客户端消息的时候是将其合并(concat)到 ByteString
合并完成之后再做数据分帧(frame), 跳过不合法的帧来重新切分投递到对应 Actor
这种方式其实也被称为 ‘消息拆帧’, 用来将消息构建成队列化处理
这里其实要用到 pekko 的 Framing.lengthField 来做预定义数据分帧处理, 这里可以看下官方的方法信息:
这里其实内部已经提供两个预定义分包函数:
Framing.delimiter: 提取内部的分割符切分成数据帧
Framing.lengthField: 提取内部的长度字段获取数据长度构成数据帧
Framing.simpleFramingProtocol: 简约版本的 Framing.lengthField 处理
分包方式
适用协议
典型应用
优点
缺点
lengthField
二进制协议
RPC、游戏协议
精准分包
需要预知帧结构
delimiter
文本协议
HTTP、日志
简单直观
分隔符冲突风险
推荐采用 lengthField, 因为 delimiter 可能会出现数据也带有分隔符直接影响分帧
这里提供下具体函数定义:
1 2 3 4 5 6 7 public static Flow<ByteString, ByteString, NotUsed> lengthField ( int fieldLength, int fieldOffset, int maximumFrameLength, java.nio.ByteOrder byteOrder) { }
参数
含义
关键说明
fieldLength
长度字段的字节数
告诉解码器用多少字节来表示后续数据的长度(常用值:1, 2, 4, 8)
fieldOffset
长度字段的偏移量
从帧开始到长度字段起始位置的字节数,允许帧头有其他字段
maximumFrameLength
最大帧长度
包含头部在内的总长度 (fieldOffset + fieldLength + 数据长度),超过此值流会失败
byteOrder
字节序
解码长度字段时使用的大端或小端字节序
假设一个典型帧格式:
1 2 3 4 5 [帧头其他字段] [长度字段] [实际数据] ↑ ↑ ↑ | | | fieldOffset fieldLength 数据长度(由长度字段决定) [起始偏移] [字段的长度值] [数据结构体]
如果是应用到游戏 Protobuf, 建议 maximumFrameLength 按照以下场景来做分配:
游戏类型
典型消息大小
推荐 maximumFrameLength
卡牌/回合制
0.5-2KB
16KB - 64KB
MOBA/RTS
1-10KB
64KB - 256KB
MMORPG
2-50KB
128KB - 1MB
开放世界
10-200KB
512KB - 2MB
批量同步
可达数MB
4MB - 16MB
如果想平衡点处理, 可以采用 256KB = 256*1024 来作为最小单帧长度, 可以覆盖大部分场景
分帧原理就是提取出指定位置的数值当作消息包长度, 然后从数据流当中自动分片出来, 可以通过 PekkoStream 结合构建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.http.javadsl.Http;import org.apache.pekko.http.javadsl.ServerBinding;import org.apache.pekko.http.javadsl.model.ws.BinaryMessage;import org.apache.pekko.http.javadsl.model.ws.Message;import org.apache.pekko.http.javadsl.server.AllDirectives;import org.apache.pekko.http.javadsl.server.Route;import org.apache.pekko.stream.*;import org.apache.pekko.stream.javadsl.*;import org.apache.pekko.util.ByteString;import java.nio.ByteBuffer;import java.nio.ByteOrder;import java.nio.charset.StandardCharsets;import java.time.Duration;import java.util.Arrays;import java.util.Base64;import java.util.Objects;import java.util.concurrent.CompletionStage;public class PekkoFramedExample extends AllDirectives { final ActorSystem<Void> system; private static final int FIELD_LENGTH = Integer.BYTES; private static final int FIELD_OFFSET = 0 ; private static final int MAX_FRAME_LENGTH = 256 * 1024 ; private static final ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN; private static final int MSG_ID_OFFSET = FIELD_OFFSET + FIELD_LENGTH; private static final int MSG_BODY_OFFSET = MSG_ID_OFFSET + 4 ; public PekkoFramedExample (ActorSystem<Void> system) { this .system = system; } public static void main (String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "routes" ); String message = "Hello.World" ; byte [] bytes = message.getBytes(StandardCharsets.UTF_8); ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + bytes.length); buffer.putInt(Integer.BYTES + bytes.length); buffer.putInt(10001 ); buffer.put(bytes); System.out.printf("Test Bytes: %s%n" , Arrays.toString(buffer.array())); String base64 = Base64.getEncoder().encodeToString(buffer.array()); System.out.printf("Test Base64: %s%n" , base64); final Http http = Http.get(system); PekkoFramedExample app = new PekkoFramedExample (system); final CompletionStage<ServerBinding> binding = http.newServerAt("localhost" , 18880 ).bind(app.createRoute()); System.out.println("Server online at http://localhost:18880/\nPress RETURN to stop..." ); int ignore = System.in.read(); binding .thenCompose(ServerBinding::unbind) .thenAccept(unbound -> system.terminate()); } private Route createRoute () { return concat(get(() -> pathPrefix("frame" , () -> handleWebSocketMessages(createFrameFlow().mapMaterializedValue(queueRef -> { return queueRef; }))))); } private Flow<Message, Message, ?> createFrameFlow() { Flow<ByteString, ByteString, ?> frameDecoder = Framing.lengthField( FIELD_LENGTH, FIELD_OFFSET, MAX_FRAME_LENGTH, BYTE_ORDER ); return Flow.<Message>create() .map(msg -> { if (msg instanceof BinaryMessage bm) { return bm.getStrictData(); } else { return ByteString.emptyByteString(); } }) .filter(bs -> !bs.isEmpty()) .via(frameDecoder) .recover(Throwable.class, () -> { system.log().error("[拆帧异常]" ); return ByteString.emptyByteString(); }) .filter(byteString -> !byteString.isEmpty()) .map(this ::parseFrame) .filter(Objects::nonNull) .map(body -> { System.out.println("===== 解析到完整消息帧 ==================================" ); System.out.println("消息ID:" + body.id()); System.out.println("实际主体长度:" + body.message.size() + " 字节" ); System.out.println("消息主体内容:" + Arrays.toString(body.message.toArray())); System.out.println("======================================================\n" ); return body; }) .backpressureTimeout(Duration.ofSeconds(5 )) .recover(java.util.concurrent.TimeoutException.class, () -> { system.log().warn("[流超时] 消息帧处理超时" ); return null ; }) .filter(Objects::nonNull) .flatMapConcat(frame -> Source.empty()); } private MessageFrame parseFrame (ByteString frame) { try { if (frame.size() < MSG_BODY_OFFSET) { system.log().warn("[帧解析] 无效帧:长度{} < 最小8字节" , frame.size()); return null ; } ByteBuffer buffer = frame .asByteBuffer() .order(BYTE_ORDER); int msgLength = buffer.getInt(FIELD_OFFSET); if (msgLength + FIELD_LENGTH != frame.size()) { system.log().warn("[帧解析] 长度不匹配:声明{} vs 实际{}" , msgLength, frame.size() - FIELD_LENGTH); return null ; } int msgId = buffer.getInt(MSG_ID_OFFSET); ByteBuffer msgBody = buffer.slice(MSG_BODY_OFFSET, msgLength - 4 ); system.log().info("[帧解析] 成功 - ID:{},总长度:{},主体长度:{}" , msgId, frame.size(), msgBody.remaining()); return new MessageFrame (msgId, ByteString.fromByteBuffer(msgBody)); } catch (Exception e) { system.log().error("[帧解析] 失败" , e); return null ; } } public record MessageFrame ( int id, ByteString message ) { } }
如果更加细致的帧队列维护就需要自己去编写 remainingBytes 不断附加和剔除来手动分帧, 如果不是业务不要操作手动分帧.
手动分帧自己维护 remaining 数据队列见过以前老游戏部分代码用过, 爆内存基本上是家常便饭, 需要靠堆设备硬抗处理.
上面采用 Framing.lengthField 只是做说明, 实际在日常当中如果协议没有特殊要求建议采用 Framing.simpleFramingProtocol:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private Flow<Message, Message, ?> createFrameFlow() { BidiFlow<ByteString, ByteString, ByteString, ByteString, NotUsed> bidiFraming = Framing.simpleFramingProtocol(MAX_FRAME_LENGTH); Flow<ByteString, ByteString, ?> frameDecoder = bidiFraming.join(Flow.create()) .recover(Throwable.class, () -> { system.log().error("[分帧异常] 载荷超限/格式错误" ); return ByteString.emptyByteString(); }); }
这种方式就是比较合理的构建方式, 不过目前都是接收流数据数据, 还没有到处理数据阶段, 这部分篇幅太长需要额外区分开来;
可以说 pekko 就是很值得学习和使用的工具库, 掌握其使用可以构建出很高效的网络 Actor 模型.