MeteorCat / Fortress 6 - 集群实现

Created Sat, 30 Aug 2025 14:36:03 +0800 Modified Wed, 29 Oct 2025 23:24:54 +0800

集群实现

这篇章开始设计简单的集群功能, 方便后续的业务开发.

在开始设计集群之前, 先说明下 common 项目之前考虑过的一个问题: common 通用包里面要不要加入 pekko-cluster-sharding 依赖?

  • 引入之后就和 pekko 强绑定了
  • 不引入没有办法设计通用的 消息提取器(MessageExtractor).

由于之前已经切换为 Protobuf 序列化, 内部消息定义可能需要额外处理; 考虑再三之后, 最终决定把 pekko-cluster-sharding 放到 common 中作为通用的第三方依赖.

这里在通用 common 设计个取模消息提取器方便后续做分片计算:

package io.fortress.common.actor;


import io.fortress.common.protobuf.Events;
import org.apache.commons.codec.digest.MurmurHash3;
import org.apache.pekko.cluster.sharding.ShardRegion;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/**
 * io.fortress.common.actor.ModuloMessageExtractor.java
 * <p>
 * Modulo-based message extractor: the entity ID (session ID) of a message is hashed with
 * MurmurHash3 and mapped onto one of {@code numberOfShards} shards.
 */
@SuppressWarnings("unused")
public class ModuloMessageExtractor implements ShardRegion.MessageExtractor {

    /**
     * Total number of shards; always greater than 0.
     */
    protected final int numberOfShards;

    /**
     * MurmurHash3 seed (0 when not specified).
     */
    protected final int hashSeed;


    /**
     * Callback that turns a shard number in [0, numberOfShards - 1] into a shard ID string.
     */
    protected final Function<Integer, String> createShardId;


    /**
     * Constructor.
     *
     * @param numberOfShards total number of shards, must be greater than 0
     * @param createShardId  maps a shard number to a shard ID
     * @param hashSeed       MurmurHash3 seed
     * @throws IllegalArgumentException if {@code numberOfShards <= 0}
     */
    public ModuloMessageExtractor(int numberOfShards, Function<Integer, String> createShardId, int hashSeed) {
        // Fail fast here instead of an ArithmeticException (division by zero) on the first shardId() call
        if (numberOfShards <= 0) throw new IllegalArgumentException("numberOfShards must be greater than 0");
        this.numberOfShards = numberOfShards;
        this.createShardId = createShardId;
        this.hashSeed = hashSeed;
    }

    /**
     * Constructor with hash seed 0.
     */
    public ModuloMessageExtractor(int numberOfShards, Function<Integer, String> createShardId) {
        this(numberOfShards, createShardId, 0);
    }


    /**
     * Constructor producing shard IDs of the form {@code prefix + shardNumber}.
     */
    public ModuloMessageExtractor(int numberOfShards, String prefix, int hashSeed) {
        this(numberOfShards, (integer -> prefix + integer), hashSeed);
    }


    /**
     * Constructor producing shard IDs of the form {@code prefix + shardNumber}, with hash seed 0.
     */
    public ModuloMessageExtractor(int numberOfShards, String prefix) {
        this(numberOfShards, (integer -> prefix + integer), 0);
    }


    /**
     * Extracts the entity ID (the session ID) from a message.
     * <p>
     * Entity IDs are Strings, which is one of the reasons the protobuf sessionId is declared
     * as a string.
     *
     * @return the session ID, or {@code null} for messages this extractor does not recognize
     */
    @Override
    public String entityId(Object message) {

        // The recognized event types are hard-coded here, which is not very flexible;
        // this may need to be refactored later.
        if (message instanceof Events.C2SConnectedEvent event) {
            return event.getSessionId();
        } else if (message instanceof Events.C2SDisconnectEvent event) {
            return event.getSessionId();
        } else if (message instanceof Events.C2SMessageEvent event) {
            return event.getSessionId();
        } else if (message instanceof Events.S2CDisconnectEvent event) {
            return event.getSessionId();
        } else if (message instanceof Events.S2CMessageEvent event) {
            return event.getSessionId();
        }

        return null;
    }

    /**
     * Returns the message to deliver to the entity: the message itself, unwrapped.
     */
    @Override
    public Object entityMessage(Object message) {
        return message;
    }


    /**
     * Computes the shard ID as MurmurHash3(entityId) modulo {@code numberOfShards}.
     *
     * @return the shard ID, or {@code null} when no entity ID can be extracted
     */
    @Override
    public String shardId(Object message) {
        // With remember-entities enabled, Pekko restarts entities by sending ShardRegion.StartEntity
        // to the region; it carries the entity ID, which is exactly the hash key used here.
        String entityId = message instanceof ShardRegion.StartEntity start
                ? start.entityId()
                : entityId(message);
        if (entityId == null) return null;

        byte[] bytes = entityId.getBytes(StandardCharsets.UTF_8);
        int hash = MurmurHash3.hash32x86(bytes, 0, bytes.length, hashSeed);

        // hash32x86 returns a signed int and Math.abs(Integer.MIN_VALUE) stays negative, so take the
        // remainder first and then the absolute value. For every other hash |h % n| == |h| % n, so
        // shard assignment is unchanged, and the result is always within [0, numberOfShards - 1].
        int shardNumber = Math.abs(hash % numberOfShards);
        return createShardId.apply(shardNumber);
    }
}

内部先用 MurmurHash3 计算出 32 位哈希值, 转换为非负数之后再对分片总数取模, 换算出具体的分片ID; 这里添加个单元测试确认代码无误:

package io.fortress.common.actor;

import io.fortress.common.protobuf.Events;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.common.constraint.Assert;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * io.fortress.common.actor.ModuloMessageExtractorTest.java
 * <p>
 * Tests for the modulo-based shard extractor.
 */
@QuarkusTest
class ModuloMessageExtractorTest {

    /**
     * Test logger.
     */
    private static final Logger log = LoggerFactory.getLogger(ModuloMessageExtractorTest.class);

    /**
     * How many random events {@link #shardId()} checks.
     */
    private static final int SAMPLE_COUNT = 5;

    /**
     * Extractor under test: 10 shards, IDs of the form "shared-N", hash seed 0.
     */
    private final ModuloMessageExtractor extractor =
            new ModuloMessageExtractor(10, integer -> "shared-" + integer, 0);

    /**
     * Builds a C2SDisconnectEvent carrying a random numeric session ID.
     */
    private static Events.C2SDisconnectEvent randomDisconnectEvent() {
        String sessionId = String.valueOf(RandomUtils.insecure().randomLong(100000, 900000));
        return Events.C2SDisconnectEvent.newBuilder()
                .setSessionId(sessionId)
                .setTimestamp(System.currentTimeMillis())
                .build();
    }

    /**
     * A connect event must yield an entity ID.
     */
    @Test
    void entityId() {
        var connected = Events.C2SConnectedEvent.newBuilder()
                .setSessionId("1000001")
                .setTimestamp(System.currentTimeMillis())
                .setIp("127.0.0.1")
                .build();

        var entityId = extractor.entityId(connected);
        Assert.assertNotNull(entityId);
        log.info("entityId: {}", entityId);
    }

    /**
     * Random disconnect events must all resolve to a shard ID.
     */
    @Test
    void shardId() {
        int round = 0;
        while (round < SAMPLE_COUNT) {
            var shardId = extractor.shardId(randomDisconnectEvent());
            Assert.assertNotNull(shardId);
            log.info("shardId: {}", shardId);
            round++;
        }
    }
}

实际上之前就想用 int32/int64 替代 sessionId 的 string 类型传输, 后来发现底层很多哈希路由都依赖 String 来处理(例如 MessageExtractor 的 entityId/shardId 都返回 String), 最后没有太多选择, 只能将对应的标识改由 string 传递.

现在前期集群算法的编写工作已经完成, 之后就是开始设计挂载集群服务了.

集群服务

首先确保创建 cluster 子模块, 内部的 pom.xml 配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Parent POM -->
    <parent>
        <groupId>io.fortress</groupId>
        <artifactId>boot</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <!-- Module coordinates -->
    <!-- NOTE(review): this module contains a @QuarkusMain entry point; no <build> section is declared here,
         so confirm the Quarkus build plugin is configured in the parent POM -->
    <modelVersion>4.0.0</modelVersion>
    <artifactId>fortress-cluster</artifactId>
    <name>Fortress Cluster Module</name>
    <description>Fortress cluster Core</description>
    <packaging>jar</packaging>

    <!-- Third-party dependencies (no versions declared here, so they must be managed by the parent) -->
    <dependencies>

        <!-- Shared common library -->
        <dependency>
            <groupId>io.fortress</groupId>
            <artifactId>fortress-common</artifactId>
        </dependency>

        <!-- Protobuf runtime -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </dependency>

    </dependencies>
</project>

因为集群本身就是启动的应用, 所以需要定义应用入口:

package io.fortress.cluster;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;

/**
 * io.fortress.cluster.ClusterApplication.java
 * <p>
 * Entry point of the cluster node application.
 * Reference: <a href="https://cn.quarkus.io/guides/lifecycle">Application initialization and termination</a>
 */
@QuarkusMain
public class ClusterApplication {

    /**
     * Program entry point; hands control to the Quarkus runtime.
     *
     * @param args command-line arguments, passed through unchanged to {@code Quarkus.run}
     */
    public static void main(String[] args) {
        Quarkus.run(args);
    }
}

还要有全局通用的 Actor System, 需要添加扩展的配置接口和全局对象注入:

package io.fortress.cluster.config;


import io.fortress.common.config.ActorSystemConfiguration;
import io.smallrye.config.ConfigMapping;

/**
 * io.fortress.cluster.config.ClusterActorSystemConfiguration.java
 * <p>
 * Cluster-module binding of the shared {@code ActorSystemConfiguration}, mapped from the
 * {@code fortress.actor} configuration prefix.
 */
@ConfigMapping(prefix = "fortress.actor")
public interface ClusterActorSystemConfiguration extends ActorSystemConfiguration {
    // No additional members: every setting is inherited from ActorSystemConfiguration.
}

Actor System 通用系统基本和 websocket 那边差不多, 只是追加了节点启动之后绑定集群业务的逻辑:

package io.fortress.cluster;

import io.fortress.cluster.config.ClusterActorSystemConfiguration;
import io.fortress.cluster.session.SessionManager;
import io.fortress.common.actor.ModuloMessageExtractor;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * io.fortress.cluster.ClusterActorSystem.java
 * <p>
 * Creates the global ActorSystem and, once this node is Up in the cluster, starts the
 * "session" shard region.
 */
@ApplicationScoped
public class ClusterActorSystem {

    /**
     * Logger.
     */
    final Logger logger = LoggerFactory.getLogger(ClusterActorSystem.class);


    /**
     * Actor system configuration bound from the {@code fortress.actor} prefix.
     */
    @Inject
    ClusterActorSystemConfiguration configuration;

    /**
     * Produces the single application-wide ActorSystem.
     * <p>
     * Whenever the ActorSystem terminates, successfully or not, Quarkus is asked to exit.
     */
    @Produces
    @Startup
    @DefaultBean
    @ApplicationScoped
    public ActorSystem createActorSystem() {
        logger.info("Creating ActorSystem: {}", configuration.name());
        configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));

        final ActorSystem actorSystem = configuration.createActorSystem(outcome -> {
            final String status = outcome.isSuccess() ? "Success" : "Failure";
            logger.info("Close ActorSystem, Status: {}", status);
            // The service cannot live without its ActorSystem: exit Quarkus in either case
            Quarkus.asyncExit();
            return null;
        });

        final Cluster cluster = Cluster.get(actorSystem);
        // Start cluster sharding as soon as this node has joined the cluster (member Up)
        cluster.registerOnMemberUp(() -> startShardRegions(actorSystem));
        // Hook for notifying local shards when this member leaves the cluster; only logs for now
        cluster.registerOnMemberRemoved(() -> logger.info("Removed Cluster: {}", actorSystem.name()));

        return actorSystem;
    }

    /**
     * Starts the shard regions hosted by this node; invoked from the member-Up callback.
     */
    private void startShardRegions(ActorSystem actorSystem) {
        logger.info("Joined Cluster: {}", actorSystem.name());

        // TODO: the sharding types may later be created from dynamic configuration

        // Session shard region
        final ActorRef sessionRegion = ClusterSharding.get(actorSystem).start(
                "session",
                SessionManager.props,
                ClusterShardingSettings.create(actorSystem),
                new ModuloMessageExtractor(5, "session-")
        );
        logger.info("Create Cluster Sharding: {}", sessionRegion.path().address());

        // Besides player sessions, a game server will likely need other sharded domains, e.g.
        //  - clusterSharding.start("chat",....) // chat
        //  - clusterSharding.start("rank",....) // rankings
        //  - clusterSharding.start("battle",....) // battles
        //  - clusterSharding.start("achieve",....) // achievements
        // This is what worried me from the start: the plain ModuloMessageExtractor cannot cover them.
        // None of them is needed yet, so only per-player session sharding is handled for now.
    }
}

这里简单实现个 会话管理器 功能就行了, 后面再一起扩展出来不同的业务玩法:

package io.fortress.cluster.session;

import io.fortress.common.protobuf.Events;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Props;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;

/**
 * io.fortress.cluster.session.SessionManager.java
 * <p>
 * Entity actor of the "session" shard region.
 * It currently only logs the connect and server-side disconnect events it receives.
 */
public class SessionManager extends AbstractActor {

    /**
     * Props used to start entities of this type.
     * Declared {@code final} so that shared, global state cannot be reassigned at runtime.
     */
    public static final Props props = Props.create(SessionManager.class);

    /**
     * Actor-scoped Pekko logger.
     */
    final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);

    /**
     * Handles C2SConnectedEvent and S2CDisconnectEvent; any other message is unhandled.
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Events.C2SConnectedEvent.class,
                        event -> logger.info("Received C2SConnectedEvent: {}", event.toString()))
                .match(Events.S2CDisconnectEvent.class,
                        event -> logger.info("Received S2CDisconnectEvent: {}", event.toString()))
                .build();
    }
}

最后配置文件 application.properties 编写内容:

# Actor cluster configuration
# Overrides keys defined in pekko-cluster_2.13-1.1.5.jar!\reference.conf (see that file for the defaults)
fortress.actor.name=fortress-cluster
# Cluster roles of this node, declared as indexed list entries
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
# Pekko overrides - basic actor provider and Artery remoting
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2551
# Actor serialization: bind protobuf messages to Pekko's ProtobufSerializer
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
# Pekko overrides - seed nodes (note: preferably list this node itself as the first seed node)
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
#fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
# Pekko overrides - downing strategy (split brain resolver)
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority

启动目前是没有问题, 但是目前并没有具体的业务逻辑处理, 也没有相关的数据加载传输, 下面就是打印的日志内容:

2025-09-04 18:56:19,154 INFO  [io.for.clu.ClusterActorSystem] (fortress-cluster-pekko.actor.default-dispatcher-17) Joined Cluster: fortress-cluster
2025-09-04 18:56:19,170 INFO  [io.for.clu.ClusterActorSystem] (fortress-cluster-pekko.actor.default-dispatcher-17) Create Cluster Sharding: pekko://fortress-cluster
2025-09-04 18:56:19,173 INFO  [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-17) session: Automatic entity passivation: idle entities after [2.000 min], checked every [1.000 min]
2025-09-04 18:56:19,180 INFO  [org.apa.pek.clu.sin.ClusterSingletonManager] (fortress-cluster-pekko.actor.default-dispatcher-17) Singleton manager starting singleton actor [pekko://fortress-cluster/system/sharding/sessionCoordinator/singleton]
2025-09-04 18:56:19,181 INFO  [org.apa.pek.clu.sin.ClusterSingletonManager] (fortress-cluster-pekko.actor.default-dispatcher-17) ClusterSingletonManager state change [Start -> Oldest]
2025-09-04 18:56:19,186 INFO  [org.apa.pek.clu.sha.DDataShardCoordinator] (fortress-cluster-pekko.actor.default-dispatcher-16) session: ShardCoordinator was moved to the active state with [0] shards

这里日志打印 ShardCoordinator was moved to the active state with [0] shards 说明集群分片已经启动, 但是目前没有会话连接到内部.

补充说明

为什么需要传递 ShardRegion.MessageExtractor 这个类?

其实集群分片的原理就是在原本的集群 Cluster 之上, 把大量实体 Actor 按分片(Shard)自动分布到各个节点, 并在节点加入/离开或崩溃时负责分片的重新平衡和迁移; 而某条消息属于哪个分片、交给哪个实体处理, 这个规则就是由 MessageExtractor 定义的, 消息进入集群分片的流程如下:

  1. 外部把消息发送给本地的分片区域(ShardRegion)
  2. ShardRegion 调用 MessageExtractor 的 shardId 提取出分片标识 shardId
  3. ShardRegion 根据 shardId 确定分片所在节点(未知时向 ShardCoordinator 查询), 并把消息转发到该节点上的分片
  4. 分片(Shard)调用 MessageExtractor 的 entityId 提取出实体标识 entityId, 并通过 entityMessage 取出实际要投递的消息
  5. 如果该分片下已经存在 entityId 对应的实体 Actor, 就直接转发给它
  6. 如果不存在, 就先按传入的 Props 动态创建该实体 Actor, 之后再转发处理

这就是基本的处理流程, 所以说 MessageExtractor 非常重要, 消息能否到达和它密切相关; 但是目前 common 中的这个组件其实很大概率不具有通用性, 之前的注释也说明过:

// 这部分后面可能需要优化处理下, 基本约等于将事件硬编码到这里识别
// 感觉这里配置不是那么灵活
if (message instanceof Events.C2SConnectedEvent event) {
    return event.getSessionId();
} else if (message instanceof Events.C2SDisconnectEvent event) {
    return event.getSessionId();
} else if (message instanceof Events.C2SMessageEvent event) {
    return event.getSessionId();
} else if (message instanceof Events.S2CDisconnectEvent event) {
    return event.getSessionId();
} else if (message instanceof Events.S2CMessageEvent event) {
    return event.getSessionId();
}

所以需要改进一下让它能够通用化, 下面把这部分功能重构为通用实现:

package io.fortress.common.actor;

import org.apache.commons.codec.digest.MurmurHash3;
import org.apache.pekko.cluster.sharding.ShardRegion;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

/**
 * io.fortress.common.actor.ActorMessageExtractor.java
 * <p>
 * General-purpose message extractor. Callers supply a list of {@link Extractor}s; for each message,
 * the first extractor (in list order) that returns a non-null value decides the entity ID or the
 * shard key. The shard key is hashed with MurmurHash3 and mapped onto {@code numberOfShards} shards
 * by modulo.
 * <p>
 * NOTE(review): Pekko's {@code ShardRegion.StartEntity} (sent when remember-entities is enabled)
 * reaches {@link #shardId(Object)} like any other message, so it is only routed if some Extractor
 * handles it.
 */
@SuppressWarnings("unused")
public class ActorMessageExtractor implements ShardRegion.MessageExtractor {


    /**
     * Strategy that recognizes one kind of message.
     * Both methods return {@code null} for messages the extractor does not recognize.
     */
    public interface Extractor {

        /**
         * Returns the entity ID of {@code message}, or {@code null} if it is not recognized.
         */
        String getEntityId(Object message);

        /**
         * Returns the shard key of {@code message}, or {@code null} if it is not recognized.
         * The key is hashed into the final shard ID by {@link ActorMessageExtractor#shardId(Object)}.
         */
        String getShardId(Object message);
    }


    /**
     * Total number of shards; always greater than 0.
     */
    protected final int numberOfShards;

    /**
     * MurmurHash3 seed (0 when not specified).
     */
    protected final int hashSeed;

    /**
     * Immutable list of message extractors, consulted in order.
     */
    protected final List<Extractor> extractors;

    /**
     * Callback that turns a shard number in [0, numberOfShards - 1] into a shard ID string.
     */
    protected final Function<Integer, String> createShardId;


    /**
     * Constructor.
     *
     * @param numberOfShards total number of shards, must be greater than 0
     * @param extractors     extractors consulted in order; copied, must not be or contain {@code null}
     * @param createShardId  maps a shard number to a shard ID
     * @param hashSeed       MurmurHash3 seed
     * @throws IllegalArgumentException if {@code numberOfShards <= 0}
     * @throws NullPointerException     if {@code extractors} is or contains {@code null}
     */
    public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, Function<Integer, String> createShardId, int hashSeed) {
        if (numberOfShards <= 0) throw new IllegalArgumentException("numberOfShards must be greater than 0");
        this.numberOfShards = numberOfShards;
        // Immutable snapshot: the list is read for every routed message, so later mutation by the
        // caller must not change routing. A null list is rejected here instead of on the first message.
        this.extractors = List.copyOf(extractors);
        this.createShardId = createShardId;
        this.hashSeed = hashSeed;
    }

    /**
     * Constructor with hash seed 0.
     */
    public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, Function<Integer, String> createShardId) {
        this(numberOfShards, extractors, createShardId, 0);
    }


    /**
     * Constructor producing shard IDs of the form {@code prefix + shardNumber}.
     */
    public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, String prefix, int hashSeed) {
        this(numberOfShards, extractors, (integer -> prefix + integer), hashSeed);
    }


    /**
     * Constructor producing shard IDs of the form {@code prefix + shardNumber}, with hash seed 0.
     */
    public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, String prefix) {
        this(numberOfShards, extractors, (integer -> prefix + integer), 0);
    }


    /**
     * Returns the entity ID from the first extractor that recognizes {@code message},
     * or {@code null} if none does (or {@code message} is {@code null}).
     */
    @Override
    public String entityId(Object message) {
        if (message == null) return null;
        return firstNonNull(extractor -> extractor.getEntityId(message));
    }

    /**
     * Returns the message to deliver to the entity: the message itself, unchanged.
     */
    @Override
    public Object entityMessage(Object message) {
        return message;
    }

    /**
     * Computes the shard ID as MurmurHash3(shard key) modulo {@code numberOfShards}.
     *
     * @return the shard ID, or {@code null} if no extractor yields a shard key
     */
    @Override
    public String shardId(Object message) {
        if (message == null) return null;
        String shardKey = firstNonNull(extractor -> extractor.getShardId(message));
        if (shardKey == null) return null;

        byte[] bytes = shardKey.getBytes(StandardCharsets.UTF_8);
        int hash = MurmurHash3.hash32x86(bytes, 0, bytes.length, hashSeed);

        // hash32x86 returns a signed int and Math.abs(Integer.MIN_VALUE) stays negative, so take the
        // remainder first and then the absolute value. For every other hash |h % n| == |h| % n, so
        // shard assignment is unchanged, and the result is always within [0, numberOfShards - 1].
        int shardNumber = Math.abs(hash % numberOfShards);
        return createShardId.apply(shardNumber);
    }

    /**
     * Applies {@code getter} to the extractors in list order and returns the first non-null result,
     * or {@code null} if there is none. Each extractor is invoked at most once per call, which
     * matters because this runs for every routed message.
     */
    private String firstNonNull(Function<Extractor, String> getter) {
        for (Extractor extractor : extractors) {
            String value = getter.apply(extractor);
            if (value != null) return value;
        }
        return null;
    }

}

这里编写个单元测试确认功能运行正常:

package io.fortress.common.actor;

import io.fortress.common.protobuf.Events;
import io.smallrye.common.constraint.Assert;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * io.fortress.common.actor.ActorMessageExtractorTest.java
 * <p>
 * Unit tests for ActorMessageExtractor.
 */
class ActorMessageExtractorTest {

    /**
     * Test logger.
     */
    private static final Logger log = LoggerFactory.getLogger(ActorMessageExtractorTest.class);

    /**
     * How many random events {@link #shardId()} checks.
     */
    private static final int SAMPLE_COUNT = 5;


    /**
     * Extractor for C2SConnectedEvent; the session ID is both entity ID and shard key.
     */
    record C2SConnectedEventExtractor() implements ActorMessageExtractor.Extractor {

        @Override
        public String getEntityId(Object message) {
            return message instanceof Events.C2SConnectedEvent event ? event.getSessionId() : null;
        }

        @Override
        public String getShardId(Object message) {
            return getEntityId(message);
        }
    }

    /**
     * Extractor for C2SDisconnectEvent; the session ID is both entity ID and shard key.
     */
    record C2SDisconnectedEventExtractor() implements ActorMessageExtractor.Extractor {

        @Override
        public String getEntityId(Object message) {
            return message instanceof Events.C2SDisconnectEvent event ? event.getSessionId() : null;
        }

        @Override
        public String getShardId(Object message) {
            return getEntityId(message);
        }
    }


    /**
     * Extractor under test: 10 shards, IDs of the form "shared-N", hash seed 0.
     */
    private final ActorMessageExtractor extractor = new ActorMessageExtractor(
            10,
            List.of(new C2SConnectedEventExtractor(), new C2SDisconnectedEventExtractor()),
            integer -> "shared-" + integer,
            0
    );

    /**
     * Builds a C2SDisconnectEvent carrying a random numeric session ID.
     */
    private static Events.C2SDisconnectEvent randomDisconnectEvent() {
        String sessionId = String.valueOf(RandomUtils.insecure().randomLong(100000, 900000));
        return Events.C2SDisconnectEvent.newBuilder()
                .setSessionId(sessionId)
                .setTimestamp(System.currentTimeMillis())
                .build();
    }


    /**
     * A connect event must yield an entity ID.
     */
    @Test
    void entityId() {
        var connected = Events.C2SConnectedEvent.newBuilder()
                .setSessionId("1000001")
                .setTimestamp(System.currentTimeMillis())
                .setIp("127.0.0.1")
                .build();

        var entityId = extractor.entityId(connected);
        Assert.assertNotNull(entityId);
        log.info("entityId: {}", entityId);
    }


    /**
     * Random disconnect events must all resolve to a shard ID.
     */
    @Test
    void shardId() {
        int round = 0;
        while (round < SAMPLE_COUNT) {
            var shardId = extractor.shardId(randomDisconnectEvent());
            Assert.assertNotNull(shardId);
            log.info("shardId: {}", shardId);
            round++;
        }
    }
}

这里可以看到这个接口功能通用性更强, 支持外部去定制提取器列表分析消息返回.

但是问题也出在通用性上: 为了支持外部高度自定义的消息解析, 这里采用了 List, 每条消息都要遍历列表逐个判断类型是否匹配; 无论消息最终能否匹配, 都会被迭代查询一遍, 这部分损耗能否接受要看具体场景, 所以提取器列表应该尽量精简, 避免迭代次数过多.

改进之后的 ActorSystem 代码就像这样:

package io.fortress.cluster;

import io.fortress.cluster.config.ClusterActorSystemConfiguration;
import io.fortress.cluster.session.SessionManager;
import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.actor.ModuloMessageExtractor;
import io.fortress.common.protobuf.Events;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * io.fortress.cluster.ClusterActorSystem.java
 * <p>
 * Produces the global ActorSystem and starts cluster sharding once this node joins the cluster.
 */
@ApplicationScoped
public class ClusterActorSystem {

    /**
     * Logger.
     */
    final Logger logger = LoggerFactory.getLogger(ClusterActorSystem.class);


    /**
     * Actor system configuration bound from the {@code fortress.actor} prefix.
     */
    @Inject
    ClusterActorSystemConfiguration configuration;


    /**
     * Returns the message extractors used by the "session" shard region.
     * <p>
     * Every session event is routed by its sessionId, which serves as both entity ID and shard key,
     * so all messages of one session reach the same SessionManager entity. The list covers the same
     * five session event types that ModuloMessageExtractor recognizes. A type missing here yields a
     * null entity ID, which the shard region treats as unhandled. In particular, S2CDisconnectEvent
     * must be listed because SessionManager handles it.
     */
    public List<ActorMessageExtractor.Extractor> getActorExtractor() {
        return List.of(
                sessionExtractor(Events.C2SConnectedEvent.class, Events.C2SConnectedEvent::getSessionId),
                sessionExtractor(Events.C2SDisconnectEvent.class, Events.C2SDisconnectEvent::getSessionId),
                sessionExtractor(Events.C2SMessageEvent.class, Events.C2SMessageEvent::getSessionId),
                sessionExtractor(Events.S2CDisconnectEvent.class, Events.S2CDisconnectEvent::getSessionId),
                sessionExtractor(Events.S2CMessageEvent.class, Events.S2CMessageEvent::getSessionId)
        );
    }

    /**
     * Creates an extractor that recognizes messages of {@code type}.
     * For those messages it returns the value of {@code sessionIdOf} as both entity ID and shard key;
     * for any other message it returns {@code null}.
     */
    private static <T> ActorMessageExtractor.Extractor sessionExtractor(
            Class<T> type, java.util.function.Function<? super T, String> sessionIdOf) {
        return new ActorMessageExtractor.Extractor() {
            @Override
            public String getEntityId(Object message) {
                return type.isInstance(message) ? sessionIdOf.apply(type.cast(message)) : null;
            }

            @Override
            public String getShardId(Object message) {
                return getEntityId(message);
            }
        };
    }


    /**
     * Produces the single application-wide ActorSystem.
     * <p>
     * Whenever the ActorSystem terminates, successfully or not, Quarkus is asked to exit.
     */
    @Produces
    @Startup
    @DefaultBean
    @ApplicationScoped
    public ActorSystem createActorSystem() {
        // Log the effective configuration
        logger.info("Creating ActorSystem: {}", configuration.name());
        configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));

        // Create the ActorSystem with a termination callback that shuts the whole service down
        ActorSystem system = configuration.createActorSystem((terminatedTry -> {
            logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");

            // Exit Quarkus whether termination succeeded or failed
            Quarkus.asyncExit();
            return null;
        }));


        // Start cluster sharding once this node has joined the cluster (member Up)
        Cluster cluster = Cluster.get(system);
        cluster.registerOnMemberUp(() -> {
            logger.info("Joined Cluster: {}", system.name());


            // TODO: the sharding types may later be created from dynamic configuration

            // Session shard region, keyed by sessionId
            ClusterSharding clusterSharding = ClusterSharding.get(system);
            ClusterShardingSettings settings = ClusterShardingSettings.create(system);
            ActorMessageExtractor extractor = new ActorMessageExtractor(5, getActorExtractor(), "session-");
            ActorRef sessionActorRef = clusterSharding.start(
                    "session",
                    SessionManager.props,
                    settings,
                    extractor
            );
            logger.info("Create Cluster Sharding: {}", sessionActorRef.path().address());

            // Besides player sessions, a game server will likely need other sharded domains, e.g.
            //  - clusterSharding.start("chat",....) // chat
            //  - clusterSharding.start("rank",....) // rankings
            //  - clusterSharding.start("battle",....) // battles
            //  - clusterSharding.start("achieve",....) // achievements
            // This is what worried me from the start: the plain ModuloMessageExtractor cannot cover them.
            // None of them is needed yet, so only per-player session sharding is handled for now.
        });

        // Callback when this member is removed from the cluster; local shards could be notified here
        cluster.registerOnMemberRemoved(() -> {
            logger.info("Removed Cluster: {}", system.name());

        });

        return system;
    }

}

再次启动服务正常挂载就没问题了