JavaPekkoPekko 消息交互 Actor
MeteorCatPekko 消息交互
其他文章已经揭示过 Pekko 目前主流支持的网络流消息交互:
基本上需要的网络流处理, Pekko 都封装完成了, 所以也就不需要依赖第三方来处理这些.
TCP/UDP 文档我看目前没有跟进到最新强类型版本处理, 这里主要还是以 TCP 流处理为主
TCP/UDP 之类依赖只需要基础的 actor 即可, 无需其他多余依赖, 目前官网 TCP 样例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| import java.net.InetSocketAddress;
import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props; import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.io.Tcp; import org.apache.pekko.io.Tcp.Bound; import org.apache.pekko.io.Tcp.CommandFailed; import org.apache.pekko.io.Tcp.Connected; import org.apache.pekko.io.Tcp.ConnectionClosed; import org.apache.pekko.io.Tcp.Received; import org.apache.pekko.io.TcpMessage; import org.apache.pekko.util.ByteString;
/**
 * Classic TCP client actor.
 *
 * <p>On construction it asks the TCP manager to connect to {@code remote}. While
 * connecting, a {@code CommandFailed} is reported to {@code listener} as the string
 * {@code "failed"} and the actor stops; a {@code Connected} is forwarded to
 * {@code listener}, this actor registers itself with the connection actor and
 * switches to the connected behavior.
 */
static class Client extends AbstractActor {

    final InetSocketAddress remote;
    final ActorRef listener;

    /** Builds the {@code Props} used to create a {@code Client} for the given remote and listener. */
    public static Props props(InetSocketAddress remote, ActorRef listener) {
        return Props.create(Client.class, remote, listener);
    }

    public Client(InetSocketAddress remote, ActorRef listener) {
        this.remote = remote;
        this.listener = listener;

        // Kick off the connection attempt; the reply arrives as a message to this actor.
        Tcp.get(getContext().getSystem())
                .manager()
                .tell(TcpMessage.connect(remote), getSelf());
    }

    /** Behavior used until the connection attempt has been answered. */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(CommandFailed.class, failure -> onConnectFailed())
                .match(Connected.class, this::onConnected)
                .build();
    }

    /** Tells the listener that connecting failed and stops this actor. */
    private void onConnectFailed() {
        listener.tell("failed", getSelf());
        getContext().stop(getSelf());
    }

    /** Forwards the event, registers with the connection actor and switches behavior. */
    private void onConnected(Connected established) {
        final ActorRef connection = getSender();
        listener.tell(established, getSelf());
        connection.tell(TcpMessage.register(getSelf()), getSelf());
        getContext().become(connected(connection));
    }

    /**
     * Behavior used once connected: {@code ByteString} messages are written to the
     * connection, received data is passed to the listener, the string {@code "close"}
     * closes the connection, and a {@code ConnectionClosed} stops this actor.
     */
    private Receive connected(final ActorRef connection) {
        return receiveBuilder()
                .match(ByteString.class,
                        payload -> connection.tell(TcpMessage.write(payload), getSelf()))
                .match(CommandFailed.class, failure -> {
                    // No action is taken when a command fails while connected.
                })
                .match(Received.class, inbound -> listener.tell(inbound.data(), getSelf()))
                .matchEquals("close", request -> connection.tell(TcpMessage.close(), getSelf()))
                .match(ConnectionClosed.class, closed -> getContext().stop(getSelf()))
                .build();
    }
}
|
之后启动获取地址的方式如下:
1
| /* Obtain the manager actor of the Tcp extension for this actor's system. */ final ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
|
目前强类型 typed 还没有对应文档说明怎么用, 还是推荐采用经典的 actor 声明, 这里面就涉及到转化功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
/**
 * Converts a typed actor reference into its classic counterpart.
 *
 * @param actorRef typed reference to convert
 * @param <T> message type of the typed reference
 * @return the result of {@code Adapter.toClassic} for {@code actorRef}
 */
public <T> org.apache.pekko.actor.ActorRef convertActorRef(ActorRef<T> actorRef) {
    final org.apache.pekko.actor.ActorRef classicRef = Adapter.toClassic(actorRef);
    return classicRef;
}
/**
 * Converts a typed actor context into its classic counterpart.
 *
 * @param actorContext typed context to convert
 * @param <T> message type of the typed context
 * @return the result of {@code Adapter.toClassic} for {@code actorContext}
 */
public <T> org.apache.pekko.actor.ActorContext convertActorContext(ActorContext<T> actorContext) {
    final org.apache.pekko.actor.ActorContext classicContext = Adapter.toClassic(actorContext);
    return classicContext;
}
/**
 * Converts a typed actor system into its classic counterpart.
 *
 * @param actorSystem typed system to convert
 * @param <T> message type of the typed system's guardian
 * @return the result of {@code Adapter.toClassic} for {@code actorSystem}
 */
public <T> org.apache.pekko.actor.ActorSystem convertActorSystem(ActorSystem<T> actorSystem) {
    final org.apache.pekko.actor.ActorSystem classicSystem = Adapter.toClassic(actorSystem);
    return classicSystem;
}
/**
 * Converts a typed scheduler into its classic counterpart.
 *
 * @param scheduler typed scheduler to convert
 * @return the result of {@code Adapter.toClassic} for {@code scheduler}
 */
public org.apache.pekko.actor.Scheduler convertActorScheduler(Scheduler scheduler) {
    final org.apache.pekko.actor.Scheduler classicScheduler = Adapter.toClassic(scheduler);
    return classicScheduler;
}
|
这样 Pekko 就可以回滚成经典的 Actor 接口来处理对应所有功能, 但是需要注意以下问题:
-
Classic Context 仅用于创建经典 Actor(如桥接器), 不要用它处理 Typed 消息; 消息推送只通过 ActorRef 完成, 尽量避免使用 ActorContext
-
避免在 Typed Actor 中过度使用 Classic Context, 否则会丧失 Typed 的类型安全优势
-
禁止直接向转化后的 Classic ActorRef 发送原生 IO 消息, 也就是不要发送原本 Typed 声明类型之外的消息
-
有些老模板只有经典的 Actor 实现, 如果强类型声明太过复杂的情况可以直接改由经典模式声明 Actor
所以这样处理时候的 TCP 服务端只需要按照下面这样编写即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
| import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.typed.*; import org.apache.pekko.actor.typed.javadsl.*; import org.apache.pekko.event.Logging; import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.io.Tcp; import org.apache.pekko.io.TcpMessage; import org.apache.pekko.util.ByteString;
import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Objects;
public class PekkoTcpExample {
public static void main(String[] args) throws Exception { final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "tcp-example");
final ActorRef<Void> server = system.systemActorOf(Behaviors.setup(context -> { org.apache.pekko.actor.ActorContext classicContext = Adapter.toClassic(context); classicContext.actorOf(PekkoTcpActor.props( org.apache.pekko.actor.ActorRef.noSender(), "127.0.0.1", 18881, 128 )); return Behaviors.empty(); }), "tcp-service", Props.empty());
System.out.println("Server online at tcp://localhost:18880/\nPress RETURN to stop..."); int ignore = System.in.read(); system.terminate(); }
public static class PekkoTcpActor extends AbstractActor {
final org.apache.pekko.actor.ActorRef manager;
final org.apache.pekko.actor.ActorRef handler;
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
final String hostname;
final int port;
final int backlog;
private PekkoTcpActor(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) { this.manager = manager; this.handler = Tcp.get(getContext().getSystem()).manager(); this.hostname = hostname; this.port = port; this.backlog = backlog; }
public static org.apache.pekko.actor.Props props(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) { return org.apache.pekko.actor.Props.create(PekkoTcpActor.class, () -> new PekkoTcpActor(manager, hostname, port, backlog)); }
@Override public void preStart() { handler.tell(TcpMessage.bind(getSelf(), new InetSocketAddress(hostname, port), backlog), getSelf()); }
@Override public void postStop() { handler.tell(TcpMessage.unbind(), getSelf()); }
@Override public Receive createReceive() { return receiveBuilder() .match(Tcp.Bound.class, msg -> { logger.info("TCP Listen:{}", msg.localAddress()); if (Objects.nonNull(manager)) { manager.tell(msg, getSelf()); } }) .match(Tcp.CommandFailed.class, msg -> { logger.error("TCP Listen Failed, Address:{}:{}, already in use?", hostname, port); getContext().stop(getSelf()); }) .match(Tcp.Connected.class, connected -> { logger.info("New Connect, REMOTE:{} → LOCAL:{}", connected.remoteAddress(), connected.localAddress());
if (Objects.nonNull(manager)) { manager.tell(connected, getSelf()); }
final org.apache.pekko.actor.ActorRef handler = getContext().actorOf(org.apache.pekko.actor.Props.create(SimplisticHandler.class));
handler.tell(connected, getSender()); }).build(); } }
static class SimplisticHandler extends AbstractActor {
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
@Override public Receive createReceive() { return receiveBuilder() .match(Tcp.Connected.class, msg -> { logger.info("Session Connected: {}", msg.remoteAddress());
getSender().tell(TcpMessage.register(getSelf()), getSelf()); }) .match(Tcp.Received.class, msg -> { final ByteString data = msg.data(); logger.info("Message: {}", Arrays.toString(data.toArray()));
getSender().tell(TcpMessage.write(data), getSelf()); }) .match(Tcp.ConnectionClosed.class, msg -> { logger.info("Closed, Cause: {}", msg.getErrorCause()); getContext().stop(getSelf()); }) .build(); } } }
|
这里编写了个 echo 功能的系统服务, 具体直接用以下命令即可连接:
1 2 3
| nc 127.0.0.1 18881
# Some distributions ship the tool as netcat; if the nc command is not found, use:
netcat 127.0.0.1 18881
|
客户端连接到服务端之后每个连接都会被动态创建 Actor 维护, 可以方便做些业务逻辑, 比如挂载自己编写鉴权模块等处理.
这里就是很标准的动态 Actor 托管设计, 如果要设计自己的 Actor 管理器, 建议先学习并理解这种设计
理解和设计
如果是设计对外暴露的网络消息传输功能, 那么不可避免的需要支持多个客户端同时连接到服务端并且保证非阻塞且线程安全.
如果将所有会话读写操作集中在单个 Actor 之中运行, 会导致完全无法发挥服务器的 CPU 性能.
单个 Actor 的执行流程是单线程运行的, 多个 Actor 才会运行在多个 CPU 核心之上, 这也是 Actor 天然线程安全的关键
这里还需要结合流处理设计读写双队列就可以做到消息分帧操作, 结合起来就可以手写出不错的 Actor 服务端功能
需要重新整合出 SimplisticHandler 类来追加流处理功能(注意: 这里要用到 pekko-stream 相关)
双队列(inbound + outbound)设计可以参照关于 PekkoStream 相关文章, 很好实现的消息队列功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
| import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.typed.*; import org.apache.pekko.actor.typed.javadsl.*; import org.apache.pekko.event.Logging; import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.io.Tcp; import org.apache.pekko.io.TcpMessage; import org.apache.pekko.stream.*; import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.util.ByteString; import org.apache.pekko.util.ByteStringBuilder;
import java.net.InetSocketAddress; import java.util.Objects;
public class PekkoTcpExample {
public static void main(String[] args) throws Exception { final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "tcp-example");
final ActorRef<Void> server = system.systemActorOf(Behaviors.setup(context -> { org.apache.pekko.actor.ActorContext classicContext = Adapter.toClassic(context); classicContext.actorOf(PekkoTcpActor.props( org.apache.pekko.actor.ActorRef.noSender(), "127.0.0.1", 18881, 128 )); return Behaviors.empty(); }), "tcp-service", Props.empty());
System.out.println("Server online at tcp://localhost:18880/\nPress RETURN to stop..."); int ignore = System.in.read(); system.terminate(); }
public static class PekkoTcpActor extends AbstractActor {
final org.apache.pekko.actor.ActorRef manager;
final org.apache.pekko.actor.ActorRef handler;
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
final String hostname;
final int port;
final int backlog;
private PekkoTcpActor(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) { this.manager = manager; this.handler = Tcp.get(getContext().getSystem()).manager(); this.hostname = hostname; this.port = port; this.backlog = backlog; }
public static org.apache.pekko.actor.Props props(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) { return org.apache.pekko.actor.Props.create(PekkoTcpActor.class, () -> new PekkoTcpActor(manager, hostname, port, backlog)); }
@Override public void preStart() { handler.tell(TcpMessage.bind(getSelf(), new InetSocketAddress(hostname, port), backlog), getSelf()); }
@Override public void postStop() { handler.tell(TcpMessage.unbind(), getSelf()); }
@Override public Receive createReceive() { return receiveBuilder() .match(Tcp.Bound.class, msg -> { logger.info("TCP Listen:{}", msg.localAddress()); if (Objects.nonNull(manager)) { manager.tell(msg, getSelf()); } }) .match(Tcp.CommandFailed.class, msg -> { logger.error("TCP Listen Failed, Address:{}:{}, already in use?", hostname, port); getContext().stop(getSelf()); }) .match(Tcp.Connected.class, connected -> { logger.info("New Connect, REMOTE:{} → LOCAL:{}", connected.remoteAddress(), connected.localAddress());
if (Objects.nonNull(manager)) { manager.tell(connected, getSelf()); }
final org.apache.pekko.actor.ActorRef handler = getContext().actorOf(org.apache.pekko.actor.Props.create(SimplisticHandler.class)); handler.tell(connected, getSender()); }).build(); } }
static class SimplisticHandler extends AbstractActor {
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
final Materializer materializer = Materializer.createMaterializer(getContext().getSystem());
private org.apache.pekko.actor.ActorRef session;
record FrameMessage(ByteString message) { }
final SourceQueueWithComplete<ByteString> inbound = Source.<ByteString>queue( 1024, OverflowStrategy.backpressure() ).via(Framing.delimiter( ByteString.fromString("\n"), 1024, FramingTruncation.ALLOW )).to(Sink.foreach(message -> { logger.info("Message Request: {}", message);
getSelf().tell(new FrameMessage(message), getSender()); })).run(materializer);
final SourceQueueWithComplete<ByteString> outbound = Source.<ByteString>queue( 1024, OverflowStrategy.backpressure() ).to(Sink.foreach(message -> {
ByteStringBuilder builder = ByteString.createBuilder(); builder.append(ByteString.fromString("Pekko Say: ")); builder.append(message); builder.append(ByteString.fromString("\n")); ByteString newMessage = builder.result();
logger.info("Message Request: {}", newMessage);
session.tell(TcpMessage.write(newMessage), getSelf()); })).run(materializer);
@Override public Receive createReceive() { return receiveBuilder() .match(Tcp.Connected.class, msg -> { logger.info("Session Connected: {}", msg.remoteAddress()); this.session = getSender();
session.tell(TcpMessage.register(getSelf()), getSelf()); })
.match(Tcp.Received.class, msg -> { final ByteString data = msg.data();
inbound.offer(data).whenComplete((result, throwable) -> { if (Objects.nonNull(throwable)) { logger.warning(throwable, "消息队列异常"); } else { if (QueueOfferResult.enqueued().equals(result)) { logger.debug("消息入队成功:{}字节", data.length()); } else if (QueueOfferResult.dropped().equals(result)) { logger.warning("队列满,消息被丢弃:{}字节", data.length()); } else if (QueueOfferResult.closed().equals(result)) { logger.warning("队列已关闭,消息入队失败:{}字节", data.length()); } } });
}) .match(FrameMessage.class, frameMessage -> { ByteString message = frameMessage.message(); logger.info("Request Frame Message = {}", message.utf8String());
outbound.offer(frameMessage.message()).whenComplete((result, throwable) -> { if (Objects.nonNull(throwable)) { logger.warning(throwable, "消息队列异常"); } else { if (QueueOfferResult.enqueued().equals(result)) { logger.debug("消息入队成功:{}字节", message.length()); } else if (QueueOfferResult.dropped().equals(result)) { logger.warning("队列满,消息被丢弃:{}字节", message.length()); } else if (QueueOfferResult.closed().equals(result)) { logger.warning("队列已关闭,消息入队失败:{}字节", message.length()); } } }); })
.match(Tcp.ConnectionClosed.class, msg -> { logger.info("Closed, Cause: {}", msg.getErrorCause()); inbound.complete(); outbound.complete(); getContext().stop(getSelf()); }) .build(); } } }
|
这就是很简易的双队列消息推送机制, 抛弃了入站数据即出站数据的思路来支持异步队列操作, 外部要发消息通知会话 Actor 来发送.
后续可以扩展出来自行封装成需要的通用网络组件, 因为网络库基本上都是常用功能, 封装起来方便后续复用
实践过程当中发现很多 TypedActor 功能其实还没有实现, 只能转换成经典的 Actor 设计, 后来感觉经典版本 Actor 兼容性和支持度更好点
WebSocket 流
对于 TCP/UDP 做调试测试的时候很麻烦, 想测试消息推送的时候没办法做到客户端直接支持 Framing 分帧消息发送处理
所以接下来需要引入 WebSocket 网络做数据流传输, 相比于 TCP/UDP 客户端, WebSocket 客户端支持广泛(支持二进制转Base64传输)
参考网址: https://pekko.apache.org/docs/pekko/current/stream/stream-refs.html
直接搭建 WebSocket 框架测试样例即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
| import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props; import org.apache.pekko.event.Logging; import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.http.javadsl.Http; import org.apache.pekko.http.javadsl.ServerBinding; import org.apache.pekko.http.javadsl.model.AttributeKeys; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; import org.apache.pekko.http.javadsl.model.StatusCodes; import org.apache.pekko.http.javadsl.model.ws.BinaryMessage; import org.apache.pekko.http.javadsl.model.ws.Message; import org.apache.pekko.http.javadsl.model.ws.TextMessage; import org.apache.pekko.http.javadsl.server.AllDirectives; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.CompletionStrategy; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.OverflowStrategy; import org.apache.pekko.stream.javadsl.Flow; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; import org.apache.pekko.util.ByteStringBuilder;
import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage;
public class PekkoFramedExample extends AllDirectives {
public static void main(String[] args) throws Exception { String hostname = "127.0.0.1"; int port = 18889; String path = "/test";
ActorSystem system = ActorSystem.create("websockets");
ActorRef actorRef = system.actorOf(PekkoWebSocketActor.create( hostname, port, path ));
System.out.printf("Server online at ws://%s:%d%s%n", hostname, port, path); System.out.println("Press RETURN to stop..."); int ignore = System.in.read();
system.terminate(); }
public record Connected( ActorRef outbound ) {
}
public static class PekkoWebSocketActor extends AbstractActor {
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
final String hostname;
final int port;
final String path;
final Http http = Http.get(getContext().getSystem());
final Materializer materializer = Materializer.createMaterializer(getContext().getSystem());
private CompletionStage<ServerBinding> binding;
private PekkoWebSocketActor(String hostname, int port, String path) { this.hostname = hostname; this.port = port; this.path = path; }
public static Props create(String hostname, int port, String path) { return Props.create(PekkoWebSocketActor.class, () -> new PekkoWebSocketActor( hostname, port, path )); }
@Override public void preStart() { logger.info("WebSocket Started: {}:{}{}", hostname, port, path); this.binding = http .newServerAt(hostname, port) .bind(this::createRoute) .whenComplete((res, throwable) -> { if (Objects.nonNull(throwable)) { logger.error(throwable, "异常的 WebSocket"); getContext().stop(getSelf()); } else { logger.info("启动服务成功"); } }); }
private CompletionStage<HttpResponse> createRoute(HttpRequest request) { if (request.getUri().path().equals(path)) { return CompletableFuture.completedFuture(request .getAttribute(AttributeKeys.webSocketUpgrade) .map(upgrade -> upgrade.handleMessagesWith(createFlow())) .orElse(HttpResponse.create() .withStatus(StatusCodes.BAD_REQUEST) .withEntity("Expected WebSocket request") )); } else { return CompletableFuture.completedFuture(HttpResponse .create() .withStatus(StatusCodes.NOT_FOUND)); } }
private Flow<Message, Message, NotUsed> createFlow() { String name = UUID.randomUUID().toString(); ActorRef actorRef = getContext().actorOf(Props.create( PekkoWebSocketSession.class, PekkoWebSocketSession::new ), name); logger.info("创建新 Actor: {}", actorRef.path());
Pair<ActorRef, Source<Message, NotUsed>> source = Source.<Message>actorRef(elem -> { if (elem == Done.done()) { return Optional.of(CompletionStrategy.immediately()); } else { return Optional.empty(); } }, elem -> Optional.empty(), 1024, OverflowStrategy.dropHead() ).preMaterialize(materializer); actorRef.tell(new Connected(source.first()), getSelf());
Sink<Message, CompletionStage<Done>> sinkBuilder = Sink.foreach(message -> actorRef.tell(message, getSelf())); Sink<Message, NotUsed> sink = sinkBuilder.mapMaterializedValue(future -> { future.whenComplete((done, throwable) -> { this.createQuitComplete(done, throwable, actorRef); getContext().stop(source.first()); }); return NotUsed.getInstance(); });
return Flow.fromSinkAndSource(sink, source.second()) .watchTermination((ignore, future) -> { future.whenComplete((done, throwable) -> { this.createQuitComplete(done, throwable, actorRef); getContext().stop(source.first()); }); return NotUsed.getInstance(); }); }
private void createQuitComplete(Done ignore, Throwable throwable, ActorRef actorRef) { if (Objects.nonNull(throwable)) { logger.error(throwable, "WebSocket 异常断开:{}", actorRef.path()); } else { logger.info("WebSocket 正常断开:{}", actorRef.path()); }
getContext().stop(actorRef); }
@Override public void postStop() { if (Objects.nonNull(binding)) { binding.thenCompose(ServerBinding::unbind) .thenAccept((done) -> logger.info("解除绑定 WebSocket 成功")); } }
@Override public Receive createReceive() { return receiveBuilder() .build(); } }
public static class PekkoWebSocketSession extends AbstractActor {
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
ActorRef outbound;
@Override public void postStop() { logger.info("Actor[{}] 已退出", getSelf().path()); }
@Override public Receive createReceive() { return receiveBuilder() .match(Connected.class, command -> this.outbound = command.outbound) .match(BinaryMessage.class, message -> { logger.info("binary message = {}", message.getStrictData());
ByteStringBuilder builder = ByteString.createBuilder(); builder.append(ByteString.fromString("Pekko Say: ")); builder.append(message.getStrictData()); outbound.tell(BinaryMessage.create(builder.result()), getSelf()); }) .match(TextMessage.class, message -> { logger.info("text message = {}", message.getStrictText());
outbound.tell(TextMessage.create("Pekko Say: %s".formatted(message)), getSelf()); }) .build(); } } }
|
这里就是直接做基础读写流操作, 目前没有引入 Framing 功能来做数据分帧
不过因为 WebSocket 本身是高级协议, 不好改动底层, 所有数据都是由上级 WebSocket Actor 做管道转发过来, 数据量上来的时候可能有瓶颈.
这里最后扩展是在会话的 Actor 追加 inbound 入站的队列, 然后外部数据尽可能快填充到队列, 而不是数据到来直接运行业务逻辑.
避免业务和分帧处理在同一个运行时处理, 后面的会话 Actor 自行读取队列数据来做业务功能处理
强类型 TCP 服务
在查询官方文档之后, 确认其实强类型 TCP/UDP 是支持挂载的, 这边也测试下看看是否能够运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| import org.apache.pekko.actor.typed.*; import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.actor.typed.javadsl.Receive; import org.apache.pekko.event.Logging; import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.io.Tcp; import org.apache.pekko.io.TcpExt; import org.apache.pekko.io.TcpMessage;
import java.io.IOException; import java.net.InetSocketAddress;
public class PekkoTcpTypedExample {
public static void main(String[] args) throws IOException {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-typed-stream"); ActorRef<Tcp.Message> handler = system.systemActorOf(TcpActorStream.create("127.0.0.1", 18889), "tcp-service", Props.empty());
system.getWhenTerminated().thenAccept(done -> { handler.unsafeUpcast().tell(PostStop.instance()); });
System.out.println("Press any key to stop..."); int ignore = System.in.read(); system.terminate(); }
public static class TcpActorStream extends AbstractBehavior<Tcp.Message> {
final TcpExt handler = Tcp.get(getContext().getSystem());
final LoggingAdapter log = Logging.getLogger(getContext().getSystem().classicSystem(), this);
final String hostname;
final int port;
private TcpActorStream(ActorContext<Tcp.Message> context, String hostname, int port) { super(context); this.hostname = hostname; this.port = port; handler.getManager().tell(TcpMessage.bind( getContext().classicActorContext().self(), new InetSocketAddress(hostname, port), 100 ), getContext().classicActorContext().self()); }
public static Behavior<Tcp.Message> create(String hostname, int port) { return Behaviors .setup(context -> new TcpActorStream(context, hostname, port)); }
@Override public Receive<Tcp.Message> createReceive() { return newReceiveBuilder() .onSignal(PostStop.class, (command) -> { log.warning("Actor stopped"); handler.getManager().tell(TcpMessage.unbind(), getContext().classicActorContext().self()); return Behaviors.stopped(); })
.onMessage(Tcp.Bound.class, (command) -> { log.info("Tcp bound {}", command.localAddress()); return Behaviors.same(); })
.onMessage(Tcp.Connected.class, (command) -> { log.info("Connecting to {}", command.remoteAddress());
getContext().classicActorContext().sender().tell( TcpMessage.register(getContext().classicActorContext().self()), getContext().classicActorContext().self() );
return Behaviors.same(); })
.onMessage(Tcp.CommandFailed.class, (command) -> { log.info("Connection failed by {}", command); return Behaviors.same(); })
.onMessage(Tcp.Received.class, (command) -> { log.info("Received {}", command); return Behaviors.same(); })
.onAnyMessage((event) -> { log.info("Other event: {}", event); return Behaviors.same(); })
.build(); } } }
|