MeteorCat / Fortress 9 - 补充扩展

Created Sun, 07 Sep 2025 12:33:36 +0800 Modified Wed, 29 Oct 2025 23:24:54 +0800

补充扩展

这章节主要说明后续可能用到扩展工具类方便后续使用, 这些组件基本都放置在 common 子模块之中

雪花ID生成

生产工具类 SnowflakeIdGenerator.java 工具:

package io.fortress.common.utils;

import java.util.concurrent.atomic.AtomicLong;

/**
 * io.fortress.common.utils.SnowflakeIdGenerator
 * <p>
 * 雪花ID生成器的数值总长度为64位, 雪花ID结构:
 * <p>
 * 0(1位,首位)- 时间戳(41位)- 数据中心ID(5位)- 工作节点ID(5位)- 序列号(12位)
 */
@SuppressWarnings("unused")
public class SnowflakeIdGenerator {

    /**
     * 开始时间戳 (1993-02-09 00:00:00)
     */
    private static final long START_TIMESTAMP = 729187200000L;

    /**
     * 数据中心ID所占的位数
     */
    private static final long DATA_CENTER_ID_BITS = 5L;


    /**
     * 工作节点ID所占的位数
     */
    private static final long WORKER_ID_BITS = 5L;


    /**
     * 序列号所占的位数
     */
    private static final long SEQUENCE_BITS = 12L;

    /**
     * 数据中心ID的最大值
     */
    private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);


    /**
     * 工作节点ID的最大值
     */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);


    /**
     * 序列号的最大值
     */
    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

    /**
     * 工作节点ID左移位数
     */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;

    /**
     * 数据中心ID左移位数
     */
    private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;

    /**
     * 时间戳左移位数
     */
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

    /**
     * 数据中心ID
     */
    private final long dataCenterId;

    /**
     * 工作节点ID
     */
    private final long workerId;

    /**
     * 序列号
     */
    private final AtomicLong sequence = new AtomicLong(0L);

    /**
     * 上一次生成ID的时间戳
     */
    private volatile long lastTimestamp = -1L;

    /**
     * 构造函数
     *
     * @param dataCenterId 数据中心ID (0 ~ 31)
     * @param workerId     工作节点ID (0 ~ 31)
     */
    public SnowflakeIdGenerator(long dataCenterId, long workerId) {
        if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) {
            throw new IllegalArgumentException(String.format("The data center-id must be between 0 and %d.", MAX_DATA_CENTER_ID));
        }
        if (workerId > MAX_WORKER_ID || workerId < 0) {
            throw new IllegalArgumentException(String.format("The data work-id must be between 0 and %d.", MAX_WORKER_ID));
        }
        this.dataCenterId = dataCenterId;
        this.workerId = workerId;
    }

    /**
     * 构造函数
     * 获取单个数值拆分出 int16 + int16 拆分
     * 这部分可以用于服务器ID等单标识
     *
     * @param id 数值ID
     */
    public SnowflakeIdGenerator(int id) {
        int dataCenterId = Math.abs((id >> 16) % 31); // 取高16位计算
        int workerId = Math.abs((id & 0xFFFF) % 31);  // 取低16位计算
        this.dataCenterId = dataCenterId;
        this.workerId = workerId;
    }

    /**
     * 生成下一个ID
     *
     * @return 雪花ID
     */
    public synchronized long nextId() {
        long timestamp = System.currentTimeMillis();

        // 如果当前时间小于上一次生成ID的时间戳,说明系统时钟回退过,抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("System clock rolled back, ID Generation rejected. Time difference: %d ms.", lastTimestamp - timestamp));
        }

        // 如果是同一时间生成的,则序列号加1
        if (timestamp == lastTimestamp) {
            long nextSequence = sequence.incrementAndGet() & MAX_SEQUENCE;
            // 如果序列号达到最大值,则等待下一个毫秒
            if (nextSequence == 0) {
                timestamp = waitUntilNextMillis(lastTimestamp);
            }
            sequence.set(nextSequence);
        } else {
            // 如果是新的毫秒,序列号重置为0
            sequence.set(0L);
        }

        // 更新上一次生成ID的时间戳
        lastTimestamp = timestamp;

        // 拼接并返回雪花ID
        return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
                | (dataCenterId << DATA_CENTER_ID_SHIFT)
                | (workerId << WORKER_ID_SHIFT)
                | sequence.get();
    }

    /**
     * 等待直到下一个毫秒
     *
     * @param lastTimestamp 上一次生成ID的时间戳
     * @return 新的时间戳
     */
    private long waitUntilNextMillis(long lastTimestamp) {
        long timestamp = System.currentTimeMillis();
        while (timestamp <= lastTimestamp) {
            timestamp = System.currentTimeMillis();
        }
        return timestamp;
    }

    /**
     * 从ID中解析出时间戳
     *
     * @param id 雪花ID
     * @return 时间戳(毫秒)
     */
    public long parseTimestamp(long id) {
        return (id >>> TIMESTAMP_SHIFT) + START_TIMESTAMP;
    }

    /**
     * 从ID中解析出数据中心ID
     *
     * @param id 雪花ID
     * @return 数据中心ID
     */
    public long parseDataCenterId(long id) {
        return (id >>> DATA_CENTER_ID_SHIFT) & MAX_DATA_CENTER_ID;
    }

    /**
     * 从ID中解析出工作节点ID
     *
     * @param id 雪花ID
     * @return 工作节点ID
     */
    public long parseWorkerId(long id) {
        return (id >>> WORKER_ID_SHIFT) & MAX_WORKER_ID;
    }

    /**
     * 从ID中解析出序列号
     *
     * @param id 雪花ID
     * @return 序列号
     */
    public long parseSequence(long id) {
        return id & MAX_SEQUENCE;
    }
}

之后生成工具 SnowflakeIdGeneratorFactory.java 的类:

package io.fortress.common.utils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * io.fortress.common.utils.SnowflakeIdGeneratorFactory.java
 * <p>
 * 雪花ID生成器工厂类
 */
@SuppressWarnings("unused")
public class SnowflakeIdGeneratorFactory {

    /**
     * 生成器列表
     */
    private static final Map<String, SnowflakeIdGenerator> generators = new ConcurrentHashMap<>();


    /**
     * 私有构造函数,防止实例化
     */
    private SnowflakeIdGeneratorFactory() {
    }


    /**
     * 获取指定数据中心ID和工作节点ID的生成器实例
     *
     * @param dataCenterId 数据中心ID
     * @param workerId     工作节点ID
     * @return 雪花ID生成器实例
     */
    public static SnowflakeIdGenerator getGenerator(long dataCenterId, long workerId) {
        String key = dataCenterId + "-" + workerId;
        return generators.computeIfAbsent(key, k -> new SnowflakeIdGenerator(dataCenterId, workerId));
    }

    /**
     * 获取单个ID的生成器实例
     *
     * @param id 数值ID
     * @return 雪花ID生成器实例
     */
    public static SnowflakeIdGenerator getGenerator(int id) {
        int dataCenterId = Math.abs((id >> 16) % 31); // 取高16位计算
        int workerId = Math.abs((id & 0xFFFF) % 31);  // 取低16位计算
        return getGenerator(dataCenterId, workerId);
    }

    /**
     * 关闭并移除指定的生成器实例
     *
     * @param dataCenterId 数据中心ID
     * @param workerId     工作节点ID
     */
    public static SnowflakeIdGenerator removeGenerator(long dataCenterId, long workerId) {
        String key = dataCenterId + "-" + workerId;
        return generators.remove(key);
    }

    /**
     * 关闭并移除指定的生成器实例
     *
     * @param id 数值ID
     */
    public static SnowflakeIdGenerator removeGenerator(int id) {
        int dataCenterId = Math.abs((id >> 16) % 31); // 取高16位计算
        int workerId = Math.abs((id & 0xFFFF) % 31);  // 取低16位计算
        return removeGenerator(dataCenterId, workerId);
    }

    /**
     * 关闭所有生成器实例
     */
    public static void clear() {
        generators.clear();
    }
}

这部分编写个测试单元功能确认下:

package io.fortress.common.utils;

import io.quarkus.test.junit.QuarkusTest;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * io.fortress.common.utils.SnowflakeIdGeneratorFactoryTest.java
 * <p>
 * 雪花ID生成器测试单元
 */
@QuarkusTest
class SnowflakeIdGeneratorFactoryTest {

    /**
     * 日志句柄
     */
    Logger logger = LoggerFactory.getLogger(SnowflakeIdGeneratorFactoryTest.class);


    /**
     * 简单生成
     */
    @Test
    public void simpleGenerator() {
        // 创建雪花ID生成器实例(数据中心ID=1,工作节点ID=1)
        SnowflakeIdGenerator generator = SnowflakeIdGeneratorFactory.getGenerator(1, 1);

        // 生成10个ID并打印
        for (int i = 0; i < 10; i++) {
            long id = generator.nextId();
            logger.info("Snowflake Id : {}", id);

            // 解析ID中的信息
            logger.info("Timestamp : {}", generator.parseTimestamp(id));
            logger.info("Data Center ID : {}", generator.parseDataCenterId(id));
            logger.info("Worker Id: {}", generator.parseWorkerId(id));
            logger.info("Sequence: {}", generator.parseSequence(id));
            logger.info("================================================");
        }
    }

    /**
     * 从游戏创建角色来看, 数据中心ID和工作节点ID, 这两个值不能大于等于31, 也就值只能取 0~30
     */
    @Test
    public void gameGenerator() {
        // 微信1服, 但是日常服务器ID可能自动开服, 所以本身值很大
        int serverId = RandomUtils.insecure().randomInt(1, 10001);

        // 这里采用偏移填充, 把单纯 int 值切分成两个 16 位
        int dataCenterId = Math.abs((serverId >> 16) % 31); // 取高16位计算
        int workerId = Math.abs((serverId & 0xFFFF) % 31);  // 取低16位计算
        SnowflakeIdGenerator generator = SnowflakeIdGeneratorFactory.getGenerator(dataCenterId, workerId);
        logger.info("1:Snowflake Role Id : {}", generator.nextId()); // 得出最后角色ID
        logger.info("2:Snowflake Role Id : {}", generator.nextId()); // 得出最后角色ID
        logger.info("3:Snowflake Role Id : {}", generator.nextId()); // 得出最后角色ID
        logger.info("4:Snowflake Role Id : {}", generator.nextId()); // 得出最后角色ID
    }
}

Actor通用管理

Pekko 内部的 Actor 系统已经集成得够好了, 只是有些功能需要提取出来封装; 所以基于这种情况, 建议抽取单独的接口扩展功能来使用:

package io.fortress.common.utils;

import org.apache.pekko.actor.*;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.apache.pekko.cluster.sharding.ShardCoordinator;
import org.apache.pekko.cluster.sharding.ShardRegion;

import java.time.Duration;
import java.util.Optional;

/**
 * io.fortress.common.utils.ActorExtensions.java
 * <p>
 * Actor 扩展快捷功能, 提供给 AbstractActor 继承节点
 */
@SuppressWarnings("unused")
public interface ActorExtensions {

    // -----------------------------------------------------------
    //  定时器功能: Scheduler
    // -----------------------------------------------------------


    /**
     * 定时器任务的节点对象
     *
     * @param system
     * @param scheduler
     * @param cancellable
     */
    record SchedulerNode(
            ActorSystem system,
            Scheduler scheduler,
            Cancellable cancellable
    ) {

        @Override
        public String toString() {
            return "SchedulerNode{" +
                    "system=" + system.toString() +
                    ", scheduler=" + scheduler.toString() +
                    ", cancellable=" + cancellable.toString() +
                    '}';
        }
    }

    /**
     * 创建单次延迟执行任务 - 延迟后仅执行一次
     *
     * @param system   ActorSystem
     * @param delay    延迟时间
     * @param runnable 延迟任务
     * @return 定时器任务的节点对象
     */
    default SchedulerNode createScheduleOnce(ActorSystem system, Duration delay, Runnable runnable) {
        Scheduler scheduler = system.scheduler();
        Cancellable cancellable = scheduler.scheduleOnce(delay, runnable, system.dispatcher());
        return new SchedulerNode(system, scheduler, cancellable);
    }

    /**
     * 创建单次延迟执行任务 - 延迟后仅执行一次
     *
     * @param actor    Actor节点
     * @param delay    延迟时间
     * @param runnable 延迟任务
     * @param <T>      集成实现 AbstractActor
     * @return 定时器任务的节点对象
     */
    default <T extends AbstractActor> SchedulerNode createScheduleOnce(T actor, Duration delay, Runnable runnable) {
        return createScheduleOnce(actor.context().system(), delay, runnable);
    }


    /**
     * 创建固定频率任务 - 固定间隔触发(忽略任务耗时)
     *
     * @param system       ActorSystem
     * @param initialDelay 初始化执行延迟
     * @param interval     间隔延迟
     * @param runnable     延迟任务
     * @return 定时器任务的节点对象
     */
    default SchedulerNode createScheduledAtFixedRate(ActorSystem system, Duration initialDelay, Duration interval, Runnable runnable) {
        Scheduler scheduler = system.scheduler();
        Cancellable cancellable = scheduler.scheduleAtFixedRate(
                initialDelay,
                interval,
                runnable,
                system.dispatcher()
        );
        return new SchedulerNode(system, scheduler, cancellable);
    }

    /**
     * 创建固定频率任务 - 固定间隔触发(忽略任务耗时)
     *
     * @param actor        Actor节点
     * @param initialDelay 初始化执行延迟
     * @param interval     间隔延迟
     * @param runnable     延迟任务
     * @param <T>          集成实现 AbstractActor
     * @return 定时器任务的节点对象
     */
    default <T extends AbstractActor> SchedulerNode createScheduledAtFixedRate(T actor, Duration initialDelay, Duration interval, Runnable runnable) {
        return createScheduledAtFixedRate(actor.context().system(), initialDelay, interval, runnable);
    }

    /**
     * 创建固定频率任务 - 任务结束后固定延迟再触发
     *
     * @param system       ActorSystem
     * @param initialDelay 初始化执行延迟
     * @param interval     间隔延迟
     * @param runnable     延迟任务
     * @return 定时器任务的节点对象
     */
    default SchedulerNode createScheduleWithFixedDelay(ActorSystem system, Duration initialDelay, Duration interval, Runnable runnable) {
        Scheduler scheduler = system.scheduler();
        Cancellable cancellable = scheduler.scheduleWithFixedDelay(
                initialDelay,
                interval,
                runnable,
                system.dispatcher()
        );
        return new SchedulerNode(system, scheduler, cancellable);
    }


    /**
     * 创建固定频率任务 - 任务结束后固定延迟再触发
     *
     * @param actor        Actor节点
     * @param initialDelay 初始化执行延迟
     * @param interval     间隔延迟
     * @param runnable     延迟任务
     * @param <T>          集成实现 AbstractActor
     * @return 定时器任务的节点对象
     */
    default <T extends AbstractActor> SchedulerNode createScheduleWithFixedDelay(T actor, Duration initialDelay, Duration interval, Runnable runnable) {
        return createScheduleWithFixedDelay(actor.context().system(), initialDelay, interval, runnable);
    }


    // -----------------------------------------------------------
    //  简单集群功能: Cluster
    // -----------------------------------------------------------

    /**
     * 获取基础集群对象
     *
     * @param system ActorSystem
     * @return 基础集群对象
     */
    default Cluster cluster(ActorSystem system) {
        return Cluster.get(system);
    }


    /**
     * 从 Actor 对象提取集群
     *
     * @param actor Actor节点
     * @param <T>   集成实现 AbstractActor
     * @return 基础集群对象
     */
    default <T extends AbstractActor> Cluster cluster(T actor) {
        return cluster(actor.getContext().system());
    }

    /**
     * 集群节点加入时候回调
     *
     * @param system   ActorSystem
     * @param runnable 执行回调
     * @return 基础集群对象
     */
    default Cluster registerOnMemberUp(ActorSystem system, Runnable runnable) {
        Cluster cluster = cluster(system);
        cluster.registerOnMemberUp(runnable);
        return cluster;
    }

    /**
     * 集群节点加入时候回调
     *
     * @param actor    Actor节点
     * @param runnable 执行回调
     * @param <T>      集成实现 AbstractActor
     * @return 基础集群对象
     */
    default <T extends AbstractActor> Cluster registerOnMemberUp(T actor, Runnable runnable) {
        return registerOnMemberUp(actor.context().system(), runnable);
    }

    /**
     * 集群节点移除时候回调
     *
     * @param system   ActorSystem
     * @param runnable 执行回调
     * @return 基础集群对象
     */
    default Cluster registerOnMemberRemoved(ActorSystem system, Runnable runnable) {
        Cluster cluster = cluster(system);
        cluster.registerOnMemberRemoved(runnable);
        return cluster;
    }


    /**
     * 集群节点移除时候回调
     *
     * @param actor    Actor节点
     * @param runnable 执行回调
     * @param <T>      集成实现 AbstractActor
     * @return 基础集群对象
     */
    default <T extends AbstractActor> Cluster registerOnMemberRemoved(T actor, Runnable runnable) {
        return registerOnMemberRemoved(actor.context().system(), runnable);
    }


    /**
     * 集群节点加入和移除时候回调
     *
     * @param system ActorSystem
     * @param up     registerOnMemberUp 回调
     * @param down   registerOnMemberRemoved 回调
     * @return 基础集群对象
     */
    default Cluster registerOnMemberUpAndRemoved(ActorSystem system, Runnable up, Runnable down) {
        Cluster cluster = cluster(system);
        cluster.registerOnMemberUp(up);
        cluster.registerOnMemberRemoved(down);
        return cluster;
    }

    /**
     * 集群节点加入和移除时候回调
     *
     * @param actor Actor节点
     * @param up    registerOnMemberUp 回调
     * @param down  registerOnMemberRemoved 回调
     * @param <T>   集成实现 AbstractActor
     * @return 基础集群对象
     */
    default <T extends AbstractActor> Cluster registerOnMemberUpAndRemoved(T actor, Runnable up, Runnable down) {
        return registerOnMemberUpAndRemoved(actor.context().system(), up, down);
    }


    // -----------------------------------------------------------
    //  集群分片功能: ClusterSharding
    // -----------------------------------------------------------

    /**
     * 获取集群分片对象
     *
     * @param system ActorSystem
     * @return 集群分片对象
     */
    default ClusterSharding clusterSharding(ActorSystem system) {
        return ClusterSharding.get(system);
    }

    /**
     * 从 Actor 对象提取集群分片
     *
     * @param actor Actor节点
     * @param <T>   集成实现 AbstractActor
     * @return 集群分片对象
     */
    default <T extends AbstractActor> ClusterSharding clusterSharding(T actor) {
        return clusterSharding(actor.getContext().system());
    }


    // -----------------------------------------------------------
    //  集群分片服务: ClusterSharding
    // -----------------------------------------------------------


    /**
     * 集群分片的节点对象
     *
     * @param clusterSharding
     * @param actorRef
     */
    record ClusterShardingNode(
            ActorSystem system,
            ClusterSharding clusterSharding,
            ActorRef actorRef
    ) {

        /**
         * 格式化类的内容
         *
         * @return String
         */
        @Override
        public String toString() {
            return "ClusterShardingNode{" +
                    "system=" + system.toString() +
                    ", clusterSharding=" + clusterSharding.toString() +
                    ", actorRef=" + actorRef.toString() +
                    '}';
        }
    }


    /**
     * 创建 ClusterSharding 服务
     *
     * @param system             Actor System
     * @param typeName           集群分片名称
     * @param props              Actor 设置
     * @param settings           集群分片配置
     * @param extractor          消息提取器
     * @param strategy           自定义节点选举策略
     * @param handOffStopMessage 集群分片所有权发生转移(迁移发送的事件)
     * @return 集群分片的节点对象
     */
    default ClusterShardingNode startClusterShardingService(
            ActorSystem system, String typeName,
            Props props, ClusterShardingSettings settings,
            ShardRegion.MessageExtractor extractor,
            ShardCoordinator.ShardAllocationStrategy strategy,
            Object handOffStopMessage
    ) {
        ClusterSharding clusterSharding = clusterSharding(system);
        ActorRef clusterShardingRef = clusterSharding.start(typeName, props, settings, extractor, strategy, handOffStopMessage);
        return new ClusterShardingNode(system, clusterSharding, clusterShardingRef);
    }


    /**
     * 创建 ClusterSharding 服务
     *
     * @param actor              Actor 节点
     * @param typeName           集群分片名称
     * @param props              Actor 设置
     * @param settings           集群分片配置
     * @param extractor          消息提取器
     * @param strategy           自定义节点选举策略
     * @param handOffStopMessage 集群分片所有权发生转移(迁移发送的事件)
     * @param <T>                集成实现 AbstractActor
     * @return 集群分片的节点对象
     */
    default <T extends AbstractActor> ClusterShardingNode startClusterShardingService(
            T actor, String typeName,
            Props props, ClusterShardingSettings settings,
            ShardRegion.MessageExtractor extractor,
            ShardCoordinator.ShardAllocationStrategy strategy,
            Object handOffStopMessage
    ) {
        return startClusterShardingService(actor.getContext().system(), typeName, props, settings, extractor, strategy, handOffStopMessage);
    }


    /**
     * 创建 ClusterSharding 服务
     *
     * @param system    Actor System
     * @param typeName  集群分片名称
     * @param props     Actor 设置
     * @param settings  集群分片配置
     * @param extractor 消息提取器
     * @return 集群分片的节点对象
     */
    default ClusterShardingNode startClusterShardingService(
            ActorSystem system, String typeName,
            Props props, ClusterShardingSettings settings,
            ShardRegion.MessageExtractor extractor
    ) {
        ClusterSharding clusterSharding = clusterSharding(system);
        ActorRef clusterShardingRef = clusterSharding.start(typeName, props, settings, extractor);
        return new ClusterShardingNode(system, clusterSharding, clusterShardingRef);
    }

    /**
     * 创建 ClusterSharding 服务
     *
     * @param actor     Actor 节点
     * @param typeName  集群分片名称
     * @param props     Actor 设置
     * @param settings  集群分片配置
     * @param extractor 消息提取器
     * @param <T>       集成实现 AbstractActor
     * @return 集群分片的节点对象
     */
    default <T extends AbstractActor> ClusterShardingNode startClusterShardingService(
            T actor, String typeName,
            Props props, ClusterShardingSettings settings,
            ShardRegion.MessageExtractor extractor
    ) {
        return startClusterShardingService(actor.getContext().system(), typeName, props, settings, extractor);
    }


    /**
     * 创建 ClusterSharding 服务
     *
     * @param system    Actor System
     * @param typeName  集群分片名称
     * @param props     Actor 设置
     * @param extractor 消息提取器
     * @return 集群分片的节点对象
     */
    default ClusterShardingNode startClusterShardingService(
            ActorSystem system, String typeName,
            Props props, ShardRegion.MessageExtractor extractor
    ) {
        ClusterSharding clusterSharding = clusterSharding(system);
        ClusterShardingSettings settings = ClusterShardingSettings.create(system);
        return startClusterShardingService(system, typeName, props, settings, extractor);
    }


    /**
     * 创建 ClusterSharding 服务
     *
     * @param actor     Actor 节点
     * @param typeName  集群分片名称
     * @param props     Actor 设置
     * @param extractor 消息提取器
     * @param <T>       集成实现 AbstractActor
     * @return 集群分片的节点对象
     */
    default <T extends AbstractActor> ClusterShardingNode startClusterShardingService(
            T actor, String typeName,
            Props props, ShardRegion.MessageExtractor extractor
    ) {
        return startClusterShardingService(actor.context().system(), typeName, props, extractor);
    }

    /**
     * 创建 ClusterSharding 代理
     *
     * @param system    Actor System
     * @param typeName  集群分片名称
     * @param role      集群参与角色
     * @param extractor 消息提取器
     * @return 集群分片的节点对象
     */
    default ClusterShardingNode startClusterShardingProxy(ActorSystem system, String typeName, String role, ShardRegion.MessageExtractor extractor) {
        ClusterSharding clusterSharding = clusterSharding(system);
        ActorRef actorRef = clusterSharding.startProxy(typeName, Optional.of(role), extractor);
        return new ClusterShardingNode(system, clusterSharding, actorRef);
    }


    /**
     * 创建 ClusterSharding 代理
     *
     * @param actor     Actor 节点
     * @param typeName  集群分片名称
     * @param role      集群参与角色
     * @param extractor 消息提取器
     * @param <T>       集成实现 AbstractActor
     * @return 集群分片的节点对象
     */
    default <T extends AbstractActor> ClusterShardingNode startClusterShardingProxy(T actor, String typeName, String role, ShardRegion.MessageExtractor extractor) {
        return startClusterShardingProxy(actor.getContext().system(), typeName, role, extractor);
    }


    /**
     * 创建 ClusterSharding 代理
     *
     * @param system    Actor System
     * @param typeName  集群分片名称
     * @param extractor 消息提取器
     * @return 集群分片的节点对象
     */
    default ClusterShardingNode startClusterShardingProxy(ActorSystem system, String typeName, ShardRegion.MessageExtractor extractor) {
        ClusterSharding clusterSharding = clusterSharding(system);
        ActorRef actorRef = clusterSharding.startProxy(typeName, Optional.empty(), extractor);
        return new ClusterShardingNode(system, clusterSharding, actorRef);
    }


    /**
     * 创建 ClusterSharding 代理
     *
     * @param actor     Actor 节点
     * @param typeName  集群分片名称
     * @param extractor 消息提取器
     * @param <T>       集成实现 AbstractActor
     * @return 集群分片的节点对象
     */
    default <T extends AbstractActor> ClusterShardingNode startClusterShardingProxy(T actor, String typeName, ShardRegion.MessageExtractor extractor) {
        return startClusterShardingProxy(actor.getContext().system(), typeName, extractor);
    }
}

这里就是把冗长的功能封装成通用接口, 后续只需要实现这些接口就能挂载对应简化功能实现, 这里追加个测试单元:

package io.fortress.common.utils;

import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.config.ActorSystemConfiguration;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.common.constraint.Assert;
import io.smallrye.config.ConfigMapping;
import jakarta.inject.Inject;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ClusterEvent;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;

/**
 * io.fortress.common.utils.ActorExtensionsTest.java
 * <p>
 * Actor扩展功能实现
 */
@QuarkusTest
class ActorExtensionsTest {

    /**
     * 日志对象
     */
    static final Logger logger = LoggerFactory.getLogger(ActorExtensionsTest.class);


    /**
     * 扩展 Actor 系统配置接口
     */
    @ConfigMapping(prefix = "fortress.actor")
    interface ActorSystemConfigurationExt extends ActorSystemConfiguration {
        // 后续 Actor 参数需要扩展就在这里追加方法
    }


    /**
     * 加载测试单元的 application.properties 配置
     */
    @Inject
    ActorSystemConfigurationExt configuration;


    /**
     * 追加扩展功能 Actor 服务
     */
    public static class LocalCluster extends AbstractActor implements ActorExtensions {

        /**
         * 基础集群
         */
        final Cluster cluster = cluster(getContext().getSystem());

        /**
         * 分片集群
         */
        final ClusterSharding clusterSharding = clusterSharding(getContext().getSystem());


        /**
         * 启动前回调
         *
         * @throws Exception 默认异常
         */
        @Override
        public void preStart() throws Exception {
            super.preStart();

            // 注册需要监听的系统事件
            cluster.subscribe(getSelf(),
                    ClusterEvent.initialStateAsEvents(),
                    ClusterEvent.UnreachableMember.class, // 集群节点成员不可用
                    ClusterEvent.ReachableMember.class // 集群节点成员可用
            );
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ClusterEvent.ReachableMember.class, (event) -> {
                        // 失效可用的节点是自己, 可以需要启动某些功能
                        if (cluster.selfMember().equals(event.member())) {
                            logger.info("Received reachable member from cluster: {}", event.member());
                        }
                    })
                    .match(ClusterEvent.UnreachableMember.class, (event) -> {
                        // 失效不可用的节点是自己, 可以需要断开目前的节点的某些功能
                        if (cluster.selfMember().equals(event.member())) {
                            logger.info("Received unreachable member from cluster: {}", event.member());
                        }
                    })
                    .build();
        }
    }

    /**
     * 集群服务
     */
    @Test
    void cluster() {
        ActorSystem system = configuration.createActorSystem();
        Assert.assertNotNull(system);

        // 基础集群对象和分片集群
        Cluster cluster = Cluster.get(system);
        ClusterSharding clusterSharding = ClusterSharding.get(system);
        ClusterShardingSettings clusterShardingSettings = ClusterShardingSettings.create(system);

        // 只有节点加入成功才需要挂载服务
        cluster.registerOnMemberUp(() -> {

            // 启动集群分片
            clusterSharding.start(
                    "LocalCluster",
                    Props.create(LocalCluster.class),
                    clusterShardingSettings,
                    new ActorMessageExtractor(1, Collections.emptyList(), "local-cluster-")
            );
            logger.info("Actor registered");
        });

        // 节点被移除的时候需要准备些处理
        cluster.registerOnMemberRemoved(() -> logger.info("Actor removed"));

    }
}

字段签名

有时候需要将 {AAA=BBB,CCC=DDD} 的结构需要按照 KEY 标识正序|倒序拼凑排列成 AAA=BBB&CCC=DDD 方式; 最后加入 secure 得出字段的 MD5 哈希值, 这就是字段的合法性 签名(Sign), 这种方式适合简单的参数传递验证:

package io.fortress.common.utils;


import org.apache.commons.codec.digest.DigestUtils;

import java.util.*;

/**
 * io.fortress.common.utils.ParamValidator.java
 * <p>
 * 参数验证器工具
 */
@SuppressWarnings("unused")
public class ParamValidator {

    /**
     * 不允许外部实例化
     */
    private ParamValidator() {
        // ignore
    }

    /**
     * 获取参数签名字符串
     *
     * @param params     参数列表
     * @param secure     签名字符串
     * @param comparator 参数排序的序列, Comparator.naturalOrder() 正序, Comparator.reverseOrder() 倒序
     * @return 参数列表的MD5
     */
    public static String signParamsString(final Map<String, String> params, final String secure, final Comparator<? super String> comparator) {
        Map<String, String> sortedParams = new TreeMap<>(comparator);
        sortedParams.putAll(params);

        // 获取字段列表
        List<String> fields = new ArrayList<>(params.size());
        for (Map.Entry<String, String> entry : sortedParams.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key != null && value != null) {
                fields.add("%s=%s".formatted(entry.getKey(), value));
            }
        }

        // 合并成 x1=x2&y1=y2, 并参与加密
        String paramsSign = String.join("&", fields);
        if (secure != null && !secure.isBlank()) {
            paramsSign = paramsSign + secure;
        }
        return paramsSign;
    }

    /**
     * 参数签名
     *
     * @param params     参数列表
     * @param secure     签名字符串
     * @param comparator 参数排序的序列, Comparator.naturalOrder() 正序, Comparator.reversedOrder() 倒序
     * @return 参数列表的MD5
     */
    public static String signParams(final Map<String, String> params, final String secure, final Comparator<? super String> comparator) {
        // 采用 org.apache.commons.codec 库的 MD5 工具哈希去处理
        return DigestUtils.md5Hex(signParamsString(params, secure, comparator));
    }

    /**
     * 默认正序列字段签名
     *
     * @param params 参数列表
     * @param secure 签名字符串
     * @return 参数列表的MD5
     */
    public static String signParams(final Map<String, String> params, final String secure) {
        return signParams(params, secure, Comparator.naturalOrder());
    }


    /**
     * 默认正序列字段签名
     *
     * @param params 参数列表
     * @return 参数列表的MD5
     */
    public static String signParams(final Map<String, String> params) {
        return signParams(params, "", Comparator.naturalOrder());
    }
}

这里追加测试单元来实验下最后结果:

package io.fortress.common.utils;

import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;

/**
 * io.fortress.common.utils.ParamValidatorTest.java
 * <p>
 * 参数验证器单元测试
 */
@QuarkusTest
class ParamValidatorTest {

    /**
     * 日志对象
     */
    final Logger logger = LoggerFactory.getLogger(ParamValidatorTest.class);

    /**
     * 测试签名
     */
    @Test
    public void sign() {
        String startString = "first String";
        String endString = "this is end";
        String secure = "test-key";

        // 获取正序哈希字符串
        String naturalParamsString = ParamValidator.signParamsString(Map.of(
                "sid", "1",
                "uid", "10000",
                "zzz", endString,
                "token", UUID.randomUUID().toString(),
                "aaa", startString
        ), secure, Comparator.naturalOrder());
        assertNotNull(naturalParamsString);
        assertTrue(naturalParamsString.startsWith("aaa=%s".formatted(startString)));
        assertTrue(naturalParamsString.endsWith("zzz=%s%s".formatted(endString, secure)));
        logger.info("naturalOrder paramsString: {}", naturalParamsString);

        // 获取正序哈希签名
        String naturalSign = ParamValidator.signParams(Map.of(
                "sid", "1",
                "uid", "10000",
                "zzz", endString,
                "token", UUID.randomUUID().toString(),
                "aaa", startString
        ), secure, Comparator.naturalOrder());
        logger.info("naturalOrder sign: {}", naturalSign);


        // 获取倒序哈希字符串
        String reverseParamsString = ParamValidator.signParamsString(Map.of(
                "sid", "1",
                "uid", "10000",
                "zzz", endString,
                "token", UUID.randomUUID().toString(),
                "aaa", startString
        ), secure, Comparator.reverseOrder());
        assertNotNull(reverseParamsString);
        logger.info("reverseOrder paramsString: {}", reverseParamsString);


        // 获取倒序哈希签名
        String reverseSign = ParamValidator.signParams(Map.of(
                "sid", "1",
                "uid", "10000",
                "zzz", endString,
                "token", UUID.randomUUID().toString(),
                "aaa", startString
        ), secure, Comparator.reverseOrder());
        logger.info("reverseOrder sign: {}", reverseSign);
    }
}

测试单元最后的打印内容:

2025-09-07 12:24:38,312 INFO  [io.for.com.uti.ParamValidatorTest] (main) naturalOrder paramsString: aaa=first String&sid=1&token=9aa325d2-0e5d-44ee-88ae-cdf85ffb03af&uid=10000&zzz=this is endtest-key
2025-09-07 12:24:38,314 INFO  [io.for.com.uti.ParamValidatorTest] (main) naturalOrder sign: 57b2a9340fb35843ca29937e37e8e17e
2025-09-07 12:24:38,314 INFO  [io.for.com.uti.ParamValidatorTest] (main) reverseOrder paramsString: zzz=this is end&uid=10000&token=5ba325fc-a58f-40fa-a81b-65666cc72edb&sid=1&aaa=first Stringtest-key
2025-09-07 12:24:38,315 INFO  [io.for.com.uti.ParamValidatorTest] (main) reverseOrder sign: 53bbcb6bff1c1dac35722341ee80989a

MessageExtractor

消息提取器之前实现过一版, 采用默认取哈希取模算法处理, 其中 protobuf 格式如下:

syntax = "proto3";

// 其他略

// 会话响应的消息内容: 服务端→客户端
message S2CMessageEvent{
  string sessionId = 1; // 会话ID, 授权之后的玩家ID
  int64 timestamp = 2; // 发起时间戳
  int32 messageId = 3; // 内容ID
  bytes messageData = 4; // 会话二进制内容
}

可以看到 sessionId 作为标识玩家角色ID采用 string 类型, 这实际上是很冗余的设计(但通用化还行); string 类型本身会生成为 长度标识 + 内容的UTF8Bytes 的传输结构, 相比直接 int64 性能在高频率传输时候影响最大.

另外作为数据库(MySQL)入库, 也尽可能采用 BIGINT 类型帮助其加速查询等

基于传输效率出发应该采用 int64 而非 string, 这时候消息提取器那边也能直接获取 long 来处理取模, 而不用针对数据的 String 类型提取出来做哈希编码转 int32 再去取模.

不过问题不止这样, 可以看到消息提取器采用取模是硬编码在自己的 MessageExtractor 之中, 如果某一天因为项目变动需要切换新的算法来做数据分片传输呢? 重新写一个 MessageExtractor 然后项目全部替换掉吗?

可以看到里面现在就出现问题了, 切换算法都要去所有涉及地方手动修改, 而不是直接在某些地方切换配置生效

这里就需要将内部业务提取出来分成以下模块:

  • 算法功能抽象接口
  • 生成功能实现工厂
  • 通用配置提取

算法抽象接口如下, 后面的算法可以直接追加该实现接口:

package io.fortress.common.message;

/**
 * io.fortress.common.message.MessageExtractorAlgorithm.java
 * <p>
 * 基础集群消息提取算法接口
 */
public interface MessageExtractorAlgorithm {

    /**
     * 提取集群实体ID
     *
     * @param message 传递过来的消息对象
     * @return 获取集群实体标识
     */
    String entityId(Object message);

    /**
     * 提取集群消息格式, 默认直接对象返回
     *
     * @param message 传递过来的消息对象
     * @return 获取集群实体消息结构
     */
    default Object entityMessage(Object message) {
        return message;
    }

    /**
     * 提取集群分片ID(有的分片采用玩家ID做标识, 最后的值为 `int64`)
     *
     * @param message 传递过来的消息对象
     * @return 按照算法生成分片ID
     */
    Long shardId(Object message);
}

通用的消息提取器, 继承实现默认的 pekko 消息提取器功能:

package io.fortress.common.config;

import io.fortress.common.message.MessageExtractorAlgorithm;
import io.fortress.common.message.MessageShardingExtractor;
import io.fortress.common.protobuf.Events;
import io.smallrye.config.WithDefault;

import java.util.List;

/**
 * io.fortress.common.config.ActorExtractorConfiguration.java
 * <p>
 * 加载 Actor Extractor 配置, 用于传递给 pekko 的配置项
 */
@SuppressWarnings("unused")
public interface ActorExtractorConfiguration {


    /**
     * 系统消息包装:C2SConnectedEvent
     */
    record C2SConnectedEventExtractor(

    ) implements MessageShardingExtractor.Extractor {

        @Override
        public Object getEntityId(Object message) {
            if (message instanceof Events.C2SConnectedEvent event) {
                event.getSessionId();
            }
            return null;
        }

        @Override
        public Object getShardId(Object message) {
            if (message instanceof Events.C2SConnectedEvent event) {
                event.getSessionId();
            }
            return null;
        }
    }

    /**
     * 系统消息包装:C2SDisconnectEvent
     */
    record C2SDisconnectEventExtractor(

    ) implements MessageShardingExtractor.Extractor {

        @Override
        public Object getEntityId(Object message) {
            if (message instanceof Events.C2SDisconnectEvent event) {
                return event.getSessionId();
            }
            return null;
        }

        @Override
        public Object getShardId(Object message) {
            if (message instanceof Events.C2SDisconnectEvent event) {
                return event.getSessionId();
            }
            return null;
        }
    }


    ///  以上内容到时候会抽取到特定位置放置, 目前仅测试用
    ///  -------------------------------------------------------

    /**
     * 是否加载系统事件
     */
    @WithDefault("true")
    boolean useEvents();

    /**
     * 默认分片前缀
     */
    @WithDefault("default-")
    String prefix();


    /**
     * 生成扩展消息提取器
     *
     * @param algorithm  消息算法
     * @param extractors 消息拦截器
     * @return Actor 消息提取器
     */
    default MessageShardingExtractor<?> createMessageShardingExtractor(MessageExtractorAlgorithm algorithm, List<MessageShardingExtractor.Extractor> extractors) {
        if (useEvents()) {
            // todo: 后续扩展
            extractors.add(new C2SConnectedEventExtractor());
            extractors.add(new C2SDisconnectEventExtractor());
        }

        // 生成消息提取器
        String prefix = prefix();
        return new MessageShardingExtractor<>(
                algorithm,
                extractors,
                prefix
        );
    }

}

顺便提供个配置生成功能对象方便底层去实现注入扩展:

package io.fortress.common.config;

import io.fortress.common.message.MessageExtractorAlgorithm;
import io.fortress.common.message.MessageShardingExtractor;

import java.util.List;

/**
 * io.fortress.common.config.ActorExtractorConfiguration.java
 * <p>
 * 加载 Actor Extractor 配置, 用于传递给 pekko 的配置项
 */
@SuppressWarnings("unused")
public interface ActorExtractorConfiguration {


    /**
     * 生成扩展消息提取器
     *
     * @param algorithm  消息算法
     * @param extractors 消息拦截器
     * @param prefix     分片前缀
     * @return Actor 消息提取器
     */
    default MessageShardingExtractor<?> createMessageShardingExtractor(MessageExtractorAlgorithm algorithm, List<MessageShardingExtractor.Extractor> extractors, String prefix) {
        return new MessageShardingExtractor<>(
                algorithm,
                extractors,
                prefix
        );
    }
}

这里提供个 取模MurMur3 的算法实现, 用于提供怎么自定义的封装.

MurMur3 算法分片

package io.fortress.common.message.impl;

import io.fortress.common.message.MessageExtractorAlgorithm;
import org.apache.commons.codec.digest.MurmurHash3;

import java.nio.charset.StandardCharsets;

/**
 * io.fortress.common.message.impl
 * <p>
 * 采用 org.apache.commons.codec.digest.Mur3Mur3 集群分片算法实现
 */
@SuppressWarnings("unused")
public class MurMur3ExtractorAlgorithm implements MessageExtractorAlgorithm {

    /**
     * MurMur3哈希Seed定义, 默认为0
     */
    protected final int hashSeed;

    /**
     * 构造方法
     *
     * @param hashSeed 哈希种子
     */
    public MurMur3ExtractorAlgorithm(int hashSeed) {
        if (hashSeed <= 0) throw new IllegalArgumentException("hashSeed must be greater than 0");
        this.hashSeed = hashSeed;
    }

    /**
     * 构造方法
     */
    public MurMur3ExtractorAlgorithm() {
        this.hashSeed = 0;
    }

    /**
     * 实体ID其实可以不用参与判断
     *
     * @param message 传递过来的消息对象
     * @return 实体ID标识
     */
    @Override
    public String entityId(Object message) {
        if (message instanceof String msg && !msg.isEmpty()) {
            return msg;
        }
        return null;
    }

    /**
     * 分片ID这时候就要开始判断将字符串转为 murmur3 值
     *
     * @param message 传递过来的消息对象
     * @return 分片ID标识
     */
    @Override
    public Long shardId(Object message) {
        if (message instanceof String msg && !msg.isEmpty()) {
            byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
            long[] murmur3Hash = MurmurHash3.hash128x64(bytes, 0, bytes.length, hashSeed);
            return murmur3Hash[0];
        }
        return null;
    }
}

取模 算法分片

package io.fortress.common.message.impl;

import io.fortress.common.message.MessageExtractorAlgorithm;

/**
 * io.fortress.common.message.impl.ModulusExtractorAlgorithm.java
 * <p>
 * 取模算法实现
 */
public class ModulusExtractorAlgorithm implements MessageExtractorAlgorithm {

    /**
     * 分片总数
     */
    protected final int numberOfShards;

    /**
     * 构造方法
     *
     * @param numberOfShards 分片总数
     */
    public ModulusExtractorAlgorithm(int numberOfShards) {
        if (numberOfShards <= 0) throw new IllegalArgumentException("numberOfShards must be greater than 0");
        this.numberOfShards = numberOfShards;
    }


    /**
     * 实体ID其实可以不用参与判断
     *
     * @param message 传递过来的消息对象
     * @return 实体ID标识
     */
    @Override
    public String entityId(Object message) {
        if (message instanceof String msg && !msg.isEmpty()) {
            return msg;
        }
        return null;
    }

    /**
     * 分片ID这时候就要开始判断将字符串转为取模值
     *
     * @param message 传递过来的消息对象
     * @return 分片ID标识
     */
    @Override
    public Long shardId(Object message) {
        if (message instanceof Long value) {
            return value % numberOfShards;
        }
        return null;
    }
}