Cluster Implementation
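This chapter starts designing a simple cluster feature to support later business development.
Before designing the cluster, let me explain a question I weighed earlier for the common project: should the common package include the pekko-cluster-sharding dependency?
- Including it ties common tightly to Pekko.
- Not including it makes it impossible to design a shared message extractor (MessageExtractor).
On top of that, since serialization was switched to Protobuf earlier, internal message definitions may need extra handling.
After weighing all of this, I decided to move pekko-cluster-sharding into common as a shared third-party dependency.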
In common, I design a modulo-based message extractor to handle shard computation later on:
package io.fortress.common.actor;
import io.fortress.common.protobuf.Events;
import org.apache.commons.codec.digest.MurmurHash3;
import org.apache.pekko.cluster.sharding.ShardRegion;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
/**
* io.fortress.common.actor.ModuloMessageExtractor.java
* <p>
* Modulo-based message extractor
*/
@SuppressWarnings("unused")
public class ModuloMessageExtractor implements ShardRegion.MessageExtractor {
/**
* Total number of shards
*/
protected final int numberOfShards;
/**
* MurmurHash3 seed, defaults to 0
*/
protected final int hashSeed;
/**
* Callback that builds the shard ID string from the shard index
*/
protected final Function<Integer, String> createShardId;
/**
* Constructor
*/
public ModuloMessageExtractor(int numberOfShards, Function<Integer, String> createShardId, int hashSeed) {
// Reject a non-positive shard count up front instead of failing later in shardId()
if (numberOfShards <= 0) throw new IllegalArgumentException("numberOfShards must be greater than 0");
this.numberOfShards = numberOfShards;
this.createShardId = createShardId;
this.hashSeed = hashSeed;
}
/**
* Constructor
*/
public ModuloMessageExtractor(int numberOfShards, Function<Integer, String> createShardId) {
this(numberOfShards, createShardId, 0);
}
/**
* Constructor
*/
public ModuloMessageExtractor(int numberOfShards, String prefix, int hashSeed) {
this(numberOfShards, (integer -> prefix + integer), hashSeed);
}
/**
* Constructor
*/
public ModuloMessageExtractor(int numberOfShards, String prefix) {
this(numberOfShards, (integer -> prefix + integer), 0);
}
/**
* Extracts the ID of the sharded entity from the message
* This is one of the reasons the protobuf sessionId is defined as a string
*/
@Override
public String entityId(Object message) {
// This part may need optimizing later: event recognition is essentially hard-coded here,
// and the configuration does not feel very flexible
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
} else if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
} else if (message instanceof Events.C2SMessageEvent event) {
return event.getSessionId();
} else if (message instanceof Events.S2CDisconnectEvent event) {
return event.getSessionId();
} else if (message instanceof Events.S2CMessageEvent event) {
return event.getSessionId();
}
return null;
}
/**
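* Extracts the message actually delivered to the entity
*/
@Override
public Object entityMessage(Object message) {
return message;
}
/**
* Computes the shard ID by hashing the entity ID and taking it modulo the shard count
*/
@Override
public String shardId(Object message) {
String entityId = entityId(message);
if (entityId == null) return null;
// Hash with MurmurHash3 into a 32-bit value. Math.abs would not make it non-negative:
// Math.abs(Integer.MIN_VALUE) is still negative, so treat the hash as an unsigned uint32 instead
byte[] bytes = entityId.getBytes(StandardCharsets.UTF_8);
int hash = MurmurHash3.hash32x86(bytes, 0, bytes.length, hashSeed);
// Unsigned remainder keeps the shard index within [0, numberOfShards - 1]
int shard = Integer.remainderUnsigned(hash, numberOfShards);
return createShardId.apply(shard);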
}
}
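Why the shard index needs care: a signed 32-bit hash passed through Math.abs is not guaranteed to be non-negative, because Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, and Java's % keeps the sign of the dividend. The sketch below uses only the JDK; ShardIndex is a hypothetical helper, not part of the project. It compares the abs-then-modulo approach with Integer.remainderUnsigned, which treats the hash as uint32, and with Math.floorMod as another in-range option.

```java
// Hypothetical helper for illustration only; not part of the fortress codebase.
final class ShardIndex {

    private ShardIndex() {
    }

    // Mirrors an abs-then-modulo approach: Math.abs(Integer.MIN_VALUE) is still negative.
    static int naive(int hash, int numberOfShards) {
        return Math.abs(hash) % numberOfShards;
    }

    // Treats the 32-bit hash as unsigned (uint32); for a positive numberOfShards
    // the result is always in [0, numberOfShards - 1].
    static int unsigned(int hash, int numberOfShards) {
        return Integer.remainderUnsigned(hash, numberOfShards);
    }

    // Floor modulo is also always in range, but maps negative hashes to different
    // shards than the unsigned variant, so every node must use the same formula.
    static int floor(int hash, int numberOfShards) {
        return Math.floorMod(hash, numberOfShards);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println("naive:    " + naive(hash, 10));    // -8, not a valid shard index
        System.out.println("unsigned: " + unsigned(hash, 10)); // 8
        System.out.println("floorMod: " + floor(hash, 10));    // 2
    }
}
```

Whichever formula is chosen, changing it later remaps entities to different shards, so it has to be rolled out to all nodes of the cluster together.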
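Internally, MurmurHash3 turns the entity ID into a uint32 value, which is then taken modulo the shard count to get the concrete shard ID. Here is a unit test to confirm the code works: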
package io.fortress.common.actor;
import io.fortress.common.protobuf.Events;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.common.constraint.Assert;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* io.fortress.common.actor.ModuloMessageExtractorTest.java
* <p>
* Unit tests for the modulo cluster sharding extractor
*/
@QuarkusTest
class ModuloMessageExtractorTest {
/**
* Logger
*/
private static final Logger log = LoggerFactory.getLogger(ModuloMessageExtractorTest.class);
/**
* Modulo sharding extractor under test
*/
private final ModuloMessageExtractor extractor = new ModuloMessageExtractor(
10,
integer -> "shard-" + integer,
0
);
/**
* Extracts the entity ID for cluster sharding
*/
@Test
void entityId() {
// Build a C2SConnectedEvent
Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
.newBuilder()
.setSessionId("1000001")
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
var entityId = extractor.entityId(c2SConnectedEvent);
Assert.assertNotNull(entityId);
log.info("entityId: {}", entityId);
}
/**
* Computes the shard ID
*/
@Test
void shardId() {
// Generate a batch of random events
for (int i = 0; i < 5; i++) {
// Build a C2SDisconnectEvent
Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(String.valueOf(RandomUtils.insecure().randomLong(100000, 900000)))
.setTimestamp(System.currentTimeMillis())
.build();
// Compute the shard ID
var shardId = extractor.shardId(c2SDisconnectEvent);
Assert.assertNotNull(shardId);
log.info("shardId: {}", shardId);
}
}
}
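Actually, I had wanted to transmit sessionId as int32/int64 instead of string, but found that much of the underlying hash routing works with String (in Pekko sharding, both entity IDs and shard IDs are String values), so in the end there was little choice but to pass these identifiers as strings.
With the groundwork for the cluster sharding algorithm done, the next step is to design and mount the cluster service.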
Cluster Service
First, create the cluster submodule with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Parent -->
<parent>
<groupId>io.fortress</groupId>
<artifactId>boot</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<!-- Module properties -->
<modelVersion>4.0.0</modelVersion>
<artifactId>fortress-cluster</artifactId>
<name>Fortress Cluster Module</name>
<description>Fortress cluster Core</description>
<packaging>jar</packaging>
<!-- Third-party dependencies -->
<dependencies>
<!-- Shared common library -->
<dependency>
<groupId>io.fortress</groupId>
<artifactId>fortress-common</artifactId>
</dependency>
<!-- Protobuf core dependency -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies>
</project>
Because the cluster runs as a standalone application, it needs an application entry point:
package io.fortress.cluster;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
/**
* io.fortress.cluster.ClusterApplication.java
* <p>
* Application entry point
* Reference: <a href="https://cn.quarkus.io/guides/lifecycle">Application initialization and termination</a>
*/
@QuarkusMain
public class ClusterApplication {
/**
* Program entry point
*
* @param args command-line arguments
*/
public static void main(String[] args) {
Quarkus.run(args);
}
}
We also need a global, shared Actor System, which requires an extended configuration interface and an injected global object:
package io.fortress.cluster.config;
import io.fortress.common.config.ActorSystemConfiguration;
import io.smallrye.config.ConfigMapping;
/**
* io.fortress.cluster.config.ClusterActorSystemConfiguration.java
* <p>
* Overrides the common configuration
*/
@ConfigMapping(prefix = "fortress.actor")
public interface ClusterActorSystemConfiguration extends ActorSystemConfiguration {
}
The Actor System setup is basically the same as on the websocket side; it only adds logic that binds the cluster business once the node has started:
package io.fortress.cluster;
import io.fortress.cluster.config.ClusterActorSystemConfiguration;
import io.fortress.cluster.session.SessionManager;
import io.fortress.common.actor.ModuloMessageExtractor;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* io.fortress.cluster.ClusterActorSystem.java
* <p>
* Creates and initializes the global ActorSystem
*/
@ApplicationScoped
public class ClusterActorSystem {
/**
* Logger
*/
final Logger logger = LoggerFactory.getLogger(ClusterActorSystem.class);
/**
* Loaded configuration
*/
@Inject
ClusterActorSystemConfiguration configuration;
/**
* Produces the single global ActorSystem
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// Log the configuration
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// Create the Actor System and register a termination callback: when the actor system terminates, shut down the whole service
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// Shut down Quarkus whether termination succeeded or failed
Quarkus.asyncExit();
return null;
}));
// Once this node is Up in the cluster, start cluster sharding
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(() -> {
logger.info("Joined Cluster: {}", system.name());
// todo: set up the sharded business types; this may later be loaded from dynamic configuration
// Sharded session entity type: session
ClusterSharding clusterSharding = ClusterSharding.get(system);
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ModuloMessageExtractor extractor = new ModuloMessageExtractor(5, "session-");
ActorRef sessionActorRef = clusterSharding.start(
"session",
SessionManager.props,
settings,
extractor
);
logger.info("Create Cluster Sharding: {}", sessionActorRef.path().address());
// For a game server, besides the player's own session there may be other sharded business types:
// - clusterSharding.start("chat",....) // chat system
// - clusterSharding.start("rank",....) // leaderboard system
// - clusterSharding.start("battle",....) // battle system
// - clusterSharding.start("achieve",....) // achievement system
// These are exactly what worried me at the start: a plain ModuloMessageExtractor can no longer cover their sharding
// None of them are needed yet, so for now only the per-player session sharding is handled
});
// Callback when this member is removed from the cluster; the shards on this node could be notified here
cluster.registerOnMemberRemoved(() -> {
logger.info("Removed Cluster: {}", system.name());
});
return system;
}
}
A simple session manager is enough for now; different gameplay features will be added on top of it later:
package io.fortress.cluster.session;
import io.fortress.common.protobuf.Events;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Props;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
/**
* io.fortress.cluster.session.SessionManager.java
* <p>
* Sharded session actor
*/
public class SessionManager extends AbstractActor {
public static final Props props = Props.create(SessionManager.class);
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(Events.C2SConnectedEvent.class, event -> {
logger.info("Received C2SConnectedEvent: {}", event.toString());
}).match(Events.S2CDisconnectEvent.class, event -> {
logger.info("Received S2CDisconnectEvent: {}", event.toString());
}).build();
}
}
Finally, the application.properties configuration:
# Actor cluster configuration
# Reference: these keys override the defaults in pekko-cluster_2.13-1.1.5.jar!\reference.conf
fortress.actor.name=fortress-cluster
# Declare the node roles directly
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
# Pekko overrides - basic actor settings
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2551
# Actor serialization
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
# Pekko overrides - cluster seed nodes (note: list this node first; when a cluster is first started, only the first seed node can form it by joining itself)
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
#fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
# Pekko overrides - cluster downing strategy (split brain resolver)
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority
The node now starts without problems, although there is no concrete business logic yet and no data is loaded or transferred. Here is the startup log:
2025-09-04 18:56:19,154 INFO [io.for.clu.ClusterActorSystem] (fortress-cluster-pekko.actor.default-dispatcher-17) Joined Cluster: fortress-cluster
2025-09-04 18:56:19,170 INFO [io.for.clu.ClusterActorSystem] (fortress-cluster-pekko.actor.default-dispatcher-17) Create Cluster Sharding: pekko://fortress-cluster
2025-09-04 18:56:19,173 INFO [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-17) session: Automatic entity passivation: idle entities after [2.000 min], checked every [1.000 min]
2025-09-04 18:56:19,180 INFO [org.apa.pek.clu.sin.ClusterSingletonManager] (fortress-cluster-pekko.actor.default-dispatcher-17) Singleton manager starting singleton actor [pekko://fortress-cluster/system/sharding/sessionCoordinator/singleton]
2025-09-04 18:56:19,181 INFO [org.apa.pek.clu.sin.ClusterSingletonManager] (fortress-cluster-pekko.actor.default-dispatcher-17) ClusterSingletonManager state change [Start -> Oldest]
2025-09-04 18:56:19,186 INFO [org.apa.pek.clu.sha.DDataShardCoordinator] (fortress-cluster-pekko.actor.default-dispatcher-16) session: ShardCoordinator was moved to the active state with [0] shards
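The log line ShardCoordinator was moved to the active state with [0] shards shows that cluster sharding has started; there are no shards yet because no session has connected. Shards are allocated on demand when the first message for them arrives.
Additional Notes
Why do we need to pass a ShardRegion.MessageExtractor?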
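Cluster sharding builds on top of the plain Cluster: it spreads entity actors across the cluster nodes, lets callers address an entity by a logical ID instead of its physical location, rebalances shards when nodes join, and reallocates them when nodes leave or are downed after a crash.
Where each entity lives is decided by rules, and those rules are defined by the MessageExtractor. A message entering the sharding roughly follows this flow:
- An external request sends the message to the local ShardRegion.
- The ShardRegion calls MessageExtractor.shardId to get the shard identifier.
- If the region does not yet know which node hosts that shard, it asks the ShardCoordinator, which allocates the shard to a region; the message is then forwarded to that region.
- Inside the shard, MessageExtractor.entityId yields the entity identifier, and entityMessage yields the payload to deliver.
- If an entity actor with that entityId already exists, the message is forwarded to it directly.
- If it does not exist, the shard starts the entity actor on demand and then delivers the message.
That is the basic flow, which is why MessageExtractor matters so much: whether a message arrives at all depends on it. It also implies one rule: all messages for the same entityId must produce the same shardId, otherwise the same entity could be started in two different shards.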
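The flow above can be modeled in plain Java to make the order of lookups concrete. The sketch below is a hypothetical, single-JVM toy: ToyShardRegion is not a Pekko class, there are no actors, no ShardCoordinator and no remote nodes, and String.hashCode stands in for MurmurHash3 only to stay dependency-free. It shows that the shard is resolved first, then the entity inside it, and that both are created on first use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical single-JVM model of the sharding routing flow (illustrative names, not Pekko APIs).
final class ToyShardRegion {

    // shardId -> (entityId -> messages delivered to that entity)
    private final Map<String, Map<String, List<Object>>> shards = new HashMap<>();
    private final Function<Object, String> entityIdOf;
    private final Function<Object, String> shardIdOf;
    private int entitiesStarted = 0;

    ToyShardRegion(Function<Object, String> entityIdOf, Function<Object, String> shardIdOf) {
        this.entityIdOf = entityIdOf;
        this.shardIdOf = shardIdOf;
    }

    // Returns false when the extractor functions do not recognize the message.
    boolean deliver(Object message) {
        String shardId = shardIdOf.apply(message);
        String entityId = entityIdOf.apply(message);
        if (shardId == null || entityId == null) {
            return false; // unrecognized message: nothing to route
        }
        // Locate the shard, allocating it on first use (the ShardCoordinator's job in Pekko).
        Map<String, List<Object>> shard = shards.computeIfAbsent(shardId, id -> new HashMap<>());
        // Locate the entity inside the shard, starting it on demand if it does not exist yet.
        List<Object> inbox = shard.computeIfAbsent(entityId, id -> {
            entitiesStarted++;
            return new ArrayList<>();
        });
        // Deliver the message to the entity.
        inbox.add(message);
        return true;
    }

    int shardCount() {
        return shards.size();
    }

    int entitiesStarted() {
        return entitiesStarted;
    }

    List<Object> inboxOf(String shardId, String entityId) {
        return shards.getOrDefault(shardId, Map.of()).getOrDefault(entityId, List.of());
    }

    public static void main(String[] args) {
        // Entity ID is the part before ':' in strings like "1000001:connect".
        Function<Object, String> entityIdOf = m ->
                (m instanceof String s && s.contains(":")) ? s.substring(0, s.indexOf(':')) : null;
        // Shard ID is derived from the entity ID, so one entity always maps to one shard.
        Function<Object, String> shardIdOf = m -> {
            String id = entityIdOf.apply(m);
            return id == null ? null : "session-" + Integer.remainderUnsigned(id.hashCode(), 5);
        };
        ToyShardRegion region = new ToyShardRegion(entityIdOf, shardIdOf);
        region.deliver("1000001:connect");
        region.deliver("1000001:move");
        region.deliver("1000002:connect");
        System.out.println("entities started: " + region.entitiesStarted()); // 2
    }
}
```

Because the shard ID is computed from the entity ID, repeated messages for 1000001 always reach the same inbox; if the two IDs were extracted independently and disagreed, the same session would end up with two entities.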
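However, the component currently in common is very likely not general-purpose, as the earlier comment already pointed out:
// This part may need optimizing later: event recognition is essentially hard-coded here,
// and the configuration does not feel very flexible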
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
} else if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
} else if (message instanceof Events.C2SMessageEvent event) {
return event.getSessionId();
} else if (message instanceof Events.S2CDisconnectEvent event) {
return event.getSessionId();
} else if (message instanceof Events.S2CMessageEvent event) {
return event.getSessionId();
}
So this needs to be made generic. Here is the refactored, generic implementation:
package io.fortress.common.actor;
import org.apache.commons.codec.digest.MurmurHash3;
import org.apache.pekko.cluster.sharding.ShardRegion;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;
/**
* io.fortress.common.actor.ActorMessageExtractor.java
* <p>
* Generic message extractor: callers pass in a list of Extractor implementations, and a message is recognized only if one of them handles it
*/
@SuppressWarnings("unused")
public class ActorMessageExtractor implements ShardRegion.MessageExtractor {
/**
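* Extraction interface
*/
public interface Extractor {
/**
* Returns the entity ID, or null if this extractor does not handle the message
*/
String getEntityId(Object message);
/**
* Returns the key used to compute the shard ID, or null if this extractor does not handle the message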
*/
String getShardId(Object message);
}
/**
* Total number of shards
*/
protected final int numberOfShards;
/**
* MurmurHash3 seed, defaults to 0
*/
protected final int hashSeed;
/**
* List of message extractors
*/
protected final List<Extractor> extractors;
/**
* Callback that builds the shard ID string from the shard index
*/
protected final Function<Integer, String> createShardId;
/**
* Constructor
*/
public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, Function<Integer, String> createShardId, int hashSeed) {
if (numberOfShards <= 0) throw new IllegalArgumentException("numberOfShards must be greater than 0");
this.numberOfShards = numberOfShards;
this.extractors = extractors;
this.createShardId = createShardId;
this.hashSeed = hashSeed;
}
/**
* Constructor
*/
public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, Function<Integer, String> createShardId) {
this(numberOfShards, extractors, createShardId, 0);
}
/**
* Constructor
*/
public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, String prefix, int hashSeed) {
this(numberOfShards, extractors, (integer -> prefix + integer), hashSeed);
}
/**
* Constructor
*/
public ActorMessageExtractor(int numberOfShards, List<Extractor> extractors, String prefix) {
this(numberOfShards, extractors, (integer -> prefix + integer), 0);
}
/**
* Extracts the entity ID
*/
@Override
public String entityId(Object message) {
if (extractors.isEmpty() || message == null) return null;
// Ask each extractor in order; each one is called once, and the first non-null entity ID wins
return extractors
.stream()
.map(e -> e.getEntityId(message))
.filter(id -> id != null)
.findFirst()
.orElse(null);
}
/**
* Extracts the entity message; it is passed through unchanged
*/
@Override
public Object entityMessage(Object message) {
return message;
}
/**
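* Computes the shard ID
*/
@Override
public String shardId(Object message) {
if (extractors.isEmpty() || message == null) return null;
// Take the shard key from the first extractor that recognizes the message (each one is called once)
String shardKey = extractors.stream()
.map(e -> e.getShardId(message))
.filter(key -> key != null)
.findFirst()
.orElse(null);
if (shardKey == null) return null;
// Modulo: hash the key with MurmurHash3 and take the unsigned remainder, so the index stays
// in [0, numberOfShards - 1] even when the hash is Integer.MIN_VALUE (Math.abs would not)
byte[] bytes = shardKey.getBytes(StandardCharsets.UTF_8);
int hash = MurmurHash3.hash32x86(bytes, 0, bytes.length, hashSeed);
int shardNumber = Integer.remainderUnsigned(hash, numberOfShards);
return createShardId.apply(shardNumber);
}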
}
Here is a unit test to confirm it works:
package io.fortress.common.actor;
import io.fortress.common.protobuf.Events;
import io.smallrye.common.constraint.Assert;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* io.fortress.common.actor.ActorMessageExtractorTest.java
* <p>
* Unit tests for ActorMessageExtractor
*/
class ActorMessageExtractorTest {
/**
* Logger
*/
private static final Logger log = LoggerFactory.getLogger(ActorMessageExtractorTest.class);
/**
* Extractor implementation for C2SConnectedEvent
*/
record C2SConnectedEventExtractor() implements ActorMessageExtractor.Extractor {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
}
/**
* Extractor implementation for C2SDisconnectEvent
*/
record C2SDisconnectedEventExtractor() implements ActorMessageExtractor.Extractor {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
}
/**
* Generic modulo sharding extractor under test
*/
private final ActorMessageExtractor extractor = new ActorMessageExtractor(
10,
List.of(
// Register the message extractors
new C2SConnectedEventExtractor(),
new C2SDisconnectedEventExtractor()
),
integer -> "shard-" + integer,
0
);
/**
* Extracts the cluster entity ID
*/
@Test
void entityId() {
// Build a C2SConnectedEvent
Events.C2SConnectedEvent c2SConnectedEvent = Events.C2SConnectedEvent
.newBuilder()
.setSessionId("1000001")
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
var entityId = extractor.entityId(c2SConnectedEvent);
Assert.assertNotNull(entityId);
log.info("entityId: {}", entityId);
}
/**
* Computes the shard ID
*/
@Test
void shardId() {
// Generate a batch of random events
for (int i = 0; i < 5; i++) {
// Build a C2SDisconnectEvent
Events.C2SDisconnectEvent c2SDisconnectEvent = Events.C2SDisconnectEvent
.newBuilder()
.setSessionId(String.valueOf(RandomUtils.insecure().randomLong(100000, 900000)))
.setTimestamp(System.currentTimeMillis())
.build();
// Compute the shard ID
var shardId = extractor.shardId(c2SDisconnectEvent);
Assert.assertNotNull(shardId);
log.info("shardId: {}", shardId);
}
}
}
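This version is clearly more general: callers can supply their own list of extractors to parse messages.
The downside comes from that same generality: to allow highly customized message parsing it uses a List, so every message is checked against the extractors one by one until one matches.
Every message pays for this linear scan, even for a type that has been seen many times before, and whether the overhead is acceptable depends on your load;
the extractor list should therefore be kept as short as possible. Because getEntityId and getShardId are separate methods, each extractor is also responsible for keeping them consistent: all messages for the same entity must produce the same shard key.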
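One way to remove the linear scan, sketched here as a swapped-in technique rather than the project's code, is to key the extractors by the message's runtime class so each message costs a single hash-map lookup. The sketch also drops the separate getShardId and derives the shard from the entity ID, which guarantees by construction that all messages for one entity map to the same shard. ConnectedEvent, DisconnectEvent and ClassKeyedExtractor are hypothetical stand-ins for the protobuf events and the real extractor, and String.hashCode replaces MurmurHash3 only to keep the example free of dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-ins for the protobuf events; hypothetical, not the project's real Events types.
record ConnectedEvent(String sessionId) {}
record DisconnectEvent(String sessionId) {}

// Hypothetical alternative to the List-based scan: one hash-map lookup per message,
// keyed by the message's runtime class.
final class ClassKeyedExtractor {

    private final Map<Class<?>, Function<Object, String>> entityIdByType = new HashMap<>();
    private final int numberOfShards;
    private final String prefix;

    ClassKeyedExtractor(int numberOfShards, String prefix) {
        if (numberOfShards <= 0) {
            throw new IllegalArgumentException("numberOfShards must be greater than 0");
        }
        this.numberOfShards = numberOfShards;
        this.prefix = prefix;
    }

    // Registers how to read the entity ID from one concrete message class.
    <T> ClassKeyedExtractor register(Class<T> type, Function<T, String> entityId) {
        entityIdByType.put(type, message -> entityId.apply(type.cast(message)));
        return this;
    }

    // Exact-class lookup: subclasses are not matched, so register concrete message types.
    String entityId(Object message) {
        if (message == null) {
            return null;
        }
        Function<Object, String> reader = entityIdByType.get(message.getClass());
        return reader == null ? null : reader.apply(message);
    }

    // The shard ID is always derived from the entity ID, so one entity cannot end up in two shards.
    String shardId(Object message) {
        String id = entityId(message);
        if (id == null) {
            return null;
        }
        // String.hashCode keeps the sketch dependency-free; the project uses MurmurHash3 instead.
        return prefix + Integer.remainderUnsigned(id.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        ClassKeyedExtractor extractor = new ClassKeyedExtractor(5, "session-")
                .register(ConnectedEvent.class, ConnectedEvent::sessionId)
                .register(DisconnectEvent.class, DisconnectEvent::sessionId);
        // Both events for session 1000001 resolve to the same shard.
        System.out.println(extractor.shardId(new ConnectedEvent("1000001")));
        System.out.println(extractor.shardId(new DisconnectEvent("1000001")));
    }
}
```

The trade-off is that the lookup matches the exact runtime class, so each message must be registered under its concrete type; matching on subclasses or interfaces would need a fallback scan.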
The improved ActorSystem code looks like this:
package io.fortress.cluster;
import io.fortress.cluster.config.ClusterActorSystemConfiguration;
import io.fortress.cluster.session.SessionManager;
import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.protobuf.Events;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* io.fortress.cluster.ClusterActorSystem.java
* <p>
* Creates and initializes the global ActorSystem
*/
@ApplicationScoped
public class ClusterActorSystem {
/**
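* Logger
*/
final Logger logger = LoggerFactory.getLogger(ClusterActorSystem.class);
/**
* Loaded configuration
*/
@Inject
ClusterActorSystemConfiguration configuration;
/**
* Builds the list of message extractors
* Only two events are handled for now; these generic messages will be wrapped uniformly later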
*/
public List<ActorMessageExtractor.Extractor> getActorExtractor() {
return List.of(
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
},
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
}
);
}
/**
* Produces the single global ActorSystem
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// Log the configuration
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// Create the Actor System and register a termination callback: when the actor system terminates, shut down the whole service
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// Shut down Quarkus whether termination succeeded or failed
Quarkus.asyncExit();
return null;
}));
// Once this node is Up in the cluster, start cluster sharding
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(() -> {
logger.info("Joined Cluster: {}", system.name());
// todo: set up the sharded business types; this may later be loaded from dynamic configuration
// Sharded session entity type: session
ClusterSharding clusterSharding = ClusterSharding.get(system);
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ActorMessageExtractor extractor = new ActorMessageExtractor(5, getActorExtractor(), "session-");
ActorRef sessionActorRef = clusterSharding.start(
"session",
SessionManager.props,
settings,
extractor
);
logger.info("Create Cluster Sharding: {}", sessionActorRef.path().address());
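// For a game server, besides the player's own session there may be other sharded business types:
// - clusterSharding.start("chat",....) // chat system
// - clusterSharding.start("rank",....) // leaderboard system
// - clusterSharding.start("battle",....) // battle system
// - clusterSharding.start("achieve",....) // achievement system
// These are exactly what worried me at the start: a plain ModuloMessageExtractor can no longer cover their sharding
// None of them are needed yet, so for now only the per-player session sharding is handled
});
// Callback when this member is removed from the cluster; the shards on this node could be notified here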
cluster.registerOnMemberRemoved(() -> {
logger.info("Removed Cluster: {}", system.name());
});
return system;
}
}
Restart the service and the sharding mounts normally without issues.