注意: 需要前置学习 Quarkus 集成 Pekko 篇章配置 Quarkus + Pekko 作为服务端基础
这里的篇章暂时不涉及 集群 概念, 一旦引入集群概念可能 Actor 概念更加复杂.
参考之前说的 skynet 的源码配置就可以知道, 一般都会启动 socket 连接之后会动态创建 Actor 挂载:
如果是初学者推荐采用 WebSocket 做网络数据交换成, 主要原因是:
-
协议简单: 内部已经做好数据分包, 不需要手动去将包划分
-
数据可视化: 数据内容可以比较直观通过客户端发送数据(Postman之类应用可以直接发送)
-
集成度广泛: 基本上全平台通用, 甚至于随便编写 html 页面也能作为客户端
而 WebSocket 的基本回调事件如下, 需要先提前封装对应事件信号(这里的生命周期名称可以随便修改):
| 生命周期名称 |
触发时机 |
核心作用 |
Actor 场景适配 |
Connected |
客户端与服务端成功建立连接会话后 |
初始化连接资源(创建 Actor、绑定会话) |
动态创建 AgentActor,关联会话句柄 |
TextMessage |
服务端收到客户端发送的文本消息时 |
接收并转发文本消息给 Actor 处理 |
封装文本数据包发送给当前连接的 Actor |
BinaryMessage |
服务端收到客户端发送的二进制消息时 |
处理二进制数据(文件、字节流等) |
封装二进制数据包转发给 Actor |
SessionException |
连接过程中发生异常时(IO 错误、解码失败) |
捕获异常,释放资源(销毁 Actor) |
通知 Actor 处理异常,避免资源泄漏 |
Disconnect |
连接关闭时(客户端主动断开/服务端关闭) |
清理资源(销毁 Actor、释放会话) |
发送关闭事件信号给 Actor,触发 Actor 销毁 |
其中还需要说明的是 pekko-actor-typed 的 Signal 机制, 强类型的 Actor 默认在声明的时候就确定消息传递内容.
如果想要传递并非声明类型的消息就需要采用 Signal 自定义系统信号传递, 只需要实现 org.apache.pekko.actor.typed.Signal:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import org.apache.pekko.actor.typed.Signal;
public class OnConnected implements Signal {
private static final OnConnected INSTANCE = new OnConnected();
public static OnConnected instance() { return INSTANCE; } }
|
这里就是简单 OnConnected 信号定义, 而如果要拦截这部分信息需要让 Actor 类去实现 ReceiveBuilder.onSignal 的回调.
因为之前集成过 quarkus cdi(容器管理), 而 quarkus 本身就有外部 websocket 扩展, 所以直接沿用引入即可:
1 2 3 4 5 6 7 8 9 10 11 12
| <dependencies>
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-websockets-next</artifactId> </dependency> </dependencies>
|
这部分单独挂载的服务即可, 这部分按照官方文档处理下就行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import com.google.protobuf.MessageLite; import io.quarkus.websockets.next.*; import jakarta.inject.Inject; import org.apache.pekko.actor.typed.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.Arrays;
@WebSocket(path = "/pino") public class PinoWebSocket {
static final Logger logger = LoggerFactory.getLogger(PinoWebSocket.class);
@Inject ActorSystem<MessageLite> system;
@OnOpen public void onConnected(WebSocketConnection connection) { logger.debug("Websocket Connected, id={}", connection.id());
}
@OnClose public void onDisconnected(WebSocketConnection connection) { logger.debug("Websocket Disconnected, id={}", connection.id()); }
@OnTextMessage public void onTextMessage(WebSocketConnection connection, String message) { logger.debug("Websocket TextMessage, id={}, message={}", connection.id(), message); }
@OnBinaryMessage public void onBinaryMessage(WebSocketConnection connection, byte[] message) { logger.debug("Websocket Binary Message, id={}, message={}", connection.id(), Arrays.toString(message)); }
@OnError public void onError(WebSocketConnection connection, Exception throwable) { logger.error("Websocket Exception", throwable); } }
|
这里随便用类似 Postman 类似客户端就能测试推送, 现在就需要改造 Signal 把参数防止内部, 这里想定义连接和断开信号处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
public record OnConnected(WebSocketConnection socket) implements Signal {
}
public record OnBytes(WebSocketConnection socket, byte[] message) implements Signal { }
public record OnText(WebSocketConnection socket, String message) implements Signal { }
public record OnException(WebSocketConnection socket, Exception exception) implements Signal { }
public record OnDisconnect(WebSocketConnection socket) implements Signal {
}
|
这几个就是转发到 Actor 主要信号内容, 这部分的 WebSocketConnection 对象可以考虑只需要初始化 Actor 传递即可.
一般动态创建 Actor 时就已经视为获取到 WebSocketConnection 句柄, 后续传递其实可有可无, 所以后续考虑不需要附带上
之后就是挂载的会话 Actor, 这部分直接简单处理即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| import com.google.protobuf.MessageLite; import io.meteorcat.game.signal.*; import io.quarkus.websockets.next.WebSocketConnection; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.actor.typed.javadsl.Receive; import org.slf4j.Logger;
public class PinoSession extends AbstractBehavior<MessageLite> {
final WebSocketConnection socket;
final Logger logger = getContext().getLog();
private PinoSession(ActorContext<MessageLite> context, WebSocketConnection socket) { super(context); this.socket = socket; }
public static Behavior<MessageLite> create(WebSocketConnection socket) { return Behaviors.setup(context -> new PinoSession(context, socket)); }
@Override public Receive<MessageLite> createReceive() { return newReceiveBuilder() .onSignal(OnConnected.class, this::onConnected) .onSignal(OnDisconnect.class, this::onDisconnect) .onSignal(OnException.class, this::onException) .onSignal(OnText.class, this::onTextMessage) .onSignal(OnBytes.class, this::onBinaryMessage) .build(); }
private Behavior<MessageLite> onConnected(OnConnected signal) { logger.info("actor connected by id={}", signal.socket().id()); return Behaviors.same(); }
private Behavior<MessageLite> onDisconnect(OnDisconnect signal) { logger.info("actor disconnected by id={}", signal.socket().id());
return Behaviors.stopped(); }
private Behavior<MessageLite> onException(OnException signal) { logger.info("actor exception by id={}", signal.socket().id()); return Behaviors.same(); }
private Behavior<MessageLite> onTextMessage(OnText signal) { logger.info("actor text message by id={}", signal.socket().id()); return Behaviors.same(); }
private Behavior<MessageLite> onBinaryMessage(OnBytes signal) { logger.info("actor binary message by id={}", signal.socket().id()); return Behaviors.same(); } }
|
最后追加代码到具体的 WebSocket 会话对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| import com.google.protobuf.MessageLite; import io.meteorcat.game.signal.*; import io.quarkus.websockets.next.*; import jakarta.inject.Inject; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.Props; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap;
@WebSocket(path = "/pino") public class PinoWebSocket {
static final Logger logger = LoggerFactory.getLogger(PinoWebSocket.class);
static Map<WebSocketConnection, ActorRef<MessageLite>> sessions = new ConcurrentHashMap<>();
@Inject ActorSystem<MessageLite> system;
@OnOpen public void onConnected(WebSocketConnection connection) { logger.debug("Websocket Connected, id={}", connection.id());
String name = "session-websocket-%s".formatted(connection.id()); ActorRef<MessageLite> actorRef = system.systemActorOf(PinoSession.create(connection), name, Props.empty()); sessions.put(connection, actorRef);
actorRef.unsafeUpcast().tell(new OnConnected(connection));
logger.debug(system.printTree()); }
@OnClose public void onDisconnected(WebSocketConnection connection) { logger.debug("Websocket Disconnected, id={}", connection.id());
ActorRef<MessageLite> actorRef = sessions.remove(connection); if (Objects.nonNull(actorRef)) { actorRef.unsafeUpcast().tell(new OnDisconnect(connection)); } }
@OnTextMessage public void onTextMessage(WebSocketConnection connection, String message) { logger.debug("Websocket TextMessage, id={}, message={}", connection.id(), message);
ActorRef<MessageLite> actorRef = sessions.get(connection); if (Objects.nonNull(actorRef)) { actorRef.unsafeUpcast().tell(new OnText(connection, message)); }
if (logger.isDebugEnabled() && "TREE".equals(message)) { logger.debug(system.printTree()); } }
@OnBinaryMessage public void onBinaryMessage(WebSocketConnection connection, byte[] message) { logger.debug("Websocket Binary Message, id={}, message={}", connection.id(), Arrays.toString(message));
ActorRef<MessageLite> actorRef = sessions.get(connection); if (Objects.nonNull(actorRef)) { actorRef.unsafeUpcast().tell(new OnBytes(connection, message)); } }
@OnError public void onError(WebSocketConnection connection, Exception throwable) { logger.error("Websocket Exception", throwable);
ActorRef<MessageLite> actorRef = sessions.get(connection); if (Objects.nonNull(actorRef)) { actorRef.unsafeUpcast().tell(new OnException(connection, throwable)); } } }
|
这样就搭建简单的动态扩充 WebSocket 的 Actor 池, 实际其实应该单独处理个根进程来将会话 id 做标识管理, 节点类似如下:
这部分应该全部集中在 websocket 节点下来处理, 放置在外层容易出现 pekko 单节点堆积过多的问题.
一般来说最好同类功能集中于某个分支节点, 不要将所有功能全部集中在统一节点.
至此就处理好数据 Actor 和 WebSocket 的数据交换层, 后续就需要处理以下问题: