MeteorCat / Fortress 5 - 集群功能

Created Sat, 30 Aug 2025 13:21:16 +0800 Modified Wed, 29 Oct 2025 23:24:54 +0800

集群功能

这里需要扩展下集群的成员与事件状态监听, 主要是要了解成员状态变化, 只有了解内部的周期变化才能知道动态关闭加入之后应该怎么去重新参与到集群事物之中.

首先需要 application.properties 比较关键的参数:

## ==============================================================================
## Actor 集群配置
# 默认加载 pekko-cluster_2.13-1.1.5.jar!\reference.conf 内部配置
## ==============================================================================
fortress.actor.name=fortress-cluster
## ==============================================================================
## 基本 Actor 配置
# pekko.actor.provider 就是 actor 启动的服务类型, 共有以下类型:
#   - local: 仅在单个 JVM 进程内运行的 Actor 系统, 不支持集群功能
#   - cluster: 启用 Pekko Cluster 功能, 支持跨多个节点的分布式 Actor 系统
# 
# 如果启动集群服务, 那么就需要启用 pekko.remote.artery.enabled=on
# 通过 artery.canonical.hostname 和 artery.canonical.port 创建节点参与入口
# 也就是其他远程节点都是通过这入口来参与数据传递
## ============================================================================== 
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2551
## ==============================================================================
## 集群节点目前的角色身份
# 这里是针对节点细分化处理, 支持从集群当中挑选只某些角色的主机做指定业务
#   比如有台高配置的主机将其角色声明为 fortress-performance, 这台主机专门负责大量负载计算
#   那么可以网关之类服务只发给集群下面指定主机的运算, 而不是从所有主机里面提取一台做运算
# 这种角色分配的作用很大:
# 比如游戏大地图设计的时候, 就需要把复杂运算方面提交给高性能主机
# 其他用户节点服务可以在相对性能不是那么高的集群节点中运算
## ==============================================================================
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-player
fortress.actor.settings.pekko.cluster.roles.2=fortress-scene
fortress.actor.settings.pekko.cluster.roles.3=fortress-chat
## ==============================================================================
## 序列化 Actor 配置
# 声明消息处理采用的序列化功能
# pekko.actor.serializers.xxx 其实就是定义序列化接口的功能类
# fortress.actor.settings.serialization-bindings.YYY=xxx 就是将对应消息绑定到具体功能定义之中
# 如果没有特殊配置, 采用默认的 protobuf 即可, 序列化传输性能基本上没有太大问题 
## ==============================================================================
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
## ==============================================================================
## 集群参与种子节点
# 设定所有参与业务服务节点都需要写入, 其实就是设定提供服务的 cluster 集群节点
# 如果某个集群服务崩溃会自动按照规则选取某个节点重新提供服务, 注意只有参与集群业务才需要加入,
# 不参与业务的节点比如网关节点等, 都不需要加入其中, 该配置所有使用到的地方都要保持一致
## ==============================================================================
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
fortress.actor.settings.pekko.cluster.seed-nodes.2=pekko://${fortress.actor.name}@127.0.0.1:2553
fortress.actor.settings.pekko.cluster.seed-nodes.4=pekko://${fortress.actor.name}@127.0.0.1:2554
## ==============================================================================
## 映射到 pekko.cluster.downing-provider-class 的集群配置项
# pekko 集群断开时候的处理类, 他会在集群节点断开的时候自动标识 `down(下线)` 并重新分配出可以的集群重新连通
# 如果不配置该项, pekko 默认采用 ManualDowningProvider 集群配置, 该配置需要手动自定义编写代码处理 `down` 标记并节点分配规则
# 另外还需要注意 `脑裂(Split Brain)` 的问题:
# 在集群中, 脑裂是指因 `网络分区(Network Partition)` 导致集群分裂成多个相互独立和无法通信的 '子集群', 且每个子集群都认为自己是 '唯一有效的主集群' 的故障状态.
# 脑裂状态会破坏集群的一致性, 可能导致数据冲突、资源竞争甚至业务中断
# 举个浅显的例子, 脑裂就像是两个在争论孩子所有权的母亲:
#   - 母亲A可能是因为服务端网络/设备重启/网络波动等问题临时断开, 这时候 ActorSystem 重新分配给母亲B让他赶快喂养孩子
#   - 当母亲A处理完断开的问题之后重新作为集群节点上线, 如果没有做好分配措施那么后续孩子的管理应该分配给母亲A还是母亲B?
#   - 这里面危害就是在管理的过程当中, 应该给母亲A买奶粉的钱被分配给母亲B买尿布, 虽然任务是对但是分配混乱导致数据全部错误
# 这里就是脑裂造成的危害, 所以需要提供 downing-provider-class 和 split-brain-resolver 内置策略处理这种问题
## ==============================================================================
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
## ==============================================================================
## 映射到 pekko.cluster.split-brain-resolver.active-strategy 的集群配置项
# 指定 downing-provider-class 的分配规则, 具体参数:
#   - keep-majority: 动态节点参与, 仅保留所有集群节点内部指定分裂数量的结群, 其他节点下线处理等待故障转移
#   - keep-oldest: 保留首个(seed-nodes)第一个集群
#   - keep-referee: 抽取其中一个节点为主要核心节点做调配任务
#   ......
# 还有其他扩展出来的集群策略, 这部分按照网络功能选择:
#   - 中小规模集群(节点数 ≤10): keep-majority
#   - 固定规模集群或需明确阈值:static-quorum
#   - 有核心节点或多机房部署:keep-referee
#   - 跨数据中心的大规模集群:lease-majority
#   - 极高一致性要求,可接受停服:down-all
#
# keep-majority 处理脑裂的方法就是保留指定数量集群(一半数量), 其他集群下线
# 当因为外部问题导致集群下线的时候, 之前预留的下线集群会自动补充上来作为 '新集群节点'
# 那么重新上线的 `老节点` 会因为不满足多数策略(上线节点已经被占坑完了)被切换成下线节点
# 这样就不会因为重新节点上线导致抢占替换上场的新节点任务
## ==============================================================================
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority
## ==============================================================================
## Pekko 日志复用[可选]
# 默认 pekko 的日志打印是另外的日志库处理, 所以两套日志系统维护起来挺麻烦
# 这里就直接把日志实现移交给 `Quarkus` 统一管理
# pekko.logging-filter 负责将日志实现转移到 Slf4jLoggingFilter 实现
# settings.pekko.loggers.0 负责确认输出到日志对象
# pekko.log-dead-letters 和 dead-letters-during-shutdown 关闭自带的格式采用默认 Quarkus 日志格式
# Fory 序列化内部是另外管理的日志库, 只需要声明下全局启用就行:
# org.apache.fory.logging.LoggerFactory.useSlf4jLogging(true);
# 可以在 ActorSystemConfig.java 追加 foryUseSlf4j 配置:
#   @WithDefault("true")
#   boolean foryUseSlf4j();
# 不过这个需要额外追加依赖库, 所以直接全局采用 Slf4j 全局指定就行了
## ==============================================================================
fortress.actor.settings.pekko.logging-filter=org.apache.pekko.event.slf4j.Slf4jLoggingFilter
fortress.actor.settings.pekko.loggers.0=org.apache.pekko.event.slf4j.Slf4jLogger
fortress.actor.settings.pekko.log-dead-letters=off
fortress.actor.settings.pekko.log-dead-letters-during-shutdown=off

pekko 官方文档对于集群相关概念说明:

概念 核心解释
Node(节点) 集群的最小单元,每个节点是一个独立运行的 Pekko 应用进程(JVM 实例),有唯一标识(address,如 pekko://cluster-system@host:port)。
Cluster System 每个节点内初始化的“集群系统”,通过 Cluster(actorSystem).join(seedNode) 加入集群,是节点参与集群的入口。
Seed Node(种子节点) 集群的“引导节点”,新节点通过连接种子节点发现集群其他成员,避免“脑裂初始化”。通常配置 2-3 个(奇数,提高可用性)。
Member Status(成员状态) 节点在集群中的生命周期状态,决定其是否参与集群工作:- Joining:正在加入,未就绪;- Up:正常运行,可承担任务;- Leaving/Exiting:主动退出,正在释放资源;- Down:被标记为故障,即将被移除;- Removed:已从集群中删除。
Cluster Events(集群事件) 集群状态变化时触发的事件(如节点加入/离开、状态变更),可通过 Cluster(actorSystem).subscribe(actorRef, classOf[ClusterEvent.MemberEvent]) 订阅,用于自定义集群逻辑(如动态扩缩容通知)。

集群节点的启动流程, 按照顺序从上到下:

状态 状态变化(org.apache.pekko.cluster.ClusterEvent) 中文名称 含义与触发场景
Joining ClusterEvent.MemberJoined 正在加入 节点刚启动,通过种子节点(seed-node)向集群发送“加入请求”,正在等待集群验证(如身份合法性、角色兼容性)。
WeaklyUp ClusterEvent.MemberWeaklyUp 弱可用(可选) 仅在配置 pekko.cluster.allow-weakly-up-members = on 时生效,用于网络不稳定场景:节点暂时无法与所有成员通信,但已通过基础验证,先标记为“弱可用”,待网络恢复后转为 Up
Up ClusterEvent.MemberUp 正常运行 节点通过验证,已加入集群并可参与业务(如接收消息、承担分片任务、参与投票等),是“活跃节点”的标志。
Leaving ClusterEvent.MemberLeft 正在离开 节点主动调用 Cluster(system).leave(selfAddress) 发起退出请求,集群通知其他节点“该节点即将下线”,此时节点不再接收新任务,但会继续处理现有任务。
Exiting ClusterEvent.MemberExited 正在退出 节点完成 Leaving 阶段的任务清理后,状态转为 Exiting,等待集群确认“可以安全移除”。
Down ClusterEvent.MemberDowned 已下线 节点被标记为“故障”或“无效”,可能因:- 其他节点检测到其不可达(如网络故障、宕机);- 脑裂解决策略(如 keep-majority)判定其所在子集群无效;- 人工调用 Cluster(system).down(address) 标记。
Removed ClusterEvent.MemberRemoved 已移除 Down 状态的节点被集群彻底从成员列表中删除,不再参与任何集群活动,需重新启动才能再次加入。

这里就是集群的基本流程, 接下来就是更加关键的 集群分片(Cluster Sharding) 的概念, 也是作为游戏服务端必须清楚的核心重点.

集群分片

在没有集群分片的时候, 如果作为游戏服务端服务情况下, 我们需要手动挂载普通集群 pekko-cluster, 然后在集群节点生成 SessionActor 服务, 负责接受请求并且动态创建客户端的会话 Actor 挂载到节点中心当中.

如果手动实现这方面逻辑需要注意实现以下基础功能:

  • 维护 ‘ID→节点’ 映射表关联
  • 实现节点选择策略
  • 检测故障并重建 Actor
  • 动态集群节点参与扩展

这些方面的逻辑编写都是大工程, 并且很容易出现问题, 所以 pekko 直接封装抽象出高级 Cluster Sharding 帮助省略编写复杂业务.

另外集群分片还有故障转移的功能, 也就是在节点崩溃会保存当前 Actor 状态, 之后转移到可用节点上恢复状态, 核心方式有两种:

  • 事件溯源(Event Sourcing): 记录实体的所有状态变更事件(而非直接保存状态),恢复时重放事件重建状态
  • 快照(Snapshot): 定期保存实体的完整状态快照, 加速恢复过程(减少事件重放数量)

这里需要把原来的 AbstractActor 该由 AbstractPersistentActor 实现:

package io.fortress.cluster.session;

import org.apache.pekko.persistence.AbstractPersistentActor;
import org.apache.pekko.persistence.SnapshotOffer;

import java.io.Serializable;


/**
 * 战斗管理器
 */
public class BattleManager extends AbstractPersistentActor {


    /**
     * 注意: 如果更换序列化工具, 那么成员对象也需要一起切换成对应序列化对象
     * 因为本身同步数据就是依靠序列化来同步传输, 所以也需要跟随一起做序列处理
     */
    static class Vec2 implements Serializable {
        protected int x = 0;
        protected int y = 0;
    }

    /**
     * 默认位置
     */
    private final Vec2 pos = new Vec2();


    /**
     * 场景ID, 也可以说是地图ID
     */
    private final String sceneId;


    /**
     * 当次战斗ID, 也就是开启当前战斗的唯一标识
     */
    private final String battleId;


    /**
     * 初始化
     */
    public BattleManager(String sceneId, String battleId) {
        this.sceneId = sceneId;
        this.battleId = battleId;

        // 可能需要通过拿到地图ID加载策划配置的地图坐标和事件等
    }


    /**
     * 移动事件
     *
     * @param playerId
     * @param x
     * @param y
     */
    record MoveCommand(
            int playerId,
            int x,
            int y
    ) {

    }


    /**
     * 常规事件拦截调用
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(MoveCommand.class, cmd -> {
                    // 将事件保存到持久化记录当中, 成功之后会去更新事件
                    persist(cmd, this::updatePosition);
                })
                .build();
    }


    /**
     * 更新当前的坐标
     */
    private void updatePosition(MoveCommand cmd) {
        // 更新当前业务
        pos.x = cmd.x;
        pos.y = cmd.y;

        // 每执行100个事件直接保存镜像, 防止突然服务闪断
        if (lastSequenceNr() % 100 == 0) {
            saveSnapshot(pos);
        }
    }


    /**
     * 崩溃回滚, 恢复时处理事件和快照
     */
    @Override
    public Receive createReceiveRecover() {
        // 重放之前的 Actor 推送事件
        return receiveBuilder()
                // 重放之前的事件
                // 相当于播放器按下重新播放, 从而让当前 Actor 恢复成崩溃之前的状态
                // 这部分有点类似于游戏当中的追帧, 他会不断重放知道拉到最新的快照那一帧
                .match(MoveCommand.class, this::updatePosition)

                // 快照恢复
                // 快照会把崩溃前的最后状态挂载到当前 Actor 之中
                .match(SnapshotOffer.class, snapshotOffer -> {

                    // 恢复节点的位置信息
                    if (snapshotOffer.snapshot() instanceof Vec2 vec2) {
                        this.pos.x = vec2.x;
                        this.pos.y = vec2.y;
                    }
                })
                .build();
    }


    /**
     * 标识本地 Actor 服务的持久化标识
     * 因为是战斗相关, 所以一般可能会生成当次战斗标识
     * 可以看作保存数据库当中的主键, 而数据内容则是当次的所有战斗信息
     */
    @Override
    public String persistenceId() {
        return "battle-" + this.battleId;
    }
}

这里就是模拟实现战斗场景服务, 另外需要设置保存奔溃状态的配置:

## ==============================================================================
## Actor 集群故障检测配置
#   - failure-detector.threshold: 设置故障敏感度配置, 默认为 0.5
#       值越低: 故障检测器越敏感, 即使短暂的网络延迟也可能被判定为节点故障
#       值越低: 故障检测器越迟钝, 需要更长时间的心跳丢失才会判定为故障(容忍更长的网络波动)
#       内网服务可以适当设置为 0.5~1.0, 跨机房|云集群推荐 1.0~3.0
#       这个值很重要, 最好在做好网络测速之后配置
#
#   - failure-detector.heartbeat-interval: 敏感度心跳发送频率, 默认值 1s
#       和 failure-detector.threshold 配合使用的检测故障心跳包
#       常规网络服务 1s 已经足够日常使用, 跨机房|云集群差不多 2~5s 左右
#       这个值很重要, 最好在做好网络测速之后配置
## ==============================================================================
fortress.actor.settings.pekko.cluster.failure-detector.threshold=0.5
fortress.actor.settings.pekko.cluster.failure-detector.heartbeat-interval=1s
## ==============================================================================
## Actor 故障转移
#   - migration-parallelism: 分片迁移时的并行度, 默认为 1
#       其实相当于分片迁移的队列批次, 即同一时间内正在迁移的分片总数
#       如果超过这部分分片将会进入队列等待下次轮到他们才迁移完成
#       如果崩溃的分片服务很多而该值设置比较小, 会导致节点恢复缓慢, 需要权衡具体的应用场景
#
#   - entity-restart-backoff.min-backoff: 初始次故障重启的最小间隔, 也就是最小故障重启时间
#   - entity-restart-backoff.max-backoff: 最大故障重启间隔, 也就是超过这个时间直接当作故障无法重启
#   - entity-restart-backoff.random-factor: 重试递增因子, 在最小~最大重启之间不断递增重试的随机因子
#       假设  min-backoff=1s, max-backoff=10s, random-factor=0.2
#       也就是如果第 1 次崩溃, 将会在  1s ± 0.2s(0.8s~1.2s) 之后重启
#       第 2 次崩溃则是递增到 2s ± 0.4s(上一次间隔的 2 倍,叠加随机波动) 之后重启
#       一直持续到 10s ± 2s(达到 max-backoff 上限,不再重试恢复)
#       这个功能主要是防止那些网络波动导致连接缓慢被判定奔溃问题, 只有超过 max-backoff 没有响应才会判断需要崩溃转移
#
#   - coordinator-failure-backoff: 异常崩溃重启等待时间, 当节点心跳异常|消息投递失败的时候会在这时重试 
#       如果网络不稳定的情况可以适当调整为 5s
## ==============================================================================
fortress.actor.settings.pekko.cluster-sharding.migration-parallelism=3
fortress.actor.settings.pekko.cluster-sharding.entity-restart-backoff.min-backoff=1s
fortress.actor.settings.pekko.cluster-sharding.entity-restart-backoff.max-backoff=10s
fortress.actor.settings.pekko.cluster-sharding.entity-restart-backoff.random-factor=0.2
fortress.actor.settings.pekko.cluster-sharding.coordinator-failure-backoff=3s

之后就是比较核心的关键点, 集群分片的数据保存在哪里, 官方支持以下状态保存持久化:

  • LevelDB: 本地文件存储, 只能测试用下, 正式环境在多集群下基本上没有太大可用性
  • Cassandra: 分布式高可用的 NoSQL 数据库, 官方主推的持久化状态保持方案
  • JDBC: 通过 JDBC 接口支持关系型数据库, 可以复用 MySQL/MariaDB/Postgresql
  • 其他还有 DynamoDBR2DBC 之类, 可以去官方文档选择查看

这部分最通用的是 JDBC, 最高效的是 Cassandra, 测试的时候就用 LevelDB 即可; 目前当前系统还没用到这部分, 等需要用到的时候在说明怎么设计这方面功能, 基本上问题也不是那么大.