Organizations

  • 之前已经展示直接将 Actor System 提取成通用组件, 这部分作为通用模块是可以和 Quarkus 结合在一起, 这里就说明下怎么把 pekko-actor 作为组件封装成 quarkus-extension, 以下是必须条件: JDK17+: 后续版本更迭之后可能 JDK 版本要求更高 Apache Maven 3.9+: 用于调用 mvn 命令拉取官方功能 同时需要注意, Quarkus 其实还有两种具体模式: JVM: 比较常规的 JVM 启动方式, 调用运行的是经典的 JAR 应用程序(正常环境JRE启动) Native: 将 JAVA 程序打包成可执行的二进制功能, 类似于 Golang|Rust 一样直接平台编译运行, 采用 GraalVM 技术 Native 技术采用 GraalVm, 其实就是封装裁剪成小型虚拟机启动 因为以上模式存在, 所以扩展之中对于两者处理可能需要单独分开(有的第三方可能并不支持原生模式打包, 需要编写时单独额外处理等) Quarkus 扩展出来有以下部分, 这也是需要涉足到开发谋爱: runtime: 运行时模块, 作为扩展开发者向应用程序开发者提供的功能, 也就是编写封装自己的功能 deployment: 部署时模块, 用于构建扩展时候的打包功能, JVM 可以发布全局 Bean, Native 可以为 GraalVM 的原生编译做准备 也就是 runtime 就是编写构建暴露自己功能代码, 而 deployment 则是打包编译发布和全局服务挂载
    fortress Java Created Sun, 21 Sep 2025 14:50:52 +0800
  • Quarkus 注入集成 这也是比较好用的技巧, 之前搭建网关常常能够看到以下类型的对象注入: /** * WebSocket 网关 */ @ApplicationScoped @WebSocket(path = "/bootstrap") public class WebSocketBootstrap { /** * 捕获全局的 ActorSystem 注入 Bean 对象 */ @Inject ActorSystem system; } 这种对象注入方式很大程度节约对象传递的功能, 而其实就是利用反射原理给对象实例化传递对应 @Inject 注解的对象实例. 反射 也带来有少许性能损耗, 如果对性能要求比较偏激的情况, 后续可以仅当做了解 这时候就想这种自动注入机制能不能自动加载到 Actor 当中, 让 Props.create 生成出来的对应也支持这种特性. Quarkus 内部已经支持这种方式处理, 这里就是实现方法: /** * 这里需要全局注册实例, 该工厂类能够全局捕获 */ @ApplicationScoped public class QuarkusActorFactory { /** * Quarkus 的全局 Bean 对象实例句柄 */ @Inject Instance<Object> cdiInstance; /** * 通过反射将 cdiInstance 内部的全局实例化对象注入进 Actor 之中 */ void injectFields(Actor actor) throws IllegalAccessException { // 从 Actor 实例的实际类型开始(避免接口/父类类型导致的字段扫描不全) Class<?
    fortress Java Created Fri, 12 Sep 2025 19:56:34 +0800
  • 开发说明 之前部分已经大大小小涵盖作为 RPC 日常用到的知识点, 后面就需要基于自己项目工程经验来处理编写对应业务代码. 这里面设计的分歧点十分巨大, 以至于没有通用的处理方式; 所以只能说下为什么要这么设计的原因方便日常来处理, 其他需要去自己通过业务分析出处理方式 会话节点 关于网关会话需要每次创建 Actor 节点的功能, 其实这里面实现方法有很多种, 如下就是常规能够做到: 全局 ClusterProxy: 其实就是全局在加入集群的时候挂载 clusterSharding 代理地址, 所有会话共享该集群对象 动态 ClusterProxy: WebSocket 会话 Open 的时候依靠 ActorSystem 动态生成 clusterSharding 代理 动态 LocalActor 托管: 在本地动态生成 Actor, 消息和 WebSocket 都移交内部处理处理 按照 pekko 文档来说, actor 节点应该控制好数量避免生成数量过于庞大, 那么直接全局设置 clusterSharding 集群代理应该是最适合的吧? 其实恰恰相反, 动态 LocalActor 托管才是最安全高效的, 这里思考以下问题: 会话隔离性: 需要防止某个网关会话传输时候奔溃, 连锁全部网关的其他运行中的会话连带崩溃 - 奔溃扩散 传输效率性: 全局统一采用共享集群代理句柄, 是否意味着高并发的时候传输都是在同个通道上? - 单点规章 节点可控性: 网关会话是无限制增长吗? 按照网络请求来说, 实际上单个端口服务最大会话数量为 65536 - 会话可控 所以这里面都是十分值得深思的问题, 甚至有的问题需要在生产环境下才能看出异常.
    fortress Java Created Wed, 10 Sep 2025 20:36:33 +0800
  • 会话管理 目前已经调通数据传输的流程, 但是实际上还有不少细节要优化处理, 之前测试推送的时候是这样: // 测试发送个数据包连通 Events.C2SConnectedEvent event = Events.C2SConnectedEvent .newBuilder() .setSessionId("10001") .setTimestamp(System.currentTimeMillis()) .setIp("127.0.0.1") .build(); sessionRef.tell(event, ActorRef.noSender()); 这里就需要把 WebSocket 和 Actor 功能桥接起来, 并且当 actor 集群节点中断的时候, 就要让目前 websocket 推送的所有消息返回错误消息给客户端说明 服务异常不可用 的状态. 也就是要把 WebSocket 托管给 Actor 节点管理(注意: 这里不需要创建复杂集群节点, 只需要本地节点即可) 本地 Actor 节点交换数据的时候不会走序列化流程, 可以直接传递对象实例然后集中转发集群代理对象 这里需要的就是每次 onOpen 的操作的时候动态创建 Actor 会话获取 ActorRef, 每次消息和异常都转发给内部处理, 如果触发 onClose 操作就通知内部 Actor 清除该 Actor. 改进网关 WebSocket 现在功能需要更精简, 只负责当作数据转发的 工具人, 内部不做任何操作, 这里先重置下功能回归成原始样式: package io.fortress.websocket; import io.quarkus.websockets.next.*; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.
    fortress Java Created Tue, 09 Sep 2025 20:51:30 +0800
  • 补充扩展 这章节主要说明后续可能用到扩展工具类方便后续使用, 这些组件基本都放置在 common 子模块之中 雪花ID生成 生产工具类 SnowflakeIdGenerator.java 工具: package io.fortress.common.utils; import java.util.concurrent.atomic.AtomicLong; /** * io.fortress.common.utils.SnowflakeIdGenerator * <p> * 雪花ID生成器的数值总长度为64位, 雪花ID结构: * <p> * 0(1位,首位)- 时间戳(41位)- 数据中心ID(5位)- 工作节点ID(5位)- 序列号(12位) */ @SuppressWarnings("unused") public class SnowflakeIdGenerator { /** * 开始时间戳 (1993-02-09 00:00:00) */ private static final long START_TIMESTAMP = 729187200000L; /** * 数据中心ID所占的位数 */ private static final long DATA_CENTER_ID_BITS = 5L; /** * 工作节点ID所占的位数 */ private static final long WORKER_ID_BITS = 5L; /** * 序列号所占的位数 */ private static final long SEQUENCE_BITS = 12L; /** * 数据中心ID的最大值 */ private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS); /** * 工作节点ID的最大值 */ private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS); /** * 序列号的最大值 */ private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS); /** * 工作节点ID左移位数 */ private static final long WORKER_ID_SHIFT = SEQUENCE_BITS; /** * 数据中心ID左移位数 */ private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS; /** * 时间戳左移位数 */ private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS; /** * 数据中心ID */ private final long dataCenterId; /** * 工作节点ID */ private final long workerId; /** * 序列号 */ private final AtomicLong sequence = new AtomicLong(0L); /** * 上一次生成ID的时间戳 */ private volatile long lastTimestamp = -1L; /** * 构造函数 * * @param dataCenterId 数据中心ID (0 ~ 31) * @param workerId 工作节点ID (0 ~ 31) */ public SnowflakeIdGenerator(long dataCenterId, long workerId) { if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) { throw new IllegalArgumentException(String.
    fortress Java Created Sun, 07 Sep 2025 12:33:36 +0800
  • 架构思考 目前已经确定集群的消息传递正常, 但是在这个时刻就要先停下来思考下整体项目架构. 这边我是按照游戏服务端设计出发, 所以整体架构也是从这方面处理 首先就是请求链路问题, 按照网络访问链路一般如下: 客户端发起对网关请求, 对外网关采用高防网关服务器 网关接收到客户端请求, 通过集群转发给指定集群分片 集群分片动态创建会话, 也就是在某个集群节点下动态挂载专属 Actor 动态挂载的节点需要去数据库加载玩家信息 玩家游玩的功能大部分都是预先封装好的策划玩法 绝大部分情况下, 游戏内部的玩法都是自己针对自己功能逻辑调用 多人游戏服务则是重新设计个 MessageExtractor 用地图|区域ID做标识, 内部节点对加入玩家列表的玩家广播数据 在加入节点的时候有个定时器负责将查询出来的数据实体保存到数据库 玩家主动退出登录的时候也要把数据实体保存到数据库之中 游戏服务端大致流程就是这样, 其实也就是比较传统的 RPC(Remote Procedure Call) 业务, 但是有些细节方面需要注意: IP验证由谁处理? 网关端|集群端 → 网关 心跳包由谁维护? 网关端|集群端 → 网关 权限验证由谁处理? 网关端|集群端 → 网关验证授权, 按照统一 secure 生成授权码给集群验证挂载登录 客户端提交的二进制消息解析由谁处理? 网关端|集群端 → 网关接收消息, 需要解析出消息ID和二进制包装序列化提交给网关 主要作为 WebSocket 网关, 很多底层脏活累活都帮你处理好了, 所以不会像 TCP|UDP 要求那么高 心跳包实现 这里心跳包的话比较简单, 直接在 websocket 项目加个定时器就行: /** * io.fortress.websocket.WebSocketBootstrap.java * <p> * 高并发WebSocket网关, @WebSocket 默认会先加载 quarkus.
    fortress Java Created Sun, 07 Sep 2025 00:03:41 +0800
  • 消息对接 现在已经搭建好集群分片和网关服务, 可以尝试将他们的功能对接起来方便后续的业务开发. Actor System 连接 Cluster Sharding 方式其实也十分简单, 直接启动的时候挂载代理就行了. 注意以下代码哪怕运行之后还是会报错, 还有很多细致化配置参数需要处理 这里是 websocket 项目的全局 ActorSystem 对象生成 Bean: package io.fortress.websocket; import io.fortress.common.actor.ActorMessageExtractor; import io.fortress.common.protobuf.Events; import io.fortress.websocket.config.WebSocketActorSystemConfiguration; import io.quarkus.arc.DefaultBean; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.Startup; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.cluster.Cluster; import org.apache.pekko.cluster.sharding.ClusterSharding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Optional; /** * io.fortress.websocket.WebSocketActorSystem.java * <p> * 加载全局初始化 ActorSystem */ @ApplicationScoped public class WebSocketActorSystem { /** * 日志对象 */ final Logger logger = LoggerFactory.
    fortress Java Created Fri, 05 Sep 2025 20:53:38 +0800
  • 集群实现 这篇章开始设计简单的集群功能, 方便后续的业务开发. 在开始设计集群先说明下 common 项目之前考虑的点: common 通用包里面要不要加入 pekko-cluster-sharding 依赖? 引入之后就 pekko 强绑定了 不引入没有办法设计通用的 消息提取器(MessageExtractor). 而由于之前切换成 Protobuf 序列化功能, 所以内部消息定义可能需要额外做处理, 最终考虑再三之后将 pekko-cluster-sharding 移交到 common 做通用的第三方依赖. 这里在通用 common 设计个取模消息提取器方便后续做分片计算: package io.fortress.common.actor; import io.fortress.common.protobuf.Events; import org.apache.commons.codec.digest.MurmurHash3; import org.apache.pekko.cluster.sharding.ShardRegion; import java.nio.charset.StandardCharsets; import java.util.function.Function; /** * io.fortress.common.actor.ModuloMessageExtractor.java * <p> * 取模消息提取器 */ @SuppressWarnings("unused") public class ModuloMessageExtractor implements ShardRegion.MessageExtractor { /** * 分片总数 */ protected final int numberOfShards; /** * MurMur3哈希Seed定义, 默认为0 */ protected final int hashSeed; /** * 生成分片ID回调 */ protected final Function<Integer, String> createShardId; /** * 构造方法 */ public ModuloMessageExtractor(int numberOfShards, Function<Integer, String> createShardId, int hashSeed) { this.
    fortress Java Created Sat, 30 Aug 2025 14:36:03 +0800
  • 集群功能 这里需要扩展下集群的成员与事件状态监听, 主要是要了解成员状态变化, 只有了解内部的周期变化才能知道动态关闭加入之后应该怎么去重新参与到集群事物之中. 首先需要 application.properties 比较关键的参数: ## ============================================================================== ## Actor 集群配置 # 默认加载 pekko-cluster_2.13-1.1.5.jar!\reference.conf 内部配置 ## ============================================================================== fortress.actor.name=fortress-cluster ## ============================================================================== ## 基本 Actor 配置 # pekko.actor.provider 就是 actor 启动的服务类型, 共有以下类型: # - local: 仅在单个 JVM 进程内运行的 Actor 系统, 不支持集群功能 # - cluster: 启用 Pekko Cluster 功能, 支持跨多个节点的分布式 Actor 系统 # # 如果启动集群服务, 那么就需要启用 pekko.remote.artery.enabled=on # 通过 artery.canonical.hostname 和 artery.canonical.port 创建节点参与入口 # 也就是其他远程节点都是通过这入口来参与数据传递 ## ============================================================================== fortress.actor.settings.pekko.actor.provider=cluster fortress.actor.settings.pekko.remote.artery.enabled=on fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1 fortress.actor.settings.pekko.remote.artery.canonical.port=2551 ## ============================================================================== ## 集群节点目前的角色身份 # 这里是针对节点细分化处理, 支持从集群当中挑选只某些角色的主机做指定业务 # 比如有台高配置的主机将其角色声明为 fortress-performance, 这台主机专门负责大量负载计算 # 那么可以网关之类服务只发给集群下面指定主机的运算, 而不是从所有主机里面提取一台做运算 # 这种角色分配的作用很大: # 比如游戏大地图设计的时候, 就需要把复杂运算方面提交给高性能主机 # 其他用户节点服务可以在相对性能不是那么高的集群节点中运算 ## ============================================================================== fortress.
    fortress Java Created Sat, 30 Aug 2025 13:21:16 +0800
  • 网关挂载 这里实现个挂载 WebSocket 网关服务, 作为数据转发的前置功能, 这里的 pom.xml 配置如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <!-- 父依赖 --> <parent> <groupId>io.fortress</groupId> <artifactId>boot</artifactId> <version>1.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> <!-- 子属性 --> <modelVersion>4.0.0</modelVersion> <artifactId>fortress-websocket</artifactId> <name>Fortress WebSocket Module</name> <description>Fortress WebSocket Core</description> <packaging>jar</packaging> <!-- 第三方依赖 --> <dependencies> <!-- 通用全局库 --> <dependency> <groupId>io.fortress</groupId> <artifactId>fortress-common</artifactId> </dependency> <!-- Protobuf 核心依赖 --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> <!-- Pekko 集群配置 --> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-cluster-sharding_${pekko.platform.scala-version}</artifactId> </dependency> <!-- WebSocket 核心依赖 --> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-websockets-next</artifactId> </dependency> </dependencies> </project> 这个配置相对比较简单, 接下来就需要声明应用入口, 后面如果涉及到需要项目启动的都是按照以下定义入口功能:
    fortress Java Created Fri, 29 Aug 2025 20:14:29 +0800