架构思考
目前已经确定集群的消息传递正常, 但是在这个时刻就要先停下来思考下整体项目架构.
这边我是按照游戏服务端设计出发, 所以整体架构也是从这方面处理
首先就是请求链路问题, 按照网络访问链路一般如下:
- 客户端发起对网关请求, 对外网关采用高防网关服务器
- 网关接收到客户端请求, 通过集群转发给指定集群分片
- 集群分片动态创建会话, 也就是在某个集群节点下动态挂载专属 Actor
- 动态挂载的节点需要去数据库加载玩家信息
- 玩家游玩的功能大部分都是预先封装好的策划玩法
- 绝大部分情况下, 游戏内部的玩法都是自己针对自己功能逻辑调用
- 多人游戏服务则是重新设计个
MessageExtractor用地图|区域ID做标识, 内部节点对加入玩家列表的玩家广播数据 - 在加入节点的时候有个定时器负责将查询出来的数据实体保存到数据库
- 玩家主动退出登录的时候也要把数据实体保存到数据库之中
游戏服务端大致流程就是这样, 其实也就是比较传统的 RPC(Remote Procedure Call) 业务, 但是有些细节方面需要注意:
- IP验证由谁处理? 网关端|集群端 → 网关
- 心跳包由谁维护? 网关端|集群端 → 网关
- 权限验证由谁处理? 网关端|集群端 → 网关验证授权, 按照统一 secure 生成授权码给集群验证挂载登录
- 客户端提交的二进制消息解析由谁处理? 网关端|集群端 → 网关接收消息, 需要解析出消息ID和二进制包装序列化提交给网关
主要作为
WebSocket网关, 很多底层脏活累活都帮你处理好了, 所以不会像 TCP|UDP 要求那么高
心跳包实现
这里心跳包的话比较简单, 直接在 websocket 项目加个定时器就行:
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=xxx
* WebSocket(path = "yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
/**
* 心跳包定时器任务池
*/
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 心跳包任务
*/
final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
/**
* 连接回调
*/
@OnOpen
public void onConnected(WebSocketConnection session) {
// 其他略
// 加入心跳包任务
heartbeats.put(session.id(), scheduler.scheduleAtFixedRate(
() -> this.onHeartbeat(session.id(), session), 0,
HEARTBEAT_INTERVAL_SECONDS,
TimeUnit.SECONDS
));
}
/**
* 心跳包任务
*/
public void onHeartbeat(String id, WebSocketConnection connection) {
// 连接已关闭
if (connection.isClosed()) {
ScheduledFuture<?> future = heartbeats.remove(id);
if (future != null) {
future.cancel(true);
}
return;
}
// 心跳推送
try {
connection.sendPongAndAwait(Buffer.buffer("Heartbeat"));
logger.debug("[WebSocket] Heartbeat Session: {}", id);
} catch (Exception e) {
logger.error("[WebSocket] Heartbeat Closed", e.getCause());
connection.closeAndAwait(CloseReason.INTERNAL_SERVER_ERROR);
}
}
/**
* 会话断开
*/
@OnClose
public void onDisconnect(WebSocketConnection session, CloseReason reason) {
// 其他略
// 关闭心跳任务
ScheduledFuture<?> future = heartbeats.remove(session.id());
if (future != null) future.cancel(true);
}
/**
* 会话异常
*/
@OnError
public void onError(WebSocketConnection session, Exception e) {
// 其他略
// 关闭心跳任务
ScheduledFuture<?> future = heartbeats.remove(session.id());
if (future != null) future.cancel(true);
}
}
这里采用 WebSocket 的本身 PING|PONG 服务, 他会发送特殊消息包而不会响应具体内容, 可以避免干扰响应具体内容.
会话异常的时候最好也移除下心跳任务, 有的程序异常但是不走 OnClose 也从而让事件的列表没办法释放定时器任务
这里定时器池默认为 1, 可以按照日常要求配置, 但是这部分最好暴露给外部配置方便在外部配置文件对内部做自定义处理.
这部分先简单处理, 后面会专门定制全局对象容器注入处理, 当然最好是自己试试去编写出来.
权限验证
这里是另外的重点, 之前能够看到 @OnOpen/@OnClose 等都默认只做简单信息推送:
String sessionId = "10001"; // 假设玩家ID
Events.C2SDisconnectEvent event = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.build();
sessionId 其实就是默认集群分片的标识对象, 但是怎么获取到这个授权标识就是我们需要考虑的.
对于 WebSocket网关 可以按照以下方法处理授权方面问题:
- 从
header获取authorization字段去提取验证授权, 进而加载出完整的玩家数据库实体对象 - 默认连接的时候没有任何状态, 只有
@OnMessage传递protobuf登录请求, 提取参数字段从数据库加载实体
其实比较推荐第二种方法, 虽然多引入 protobuf, 但是流程方面稳定且后续切换 TCP 也方便.
注: 网关的 Protobuf 不是对外共享的, 所以只需要在项目内处理
那么 websocket 网关也需要引入 protobuf 模块:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- 父依赖 -->
<parent>
<groupId>io.fortress</groupId>
<artifactId>boot</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<!-- 子属性 -->
<modelVersion>4.0.0</modelVersion>
<artifactId>fortress-websocket</artifactId>
<name>Fortress WebSocket Module</name>
<description>Fortress WebSocket Core</description>
<packaging>jar</packaging>
<!-- 第三方依赖 -->
<dependencies>
<!-- 通用全局库 -->
<dependency>
<groupId>io.fortress</groupId>
<artifactId>fortress-common</artifactId>
</dependency>
<!-- Protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- Pekko 集群配置 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster-sharding_${pekko.platform.scala-version}</artifactId>
</dependency>
<!-- WebSocket 核心依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
<!-- Quarkus 测试依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- 扩展打包编译配置 -->
<build>
<plugins>
<!-- Protobuf 打包插件 -->
<!-- https://www.xolstice.org/protobuf-maven-plugin/compile-mojo.html -->
<plugin>
<groupId>${protobuf.plugin.group-id}</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<!-- Protobuf编译器版本 -->
<protocArtifact>
${protobuf.compiler.group-id}:${protobuf.compiler.artifact-id}:${protobuf.compiler.version}:exe:${os.detected.classifier}
</protocArtifact>
<!-- *.proto 源文件路径, 直接默认即可, 系统事件不需要其他共享 -->
<!-- <protoSourceRoot>${project.parent.basedir}/proto</protoSourceRoot>-->
<!-- 生成Java文件, 直接默认就行, IDEA会自动识别到这部分代码 -->
<!--<outputDirectory>${project.basedir}/src/main/java</outputDirectory>-->
<!-- 是否生成之前清空目录, 最好不要随便乱动 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<!-- 执行时机 -->
<executions>
<!-- 执行mvn compile的时候打包生成 Java 文件 -->
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- boot 调用 generate-code 的时候打包生成 Java 文件 -->
<execution>
<id>generate-resources</id>
<phase>generate-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
之后在 src/main/proto 目录创建 authority.proto 文件作为指令集合:
syntax = "proto3";
option java_package = "io.fortress.websocket.protobuf";
// 登录请求
// 登录授权是走另外 Web 网关, 流程如下
// 1. 也就是需要在其他 Web 服务请求登录授权(涉及第三方授权等)
// 2. Web 服务验证成功返回服务器列表(对应连接服务 hostname:port)和 UID, Token, 这里有几种主流处理方法:
// - 2-1. Redis|MySQL 记录会话标识, 服务端后续去检查是否存在代表授权成功
// - 2-2. 加入 Sign 字段, Web和服务端约定获取签名的 secure, 按照 KEY 正序排列成 K1=V1&K2=V2 最后附加 secure 哈希成MD5验证
// - 上面两种方法性能最好的是 Sign 验证, 因为无需在走网络协议确认数据库那些, 但是安全性最好的是数据库入库记录, 按照个人习惯去选择就行了
// 3. 客户端拿到 sid(服务器ID), uid(用户ID), token(登录凭证), sign(参数哈希)
message C2SLoginEvent{
int32 sid = 1;
int64 uid = 2;
string token = 3;
string sign = 4;
}
// 登录响应, status=0 代表登录成功
// 其他都为失败的错误, 客户端捕获到的时候确认错误码对应UI展示就行
// sid: 服务器ID - ServerId
// uid: 用户ID - UserId
// rid: 角色ID - RoleId
//
// uid 和 rid 区别:
// 比如登录一个账号(uid)之后, 是可以由选服功能的
// 选服之后的其实需要在作为游戏角色的唯一标识必须是 uid+sid 做唯一标识(也就是用户A在服务器B的角色)
// 假设账号登录用户为 A, 那么他在 微信-1服 里面的记为 A1, 他在 QQ-2服 里面记为 A2, 以此类推
// 另外为什么玩家要分服这部分后面会说明, 可以说在 2025年还要这样设计不是因为技术问题
//
// 角色ID的类型选择一直感觉挺麻烦, 因为没办法预知玩家最后的数据定义是 UUID, UINT64, 雪花ID等
// UUID 虽然字符串从而导致无序性, 性能方面有所降低, 但是对于服务端来说都是直接拿到标识加载出来之后不参与任何业务查询, 碰撞几率很小且方便后期做合服处理
// 后面为了弥补 UUID 性能问题推出雪花ID来, 采用 64bit 的 long 做递增处理, 让主键标识有序化, 方便数据库的数据页优化处理
// 这里推荐 uid 采用雪花ID生成玩家角色ID, 在性能和碰撞上面都还可以
message S2CLoginEvent{
int32 status = 1;
int32 sid = 2;
int64 uid = 3;
int64 rid = 4;
}
需要编译下项目之后才会成 Java 功能类, 这边编写测试代码(这里记得把二进制转为 Base64, 后面客户端测试可以用到):
package io.fortress.websocket;
import com.google.protobuf.ByteString;
import io.fortress.common.protobuf.Events;
import io.fortress.websocket.protobuf.Authority;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Base64;
import java.util.UUID;
/**
* io.fortress.websocket.ProtobufTests.java
* <p>
* Protobuf格式测试
*/
@QuarkusTest
public class ProtobufTests {
/**
* 日志句柄
*/
final Logger logger = LoggerFactory.getLogger(ProtobufTests.class);
/**
* 登录测试
*/
@Test
public void c2sLogin() {
Authority.C2SLoginEvent event = Authority
.C2SLoginEvent
.newBuilder()
.setSid(1)
.setUid(100)
.setToken(UUID.randomUUID().toString())
.setSign(UUID.randomUUID().toString())
.build();
Assertions.assertNotNull(event);
// 将他包装成请求事件
Events.C2SMessageEvent c2SMessageEvent = Events
.C2SMessageEvent
.newBuilder()
.setMessageId(0) // 消息ID设为 0
.setSessionId("") // 会话也不需要
.setMessageData(ByteString.copyFrom(event.toByteArray())) // 把对应消息包装起来
.build();
Assertions.assertNotNull(c2SMessageEvent);
// 这里尝试转化为 Base64, 后面可以直接丢到其他客户端做连接测试
byte[] eventBytes = c2SMessageEvent.toByteArray();
// 这里可以丢给常用 Postman 客户端用 Base64 格式传输数据
Base64.Encoder encoder = Base64.getEncoder();
logger.info("C2SLoginEvent = {}", encoder.encodeToString(eventBytes));
}
}
这里最后打印内容如下:
2025-09-05 16:09:29,956 INFO [io.for.web.ProtobufTests] (main) C2SLoginEvent = GlAIARBkGiQ4ZThhMWRkYS1mY2RhLTQ5OTgtYWJlZS0xNDk2OWJmYjg0MjgiJDA5ZWFkODhlLTRlM2MtNDMyNC05ZjViLWY4YjkyMGRlYzZjNg==
# 这里主要还是编码出来内容, 方便后面客户端调试使用
GlAIARBkGiQ4ZThhMWRkYS1mY2RhLTQ5OTgtYWJlZS0xNDk2OWJmYjg0MjgiJDA5ZWFkODhlLTRlM2MtNDMyNC05ZjViLWY4YjkyMGRlYzZjNg==
现在就回过头来看, 客户端发起的几个事件调用时机的情况就不一样:
C2SConnectedEvent: 授权完成之后首个事件, 必须在C2SLoginEvent验证之后推送集群C2SMessageEvent: 没授权的时候判断是否内容是C2SLoginEvent, 不是就跳过处理, 是的话获取角色ID转发集群
这里重新设计网关集群功能:
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=xxx
* WebSocket(path = "yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
/**
* 日志对象
*/
final static Logger logger = LoggerFactory.getLogger(WebSocketBootstrap.class);
/**
* 在线会话列表集合
*/
@Inject
OpenConnections sessions;
/**
* 心跳包定时器
*/
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 心跳包任务
*/
final Map<String, ScheduledFuture<?>> heartbeats = new ConcurrentHashMap<>();
/**
* 完成授权的会话
*/
final Map<String, String> authorities = new ConcurrentHashMap<>();
/**
* 心跳间隔 30s, 日常足够用了
*/
private static final int HEARTBEAT_INTERVAL_SECONDS = 30;
/**
* 获取IP地址
*/
private String getIpAddress(WebSocketConnection connection) {
HandshakeRequest request = connection.handshakeRequest();
// 检测 X-Forwarded-For 代理头
String forwardedFor = request.header("X-Forwarded-For");
if (forwardedFor != null && !forwardedFor.isBlank()) {
String[] ips = forwardedFor.split(",");
if (ips.length > 0) return ips[0].trim();
}
// 检测 X-Real-IP 代理头
String realIp = request.header("X-Real-IP");
if (realIp != null && !realIp.isBlank()) {
return realIp.trim();
}
return request.host(); // 未知IP, 返回hostIP
}
/**
* 连接回调, 现在连接会话只需要创建心跳包任务就行了
*/
@OnOpen
public void onConnected(WebSocketConnection session) {
// 加入心跳包任务
heartbeats.put(session.id(), scheduler.scheduleAtFixedRate(
() -> this.onHeartbeat(session.id(), session), 0,
HEARTBEAT_INTERVAL_SECONDS,
TimeUnit.SECONDS
));
}
/**
* 心跳包任务
*/
public void onHeartbeat(String id, WebSocketConnection connection) {
// 连接已关闭
if (connection.isClosed()) {
ScheduledFuture<?> future = heartbeats.remove(id);
if (future != null) {
future.cancel(true);
}
return;
}
// 心跳推送
try {
connection.sendPongAndAwait(Buffer.buffer("Heartbeat"));
logger.debug("[WebSocket] Heartbeat Session: {}", id);
} catch (Exception e) {
logger.error("[WebSocket] Heartbeat Closed", e.getCause());
connection.closeAndAwait(CloseReason.INTERNAL_SERVER_ERROR);
}
}
/**
* 会话断开
*/
@OnClose
public void onDisconnect(WebSocketConnection session, CloseReason reason) {
// 打包成 Protobuf 消息
String sessionId = "10001"; // 假设玩家ID
Events.C2SDisconnectEvent event = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.build();
logger.info("[WebSocket] Disconnect By {}, Status {}", sessionId, reason.getCode());
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] Disconnect Protobuf {}", event.toString().replace("\n", ", "));
logger.debug("[WebSocket] Disconnect Connections: {}", sessions.listAll().size());
}
// 关闭心跳任务
ScheduledFuture<?> future = heartbeats.remove(session.id());
if (future != null) future.cancel(true);
// 清空授权
authorities.remove(session.id());
}
/**
* 会话异常
*/
@OnError
public void onError(WebSocketConnection session, Exception e) {
String sessionId = "10001"; // 假设玩家ID
logger.error("[WebSocket] Error By {}", sessionId, e.getCause());
// 关闭心跳任务
ScheduledFuture<?> future = heartbeats.remove(session.id());
if (future != null) future.cancel(true);
// 清空授权
authorities.remove(session.id());
}
/**
* 文本消息传递
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection session, String message) {
onMessage(session, message.getBytes(StandardCharsets.UTF_8));
}
/**
* 二进制消息传递
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection session, byte[] message) {
onMessage(session, message);
}
/**
* 消息处理
*/
public void onMessage(WebSocketConnection session, byte[] message) {
// 确认事件格式正确
try {
Events.C2SMessageEvent event = Events.C2SMessageEvent.parseFrom(message);
String sessionId = authorities.get(session.id());
if (sessionId == null) {
onAuthorization(session, event);
} else {
onForwardCluster(session, event, sessionId);
}
} catch (InvalidProtocolBufferException e) {
logger.debug(e.getMessage(), e.getCause());
}
}
/**
* 授权验证
*/
private void onAuthorization(WebSocketConnection session, Events.C2SMessageEvent event) throws InvalidProtocolBufferException {
// 这里需要确认提交事件标识正确, 不是正确标识就不去处理
if (event.getMessageId() != 0 || event.getMessageData().size() <= 0) return;
// 验证是否为授权指令
Authority.C2SLoginEvent c2SLoginEvent = Authority.C2SLoginEvent.parseFrom(event.getMessageData());
logger.debug("[WebSocket] Authorization Session: {}", c2SLoginEvent);
// todo: 数据库验证授权
long rid = 100000001L; // 假设获取到角色ID
String sessionId = String.valueOf(rid);
// 写入在线
authorities.put(session.id(), sessionId);
// 发给集群目前会话在线
Events.C2SConnectedEvent c2SConnectedEvent = Events
.C2SConnectedEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.setIp(getIpAddress(session))
.build();
logger.debug("[WebSocket] Connected Session: {}", c2SConnectedEvent);
// todo: 发送到集群内部
// 登录完成直接响应代表已经登录成功
Authority.S2CLoginEvent s2CLoginEvent = Authority
.S2CLoginEvent
.newBuilder()
.setStatus(0)
.setSid(c2SLoginEvent.getSid())
.setUid(c2SLoginEvent.getUid())
.setRid(rid)
.build();
// 打包响应给客户端数据
Events.S2CMessageEvent s2CMessageEvent = Events
.S2CMessageEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.setMessageId(0) // 这里消息可以按需调整
.setMessageData(s2CLoginEvent.toByteString())
.build();
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] S2CMessageEvent: {}", Arrays.toString(s2CMessageEvent.toByteArray()));
}
// 返回给客户端, 客户端准备切换场景和加载玩家信息
session.sendBinaryAndAwait(s2CMessageEvent.toByteArray());
}
/**
* 转发消息
*/
private void onForwardCluster(WebSocketConnection session, Events.C2SMessageEvent event, String sessionId) {
// 这里直接验证转发到网关就行了
}
}
这里就是大概思路, 注意授权请求 C2SLoginEvent 的 messageId 并不是必须要0, 可以按照习惯业务最后匹配确认是正式消息结构标识值.
最后 Postman 的测试功能如下: