WebSocket 和 Pekko 集成

注意: 需要前置学习 Quarkus 集成 Pekko 篇章配置 Quarkus + Pekko 作为服务端基础

这里的篇章暂时不涉及 集群 概念, 一旦引入集群概念可能 Actor 概念更加复杂.

参考之前说的 skynet 的源码配置就可以知道, 一般都会启动 socket 连接之后会动态创建 Actor 挂载:

  • 连接建立时动态创建 Actor 并挂载

  • 连接断开时销毁 Actor

  • 所有业务逻辑通过 Actor 异步处理, 天然实现连接级别的隔离和并发安全

如果是初学者推荐采用 WebSocket 做网络数据交换成, 主要原因是:

  • 协议简单: 内部已经做好数据分包, 不需要手动去将包划分

  • 数据可视化: 数据内容可以比较直观通过客户端发送数据(Postman之类应用可以直接发送)

  • 集成度广泛: 基本上全平台通用, 甚至于随便编写 html 页面也能作为客户端

WebSocket 的基本回调事件如下, 需要先提前封装对应事件信号(这里的生命周期名称可以随便修改):

生命周期名称 触发时机 核心作用 Actor 场景适配
Connected 客户端与服务端成功建立连接会话后 初始化连接资源(创建 Actor、绑定会话) 动态创建 AgentActor,关联会话句柄
TextMessage 服务端收到客户端发送的文本消息时 接收并转发文本消息给 Actor 处理 封装文本数据包发送给当前连接的 Actor
BinaryMessage 服务端收到客户端发送的二进制消息时 处理二进制数据(文件、字节流等) 封装二进制数据包转发给 Actor
SessionException 连接过程中发生异常时(IO 错误、解码失败) 捕获异常,释放资源(销毁 Actor) 通知 Actor 处理异常,避免资源泄漏
Disconnect 连接关闭时(客户端主动断开/服务端关闭) 清理资源(销毁 Actor、释放会话) 发送关闭事件信号给 Actor,触发 Actor 销毁

其中还需要说明的是 pekko-actor-typedSignal 机制, 强类型的 Actor 默认在声明的时候就确定消息传递内容.

如果想要传递并非声明类型的消息就需要采用 Signal 自定义系统信号传递, 只需要实现 org.apache.pekko.actor.typed.Signal:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.pekko.actor.typed.Signal;

/**
* 自定义的连接信号事件
* <p>
* 而强类型 ActorRef 需要发送这类事件就需要采用以下方式:
*
* <pre>
* // 比如推送 OnConnected 信号
* actorRef.unsafeUpcast().tell(new OnConnected());
* // 而如果内部型号仅仅做传递不需要附加其他数据, 建议采用静态化对象传递
* actorRef.unsafeUpcast().tell(OnConnected.instance());
* </pre>
*/
public class OnConnected implements Signal {

/**
* 模拟如果没有参数传递仅仅需要做信号通知, 直接采用静态实例化对象
*/
private static final OnConnected INSTANCE = new OnConnected();

public static OnConnected instance() {
return INSTANCE;
}
}

这里就是简单 OnConnected 信号定义, 而如果要拦截这部分信息需要让 Actor 类去实现 ReceiveBuilder.onSignal 的回调.

因为之前集成过 quarkus cdi(容器管理), 而 quarkus 本身就有外部 websocket 扩展, 所以直接沿用引入即可:

1
2
3
4
5
6
7
8
9
10
11
12
 <!-- 第三方包依赖配置 -->
<dependencies>
<!-- 其他略 -->


<!-- Quarkus WebSocket依赖 -->
<!-- 文档参考: https://quarkus.io/guides/websockets-next-tutorial -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
</dependencies>

这部分单独挂载的服务即可, 这部分按照官方文档处理下就行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import com.google.protobuf.MessageLite;
import io.quarkus.websockets.next.*;
import jakarta.inject.Inject;
import org.apache.pekko.actor.typed.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
* 声明 WebSocket 服务层
* 访问地址: ws://{HOSTNAME}:{PORT}/pino
*/
@WebSocket(path = "/pino")
public class PinoWebSocket {

/**
* 日志对象
*/
static final Logger logger = LoggerFactory.getLogger(PinoWebSocket.class);

/**
* 全局可用的 Actor 系统 - 备用
*/
@Inject
ActorSystem<MessageLite> system;

/**
* 连接回调
*/
@OnOpen
public void onConnected(WebSocketConnection connection) {
logger.debug("Websocket Connected, id={}", connection.id());

}

/**
* 端开回调
*/
@OnClose
public void onDisconnected(WebSocketConnection connection) {
logger.debug("Websocket Disconnected, id={}", connection.id());
}

/**
* 文本消息传递回调
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection connection, String message) {
logger.debug("Websocket TextMessage, id={}, message={}", connection.id(), message);
}

/**
* 二进制消息传递回调
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection connection, byte[] message) {
logger.debug("Websocket Binary Message, id={}, message={}", connection.id(), Arrays.toString(message));
}

/**
* 异常传递回调
*/
@OnError
public void onError(WebSocketConnection connection, Exception throwable) {
logger.error("Websocket Exception", throwable);
}
}

这里随便用类似 Postman 类似客户端就能测试推送, 现在就需要改造 Signal 把参数防止内部, 这里想定义连接和断开信号处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* OnConnected.java
* 连接信号事件, 这里采用 Java 高版本 record 特性, 避免写 getter/setter
*
* <pre>
* // 比如推送 OnConnected 信号
* actorRef.unsafeUpcast().tell(new OnConnected());
* // 而如果内部型号仅仅做传递不需要附加其他数据, 建议采用静态化对象传递
* actorRef.unsafeUpcast().tell(OnConnected.instance());
* </pre>
*
* @param socket 会话对象
*/
public record OnConnected(WebSocketConnection socket) implements Signal {

}


/**
* OnBytes.java
* 二进制数据传递
*
* @param socket 会话对象
* @param message 会话消息
*/
public record OnBytes(WebSocketConnection socket, byte[] message) implements Signal {
}

/**
* OnText.java
* 文本数据
*
* @param socket 会话对象
* @param message 会话消息
*/
public record OnText(WebSocketConnection socket, String message) implements Signal {
}


/**
* OnException.java
* 异常信号
*
* @param socket 会话对象
* @param exception 异常对象
*/
public record OnException(WebSocketConnection socket, Exception exception) implements Signal {
}


/**
* OnDisconnect.java
* 会话端开信号
*
* @param socket 会话对象
*/
public record OnDisconnect(WebSocketConnection socket) implements Signal {

}

这几个就是转发到 Actor 主要信号内容, 这部分的 WebSocketConnection 对象可以考虑只需要初始化 Actor 传递即可.

一般动态创建 Actor 时就已经视为获取到 WebSocketConnection 句柄, 后续传递其实可有可无, 所以后续考虑不需要附带上

之后就是挂载的会话 Actor, 这部分直接简单处理即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import com.google.protobuf.MessageLite;
import io.meteorcat.game.signal.*;
import io.quarkus.websockets.next.WebSocketConnection;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.slf4j.Logger;

/**
* WebSocket 独立的会话 Actor
*/
public class PinoSession extends AbstractBehavior<MessageLite> {

/**
* 会话对象
*/
final WebSocketConnection socket;

/**
* 日志对象
*/
final Logger logger = getContext().getLog();

/**
* 实例化
*
* @param context 运行时对象
* @param socket 会话对象
*/
private PinoSession(ActorContext<MessageLite> context, WebSocketConnection socket) {
super(context);
this.socket = socket;
}

/**
* 静态构建
*/
public static Behavior<MessageLite> create(WebSocketConnection socket) {
return Behaviors.setup(context -> new PinoSession(context, socket));
}


/**
* 消息拦截
*/
@Override
public Receive<MessageLite> createReceive() {
return newReceiveBuilder()
.onSignal(OnConnected.class, this::onConnected)
.onSignal(OnDisconnect.class, this::onDisconnect)
.onSignal(OnException.class, this::onException)
.onSignal(OnText.class, this::onTextMessage)
.onSignal(OnBytes.class, this::onBinaryMessage)
.build();
}

/**
* 连接事件
*/
private Behavior<MessageLite> onConnected(OnConnected signal) {
logger.info("actor connected by id={}", signal.socket().id());
return Behaviors.same();
}

/**
* 断开事件
*/
private Behavior<MessageLite> onDisconnect(OnDisconnect signal) {
logger.info("actor disconnected by id={}", signal.socket().id());

return Behaviors.stopped(); // 注意这里需要在此声明断开, 否则 Actor 不会主动退出
}

/**
* 异常事件
*/
private Behavior<MessageLite> onException(OnException signal) {
logger.info("actor exception by id={}", signal.socket().id());
return Behaviors.same();
}

/**
* 文本传递事件
*/
private Behavior<MessageLite> onTextMessage(OnText signal) {
logger.info("actor text message by id={}", signal.socket().id());
return Behaviors.same();
}

/**
* 二进制传递事件
*/
private Behavior<MessageLite> onBinaryMessage(OnBytes signal) {
logger.info("actor binary message by id={}", signal.socket().id());
return Behaviors.same();
}
}

最后追加代码到具体的 WebSocket 会话对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import com.google.protobuf.MessageLite;
import io.meteorcat.game.signal.*;
import io.quarkus.websockets.next.*;
import jakarta.inject.Inject;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Props;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* 声明 WebSocket 服务层
*/
@WebSocket(path = "/pino")
public class PinoWebSocket {

/**
* 日志对象
*/
static final Logger logger = LoggerFactory.getLogger(PinoWebSocket.class);


/**
* 会话列表
*/
static Map<WebSocketConnection, ActorRef<MessageLite>> sessions = new ConcurrentHashMap<>();

/**
* 全局可用的 Actor 系统 - 备用
*/
@Inject
ActorSystem<MessageLite> system;


/**
* 连接回调
*/
@OnOpen
public void onConnected(WebSocketConnection connection) {
logger.debug("Websocket Connected, id={}", connection.id());

// 动态创建 Actor
String name = "session-websocket-%s".formatted(connection.id());
ActorRef<MessageLite> actorRef = system.systemActorOf(PinoSession.create(connection), name, Props.empty());
sessions.put(connection, actorRef);

// 传递连接信号
actorRef.unsafeUpcast().tell(new OnConnected(connection));

// 打印节点树, ActorSystem.printTree() 是内置方法, 用于获取内部节点关系的字符串
logger.debug(system.printTree());
}

/**
* 端开回调
*/
@OnClose
public void onDisconnected(WebSocketConnection connection) {
logger.debug("Websocket Disconnected, id={}", connection.id());

// 删除会话
ActorRef<MessageLite> actorRef = sessions.remove(connection);
if (Objects.nonNull(actorRef)) {
actorRef.unsafeUpcast().tell(new OnDisconnect(connection));
}
}

/**
* 文本消息传递回调
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection connection, String message) {
logger.debug("Websocket TextMessage, id={}, message={}", connection.id(), message);

// 投递文本消息会话
ActorRef<MessageLite> actorRef = sessions.get(connection);
if (Objects.nonNull(actorRef)) {
actorRef.unsafeUpcast().tell(new OnText(connection, message));
}

// 如果是 TREE 字符串就调用打印树
if (logger.isDebugEnabled() && "TREE".equals(message)) {
logger.debug(system.printTree());
}
}

/**
* 二进制消息传递回调
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection connection, byte[] message) {
logger.debug("Websocket Binary Message, id={}, message={}", connection.id(), Arrays.toString(message));

// 投递二进制消息会话
ActorRef<MessageLite> actorRef = sessions.get(connection);
if (Objects.nonNull(actorRef)) {
actorRef.unsafeUpcast().tell(new OnBytes(connection, message));
}
}

/**
* 异常传递回调
*/
@OnError
public void onError(WebSocketConnection connection, Exception throwable) {
logger.error("Websocket Exception", throwable);

// 投递异常会话
ActorRef<MessageLite> actorRef = sessions.get(connection);
if (Objects.nonNull(actorRef)) {
actorRef.unsafeUpcast().tell(new OnException(connection, throwable));
}
}
}

这样就搭建简单的动态扩充 WebSocket 的 Actor 池, 实际其实应该单独处理个根进程来将会话 id 做标识管理, 节点类似如下:

  • pekko://{地址}/websocket/e0f05827-83c2-435c-8830-9c1dbb298dfb

  • pekko://{地址}/websocket/e34x3hgc-x34x-s34x-xv3c-9ccv325c8dfc

这部分应该全部集中在 websocket 节点下来处理, 放置在外层容易出现 pekko 单节点堆积过多的问题.

一般来说最好同类功能集中于某个分支节点, 不要将所有功能全部集中在统一节点.

至此就处理好数据 Actor 和 WebSocket 的数据交换层, 后续就需要处理以下问题:

  • 数据库管理

  • 定时器管理

  • 业务逻辑处理