网关挂载
这里实现个挂载 WebSocket 网关服务, 作为数据转发的前置功能, 这里的 pom.xml 配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- 父依赖 -->
<parent>
<groupId>io.fortress</groupId>
<artifactId>boot</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<!-- 子属性 -->
<modelVersion>4.0.0</modelVersion>
<artifactId>fortress-websocket</artifactId>
<name>Fortress WebSocket Module</name>
<description>Fortress WebSocket Core</description>
<packaging>jar</packaging>
<!-- 第三方依赖 -->
<dependencies>
<!-- 通用全局库 -->
<dependency>
<groupId>io.fortress</groupId>
<artifactId>fortress-common</artifactId>
</dependency>
<!-- Protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- Pekko 集群配置 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster-sharding_${pekko.platform.scala-version}</artifactId>
</dependency>
<!-- WebSocket 核心依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
</dependencies>
</project>
这个配置相对比较简单, 接下来就需要声明应用入口, 后面如果涉及到需要项目启动的都是按照以下定义入口功能:
package io.fortress.websocket;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
/**
* io.fortress.websocket.WebSocketApplication.java
* <p>
* 项目启动入口
* 参考文档: <a href="https://cn.quarkus.io/guides/lifecycle">应用程序的初始化和终止</a>
*/
@QuarkusMain
public class WebSocketApplication {
/**
* 程序启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) {
Quarkus.run(args);
}
}
这里声明个 WebSocketBootstrap 作为核心的挂载服务:
package io.fortress.websocket;
import com.google.protobuf.ByteString;
import io.fortress.common.protobuf.Events;
import io.quarkus.websockets.next.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
/**
* io.fortress.websocket.WebSocketBootstrap.java
* <p>
* 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.http.root-path 配置在附加:
* <pre>
* quarkus.http.root-path=/xxx
* WebSocket(path = "/yyy")
* 最后生成的请求路径会成为 /xxx/yyy
* </pre>
* 官方文档: <a href="https://cn.quarkus.io/guides/websockets-next-reference">websockets-next</a>
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
@SuppressWarnings("unused")
public class WebSocketBootstrap {
/**
* 日志对象
*/
final static Logger logger = LoggerFactory.getLogger(WebSocketBootstrap.class);
/**
* 在线会话列表集合
*/
@Inject
OpenConnections sessions;
/**
* 获取IP地址
*/
private String getIpAddress(WebSocketConnection connection) {
HandshakeRequest request = connection.handshakeRequest();
// 检测 X-Forwarded-For 代理头
String forwardedFor = request.header("X-Forwarded-For");
if (forwardedFor != null && !forwardedFor.isBlank()) {
String[] ips = forwardedFor.split(",");
if (ips.length > 0) return ips[0].trim();
}
// 检测 X-Real-IP 代理头
String realIp = request.header("X-Real-IP");
if (realIp != null && !realIp.isBlank()) {
return realIp.trim();
}
return request.host(); // 未知IP, 返回hostIP
}
/**
* 连接回调
*/
@OnOpen
public void onConnected(WebSocketConnection session) {
// 打包成 Protobuf 消息
String sessionId = "10001"; // 假设玩家ID
String ipAddress = getIpAddress(session); // 获取IP地址
Events.C2SConnectedEvent event = Events.C2SConnectedEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.setIp(ipAddress)
.build();
logger.info("[WebSocket] Connected By {}, IP: {}", sessionId, ipAddress);
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] Connected Session: {}", session);
logger.debug("[WebSocket] Connected Protobuf {}", event.toString().replace("\n", ", "));
logger.debug("[WebSocket] Connected Connections: {}", sessions.listAll().size());
}
}
/**
* 会话断开
*/
@OnClose
public void onDisconnect(WebSocketConnection session, CloseReason reason) {
// 打包成 Protobuf 消息
String sessionId = "10001"; // 假设玩家ID
Events.C2SDisconnectEvent event = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(sessionId)
.setTimestamp(System.currentTimeMillis())
.build();
logger.info("[WebSocket] Disconnect By {}, Status {}", sessionId, reason.getCode());
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] Disconnect Protobuf {}", event.toString().replace("\n", ", "));
logger.debug("[WebSocket] Disconnect Connections: {}", sessions.listAll().size());
}
}
/**
* 会话异常
*/
@OnError
public void onError(WebSocketConnection session, Exception e) {
String sessionId = "10001"; // 假设玩家ID
logger.error("[WebSocket] Error By {}", sessionId, e.getCause());
}
/**
* 文本消息传递
*/
@OnTextMessage
public void onTextMessage(WebSocketConnection session, String message) {
// 打包成 Protobuf 消息
String sessionId = "10001"; // 假设玩家ID
Events.C2SMessageEvent event = Events.C2SMessageEvent
.newBuilder()
.setSessionId(sessionId)
.setMessageId(1)
.setMessageData(ByteString.copyFromUtf8(message))
.build();
logger.debug("[WebSocket] TextMessage By {}, Message: {}", sessionId, message);
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] TextMessage Protobuf {}", event.toString().replace("\n", ", "));
}
}
/**
* 二进制消息传递
*/
@OnBinaryMessage
public void onBinaryMessage(WebSocketConnection session, byte[] message) {
// 打包成 Protobuf 消息
String sessionId = "10001"; // 假设玩家ID
int messageId = 1; // 假设请求的业务命令ID
Events.C2SMessageEvent event = Events.C2SMessageEvent
.newBuilder()
.setSessionId(sessionId)
.setMessageId(messageId)
.setMessageData(ByteString.copyFrom(message))
.build();
logger.debug("[WebSocket] BinaryMessage By {}, Message: {}", sessionId, Arrays.toString(message));
if (logger.isDebugEnabled()) {
logger.debug("[WebSocket] BinaryMessage Protobuf {}", event.toString().replace("\n", ", "));
}
// 推送集群消息
}
}
这里设定下项目 application.properties 配置, 先跑个服务确认没问题:
# WebSocket服务配置
quarkus.http.host=127.0.0.1
quarkus.http.port=8082
quarkus.http.root-path=/fortress
这里最终请求的访问地址为:
- HTTP:
ws://127.0.0.1:8082/fortress/bootstrap - HTTPS:
wss://127.0.0.1:8082/fortress/bootstrap
如果过程当中需要变动游戏相关路径可以直接修改 quarkus.http.root-path, 最终运行之后就能看到如下打印:
2025-09-04 14:46:56,712 INFO [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Connected By 10001, IP: 127.0.0.1
2025-09-04 14:46:56,712 DEBUG [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Connected Session: WebSocket connection [endpointId=io.fortress.websocket.WebSocketBootstrap, path=/fortress/bootstrap, id=a68f714f-7af1-40dc-8797-05a8537c3375]
2025-09-04 14:46:56,712 DEBUG [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Connected Protobuf sessionId: "10001", timestamp: 1756968416712, ip: "127.0.0.1",
2025-09-04 14:46:56,713 DEBUG [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Connected Connections: 1
2025-09-04 14:46:57,416 INFO [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Disconnect By 10001, Status 1000
2025-09-04 14:46:57,416 DEBUG [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Disconnect Protobuf sessionId: "10001", timestamp: 1756968417416,
2025-09-04 14:46:57,416 DEBUG [io.for.web.WebSocketBootstrap] (vert.x-worker-thread-1) [WebSocket] Disconnect Connections: 0
确认之前通用库封装的 Protbuf 事件能够正确生成, 之后就缺个全局的 ActorSystem 来准备加载,
首先还是扩展通用配置 ActorSystemConfiguration 接口:
package io.fortress.websocket.config;
import io.fortress.common.config.ActorSystemConfiguration;
import io.smallrye.config.ConfigMapping;
/**
* io.fortress.websocket.config.WebSocketActorSystemConfiguration.java
* <p>
* 扩展的通用 ActorSystem 配置
*/
@ConfigMapping(prefix = "fortress.actor")
public interface WebSocketActorSystemConfiguration extends ActorSystemConfiguration {
// 没有什么其他扩展的字段
}
之后就是注入全局可用 ActorSystem 唯一实例:
package io.fortress.websocket;
import io.fortress.websocket.config.WebSocketActorSystemConfiguration;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* io.fortress.websocket.WebSocketActorSystem.java
* <p>
* 加载全局初始化 ActorSystem
*/
@ApplicationScoped
public class WebSocketActorSystem {
/**
* 日志对象
*/
final Logger logger = LoggerFactory.getLogger(WebSocketActorSystem.class);
/**
* 加载 ActorSystem 配置
*/
@Inject
WebSocketActorSystemConfiguration actorSystemConfiguration;
/**
* 生成全局唯一Actor系统
* DefaultBean 设置为全局默认 ActorSystem 的 Bean
* Startup 代表优先启动加载
* Produces 代表对象生成方法
* ApplicationScoped 代表启动为系统应用级别
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", actorSystemConfiguration.name());
actorSystemConfiguration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
return actorSystemConfiguration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// 无论成功失败都关闭 Quarkus 系统
Quarkus.asyncExit();
return null;
}));
}
}
目前就是简单的生成 WebSocket 网关, 大部分功能还没实现, 需要等待集群服务编写完成启动的时候再将对应功能组件组合起来形成相关服务集群.