会话管理
目前已经调通数据传输的流程, 但是实际上还有不少细节要优化处理, 之前测试推送的时候是这样:
// 测试发送个数据包连通
Events.C2SConnectedEvent event = Events.C2SConnectedEvent
.newBuilder()
.setSessionId("10001")
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
sessionRef.tell(event, ActorRef.noSender());
这里就需要把 WebSocket 和 Actor 功能桥接起来, 并且当 actor 集群节点中断的时候,
就要让目前 websocket 推送的所有消息返回错误消息给客户端说明 服务异常不可用 的状态.
也就是要把 WebSocket 托管给 Actor 节点管理(注意: 这里不需要创建复杂集群节点, 只需要本地节点即可)
本地
Actor节点交换数据的时候不会走序列化流程, 可以直接传递对象实例然后集中转发集群代理对象
这里需要的就是每次 onOpen 的操作的时候动态创建 Actor 会话获取 ActorRef,
每次消息和异常都转发给内部处理, 如果触发 onClose 操作就通知内部 Actor 清除该 Actor.
改进网关
WebSocket 现在功能需要更精简, 只负责当作数据转发的 工具人, 内部不做任何操作,
这里先重置下功能回归成原始样式:
package io.fortress.websocket;
import io.quarkus.websockets.next.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=xxx
* WebSocket(path = "yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
/**
* 日志对象
*/
final static Logger logger = LoggerFactory.getLogger(WebSocketBootstrap.class);
@Inject
ActorSystem system;
/**
* 连接回调, 现在连接会话只需要创建心跳包任务就行了
*/
@OnOpen
public void onConnected(WebSocketConnection session) {
}
/**
* 会话断开
*/
@OnClose
public void onDisconnect(WebSocketConnection session, CloseReason reason) {
}
/**
* 会话异常
*/
@OnError
public void onError(WebSocketConnection session, Exception e) {
}
/**
* 文本消息传递
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection session, String message) {
}
/**
* 二进制消息传递
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection session, byte[] message) {
}
}
而对于 Actor 系统现在也不需要处理集群连接, 因为现在集群代理的功能是由本地 Actor 节点自己去初始化:
package io.fortress.websocket;
import io.fortress.websocket.config.WebSocketActorSystemConfiguration;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* io.fortress.websocket.WebSocketActorSystem.java
* <p>
* 加载全局初始化 ActorSystem
*/
@ApplicationScoped
public class WebSocketActorSystem {
/**
* 日志对象
*/
final Logger logger = LoggerFactory.getLogger(WebSocketActorSystem.class);
/**
* 加载 ActorSystem 配置
*/
@Inject
WebSocketActorSystemConfiguration configuration;
/**
* 生成全局唯一Actor系统
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
// 无论成功失败都关闭 Quarkus 系统
// 现在集群连接不需要 Actor System 初始化处理, 而是内部自己节点去申请处理, 这样抽象出来的功能越简洁干练
return configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// 无论成功失败都关闭 Quarkus 系统
Quarkus.asyncExit();
return null;
}));
}
}
那么现在就只需要 WebSocket 获取挂载 ActorSystem 实例之后动态创建 Actor,
这里更新初版的会话监听器:
package io.fortress.websocket;
import io.fortress.common.utils.ActorExtensions;
import io.quarkus.websockets.next.WebSocketConnection;
import org.apache.pekko.actor.*;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ClusterEvent;
import org.apache.pekko.cluster.Member;
import org.apache.pekko.cluster.sharding.ShardRegion;
import org.apache.pekko.event.LoggingAdapter;
import java.util.Optional;
/**
* io.fortress.websocket.WebSocketSession.java
* <p>
* 动态 WebSocket 会话节点
*/
public class WebSocketSession extends AbstractActor implements ActorExtensions {
/**
* 日志对象
*/
private final LoggingAdapter logger = context().system().log();
/**
* 获取基础集群对象
*/
private final Cluster cluster = cluster(this);
/**
* 连接会话
*/
private final WebSocketConnection connection;
/**
* 是否当前节点是否可用
*/
private boolean enabled = false;
/**
* 集群节点
*/
private final ActorRef clusterActorRef;
/**
* Actor加载
*/
public static Props props(final WebSocketConnection connection, final String typeName, final String role, final ShardRegion.MessageExtractor extractor) {
return Props.create(WebSocketSession.class, () -> new WebSocketSession(connection, typeName, role, extractor));
}
/**
* Actor加载
*/
public static Props props(final WebSocketConnection connection, final String typeName, final ShardRegion.MessageExtractor extractor) {
return Props.create(WebSocketSession.class, () -> new WebSocketSession(connection, typeName, null, extractor));
}
/**
* 构造方法
*/
public WebSocketSession(final WebSocketConnection connection, final String typeName, final String role, final ShardRegion.MessageExtractor extractor) {
this.connection = connection;
// 创建分片集群代理
this.clusterActorRef = clusterSharding(this).startProxy(
typeName,
Optional.ofNullable(role), // 以远程集群什么角色参与, 代理端其实不需要怎么指定
extractor
);
logger.info("Set Cluster Session: {}", clusterActorRef.path());
// 创建集群加入监听回调
cluster.registerOnMemberUp(() -> {
Member member = cluster.selfMember();
logger.info("[Session] Join Cluster({}:{}) By {}", member.address(), member.status(), getSelf().path());
onEnabled(true);
});
// 创建集群移除监听回调
cluster.registerOnMemberRemoved(() -> {
Member member = cluster.selfMember();
logger.info("[Session] Leave Cluster({}:{}) By {}", member.address(), member.status(), getSelf().path());
onEnabled(false);
});
// 其他事件监听回调
cluster.subscribe(
getSelf(),
ClusterEvent.initialStateAsEvents(),
ClusterEvent.UnreachableMember.class,
ClusterEvent.ReachableMember.class
);
// 监控远程的集群代理
getContext().watch(clusterActorRef);
}
/**
* 退出方法
*/
@Override
public void postStop() {
// 如果没有关闭会话就帮助关闭
if (connection != null && connection.isOpen()) connection.closeAndAwait();
logger.info("[Session] Actor:{} Closed", getSelf().path());
// 解除远程的集群代理
getContext().unwatch(clusterActorRef);
// 取消消息订阅
cluster.unsubscribe(getSelf());
// 关闭目前的集群代理
if (clusterActorRef != null) {
logger.info("[Session] Cluster:{} Stopped", clusterActorRef.path());
context().stop(clusterActorRef);
}
}
@Override
public Receive createReceive() {
return receiveBuilder()
// 远程集群可用
.match(ClusterEvent.UnreachableMember.class, () -> enabled, (ClusterEvent.UnreachableMember event) -> {
logger.info("[Session] Cluster:{} Unreachable", event.member().address());
onEnabled(false);
})
// 远程集群不可用
.match(ClusterEvent.ReachableMember.class, () -> !enabled, (ClusterEvent.ReachableMember event) -> {
logger.info("[Session] Cluster:{} Reachable", event.member().address());
onEnabled(true);
})
.build();
}
/**
* 切换启用状态
*/
private void onEnabled(boolean enabled) {
this.enabled = enabled;
// 做些启动|禁用的处理
}
}
然后 WebSocket 网关配置下功能:
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=xxx
* WebSocket(path = "yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
@Inject
ActorSystem system;
/**
* Actor 节点
*/
final static Map<String, ActorRef> actors = new ConcurrentHashMap<>();
/**
* 连接回调, 目前先硬编码生成, 后面需要创建全局配置对象
*/
@OnOpen
public void onConnected(WebSocketConnection session) {
final String clusterName = "session";
final ActorMessageExtractor extractor = new ActorMessageExtractor(5, Collections.emptyList(), "session-");
// 生成子节点
final String sessionId = session.id();
final ActorRef sessionActor = system.actorOf(WebSocketSession.props(
session,
clusterName,
extractor
), sessionId);
actors.put(sessionId, sessionActor);
}
/**
* 会话断开
*/
@OnClose
public void onDisconnect(WebSocketConnection session, CloseReason reason) {
// 退出子节点
final ActorRef sessionActor = actors.remove(session.id());
if (sessionActor != null) system.stop(sessionActor);
}
}
这里运行下就可以看到启动之后动态创建 Actor 情况:
2025-09-09 12:21:19,414 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-5) Set Cluster Session: pekko://fortress-cluster/system/sharding/sessionProxy
2025-09-09 12:21:19,416 INFO [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-20) session: Automatic entity passivation: idle entities after [2.000 min], checked every [1.000 min]
2025-09-09 12:21:23,326 INFO [org.apa.pek.clu.Cluster] (fortress-cluster-pekko.actor.default-dispatcher-29) Cluster Node [pekko://[email protected]:2550] - Received InitJoinAck message from [Actor[pekko://[email protected]:2551/system/cluster/core/daemon#-643395250]] to [pekko://[email protected]:2550]
2025-09-09 12:21:23,334 INFO [org.apa.pek.clu.Cluster] (fortress-cluster-pekko.actor.default-dispatcher-29) Cluster Node [pekko://[email protected]:2550] - Welcome from [pekko://[email protected]:2551]
2025-09-09 12:21:23,563 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-5) [Session] Join Cluster(pekko://[email protected]:2550:Joining) By pekko://fortress-cluster/user/fef83ca2-669b-42c2-a4f4-20fc15f1f1fd
2025-09-09 12:21:23,563 INFO [org.apa.pek.clu.sbr.SplitBrainResolver] (fortress-cluster-pekko.actor.default-dispatcher-5) This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
2025-09-09 12:21:24,320 INFO [org.apa.pek.clu.Cluster] (fortress-cluster-pekko.actor.default-dispatcher-5) Cluster Node [pekko://[email protected]:2550] - is the new leader among reachable nodes (more leaders may exist)
2025-09-09 12:21:33,875 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-5) [Session] Actor:pekko://fortress-cluster/user/fef83ca2-669b-42c2-a4f4-20fc15f1f1fd Closed
2025-09-09 12:21:33,876 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-5) [Session] Cluster:pekko://fortress-cluster/system/sharding/sessionProxy Stopped
注意: 连接远程节点是需要一定时间校验的, 所以
WebSocketSession.enabled字段就是确保这段时间内, 客户端请求返回统一正在连接的状态
可以看到目前本地会话已经确认动态创建 Actor 并且连接到远程集群节点, 但是目前只有 Open 和 Close 这两个事件,
这里就额外定义本地 Actor 事件来做传递处理:
/**
* io.fortress.websocket.event.ThrowableEvent.java
* <p>
* 异常事件
*
* @param sessionId
* @param timestamp
* @param connection
* @param exception
*/
public record ThrowableEvent(
String sessionId,
Long timestamp,
WebSocketConnection connection,
Exception exception
) {
}
/**
* io.fortress.websocket.event.BinaryMessageEvent.java
* <p>
* 二进制消息事件
*
* @param sessionId
* @param timestamp
* @param connection
* @param message
*/
public record BinaryMessageEvent(
String sessionId,
Long timestamp,
WebSocketConnection connection,
byte[] message
) {
}
/**
* io.fortress.websocket.event.TextMessageEvent.java
* <p>
* 文本消息事件
*
* @param sessionId
* @param timestamp
* @param connection
* @param message
*/
public record TextMessageEvent(
String sessionId,
Long timestamp,
WebSocketConnection connection,
String message
) {
}
现在事件拆分更加细致了, 拦截处理更加复杂的事件:
- 常规文本内容:
TextMessageEvent - 二进制流内容:
BinaryMessageEvent - 异常回调返回:
ThrowableEvent
这部分全部移交给 Actor 控制, WebSocket 服务只留个外壳做消息转发和关闭就行.
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=xxx
* WebSocket(path = "yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
/**
* Actor 节点
*/
final static Map<String, ActorRef> actors = new ConcurrentHashMap<>();
// 其他略
/**
* 会话异常
*/
@OnError
public void onError(WebSocketConnection session, Exception e) {
// 转发异常
final ActorRef sessionActor = actors.get(session.id());
if (sessionActor != null) {
sessionActor.tell(new ThrowableEvent(
session.id(),
System.currentTimeMillis(),
session,
e
), ActorRef.noSender());
}
}
/**
* 文本消息传递
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection session, String message) {
// 转发文本消息
final ActorRef sessionActor = actors.get(session.id());
if (sessionActor != null) {
sessionActor.tell(new TextMessageEvent(
session.id(),
System.currentTimeMillis(),
session,
message
), ActorRef.noSender());
}
}
/**
* 二进制消息传递
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection session, byte[] message) {
// 转发二进制消息
final ActorRef sessionActor = actors.get(session.id());
if (sessionActor != null) {
sessionActor.tell(new BinaryMessageEvent(
session.id(),
System.currentTimeMillis(),
session,
message
), ActorRef.noSender());
}
}
}
重申一下: 本地节点转发对象实体是不需要走序列化流程的, 所以直接构建数据对象移交给
Actor就行了
现在本地内部会话节点直接拦截具体的消息就可以测试转发给具体集群代理:
/**
* io.fortress.websocket.WebSocketSession.java
* <p>
* 动态 WebSocket 会话节点
*/
public class WebSocketSession extends AbstractActor implements ActorExtensions {
// 其他略
@Override
public Receive createReceive() {
return receiveBuilder()
// 远程集群可用
.match(ClusterEvent.UnreachableMember.class, () -> enabled, (ClusterEvent.UnreachableMember event) -> {
logger.info("[Session] Cluster:{} Unreachable", event.member().address());
onEnabled(false);
})
// 远程集群不可用
.match(ClusterEvent.ReachableMember.class, () -> !enabled, (ClusterEvent.ReachableMember event) -> {
logger.info("[Session] Cluster:{} Reachable", event.member().address());
onEnabled(true);
})
// 网关消息: 文本消息
.match(TextMessageEvent.class, (textMessageEvent -> {
logger.debug("[Session] TextMessageEvent: {}", textMessageEvent.message());
// 没有加入集群之前, 需要保证返回给客户端: 服务器正在连接
if (enabled) {
// todo: 集群暂未初始化完成
}
// todo: 转发给集群内部消息
//clusterActorRef.tell(...,getSelf());
}))
// 网关消息: 二进制消息
.match(BinaryMessageEvent.class, (BinaryMessageEvent) -> {
logger.debug("[Session] BinaryMessageEvent: {}", Arrays.toString(BinaryMessageEvent.message()));
// 没有加入集群之前, 需要保证返回给客户端: 服务器正在连接
if (enabled) {
// todo: 集群暂未初始化完成
}
// todo: 转发给集群内部消息
//clusterActorRef.tell(...,getSelf());
})
// 网关消息: 异常消息
.match(ThrowableEvent.class, (ThrowableEvent e) -> {
logger.error(e.exception(), "[Session] ThrowableEvent: {}]", e.exception().getMessage());
})
.build();
}
}
这里就是具体的消息转发流程, 测试时候就可以看到具体的打印内容:
2025-09-09 14:02:08,753 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-17) [Session] Join Cluster(pekko://[email protected]:2550:Joining) By pekko://fortress-cluster/user/079f85c1-fe26-49dd-8baa-078e3bf7c222
2025-09-09 14:02:08,754 INFO [org.apa.pek.clu.sbr.SplitBrainResolver] (fortress-cluster-pekko.actor.default-dispatcher-17) This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
2025-09-09 14:02:09,281 INFO [org.apa.pek.clu.Cluster] (fortress-cluster-pekko.actor.default-dispatcher-17) Cluster Node [pekko://[email protected]:2550] - is the new leader among reachable nodes (more leaders may exist)
2025-09-09 14:02:14,669 INFO [org.apa.pek.act.ActorSystemImpl] (fortress-cluster-pekko.actor.default-dispatcher-17) [Session] BinaryMessageEvent: [26, 80, 8, 1, 16, 100, 26, 36, 56, 101, 56, 97, 49, 100, 100, 97, 45, 102, 99, 100, 97, 45, 52, 57, 57, 56, 45, 97, 98, 101, 101, 45, 49, 52, 57, 54, 57, 98, 102, 98, 56, 52, 50, 56, 34, 36, 48, 57, 101, 97, 100, 56, 56, 101, 45, 52, 101, 51, 99, 45, 52, 51, 50, 52, 45, 57, 102, 53, 98, 45, 102, 56, 98, 57, 50, 48, 100, 101, 99, 54, 99, 54]
至此就完成网关层关于 Actor 和 WebSocket 结合的功能, 剩下就是比较细碎的知识点需要处理, 而且每个人的习惯有所不同代码思路也不一样.
之后差不多到这样就基本上可以构建出简单的集群服务端项目, 后面就是说明一些细致化功能区分逻辑方便去编写不同逻辑的服务端;
就像上面的实现会话管理器的功能, 其实除了给所有玩家会话都分配 Actor, 也有创建单个 Actor 然后 会话+事件 传递方式.
这种都是需要按照不同环境不同考虑来处理, 还有2025年游戏服务端的分区分服为什么还这么流行, 其实大部分都不是技术方面能够解决的.