消息对接
现在已经搭建好集群分片和网关服务, 可以尝试将他们的功能对接起来方便后续的业务开发.
Actor System 连接 Cluster Sharding 方式其实也十分简单, 直接启动的时候挂载代理就行了.
注意以下代码哪怕运行之后还是会报错, 还有很多细致化配置参数需要处理
这里是 websocket 项目的全局 ActorSystem 对象生成 Bean:
package io.fortress.websocket;
import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.protobuf.Events;
import io.fortress.websocket.config.WebSocketActorSystemConfiguration;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
/**
* io.fortress.websocket.WebSocketActorSystem.java
* <p>
* 加载全局初始化 ActorSystem
*/
@ApplicationScoped
public class WebSocketActorSystem {
/**
* 日志对象
*/
final Logger logger = LoggerFactory.getLogger(WebSocketActorSystem.class);
/**
* 加载 ActorSystem 配置
*/
@Inject
WebSocketActorSystemConfiguration configuration;
/**
* 获取消息提取器列表
*/
public List<ActorMessageExtractor.Extractor> getActorExtractor() {
return List.of(
// 简单提取两个事件
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
},
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
}
);
}
/**
* 生成全局唯一Actor系统
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// 无论成功失败都关闭 Quarkus 系统
Quarkus.asyncExit();
return null;
}));
// 确认是否加入集群, 加入集群的同时需要启动集群分片
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(() -> {
logger.info("Joined Cluster: {}", system.name());
ClusterSharding sharding = ClusterSharding.get(system);
// 需要初始化连接节点服务, 这里其实就相当于启动本地代理
// 这里先直接手动处理连接代理, 后续再编写本地监听器
ActorMessageExtractor extractor = new ActorMessageExtractor(5, getActorExtractor(), "session-");
ActorRef sessionRef = sharding.startProxy(
"session",
Optional.empty(), // 以远程集群什么角色参与
extractor
);
logger.info("Joined Cluster Session: {}", sessionRef.path().address());
// 测试发送个数据包连通
// 目前是看起来流程是对的, 但是运行起来是没办法跑通的
Events.C2SConnectedEvent event = Events.C2SConnectedEvent
.newBuilder()
.setSessionId("10001")
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
sessionRef.tell(event, ActorRef.noSender());
});
// 离开集群的时候的回调, 这里可以通知该集群下面的分片做些处理
cluster.registerOnMemberRemoved(() -> {
logger.info("Removed Cluster: {}", system.name());
});
return system;
}
}
然后 websocket 的 application.properties 文件内容如下:
# HTTP请求接口
quarkus.http.host=127.0.0.1
quarkus.http.port=8082
quarkus.http.root-path=/fortress
# 日志打印
quarkus.log.category."io.fortress".level=DEBUG
# Actor配置
# 参考: pekko-cluster_2.13-1.1.5.jar!\reference.conf 内部配置重载
fortress.actor.name=fortress-cluster
# 直接角色配置项声明即可
fortress.actor.settings.pekko.cluster.roles.0=fortress-websocket
# 重载 pekko 配置项目 - 基础 Actor 设置
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2550
# Actor 序列化处理
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
# 重载 pekko 配置项目 - 集群节点 Actor(注意:自己最好在首个节点)
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
#fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
# 重载 pekko 配置项目 - 集群 Actor 决策配置
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority
看起来配置也是没有什么问题的, 但是当 cluster 和 websocket 运行起来就是会报错, 异常的报错异常内容如下:
# 网关端错误
2025-09-05 12:33:31,741 WARN [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-5) session: Trying to register to coordinator at [ActorSelection[Anchor(pekko://[email protected]:2551/), Path(/system/sharding/sessionCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(pekko://[email protected]:2551, Up)] is reachable.]
2025-09-05 12:33:33,760 WARN [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-17) session: Trying to register to coordinator at [ActorSelection[Anchor(pekko://[email protected]:2551/), Path(/system/sharding/sessionCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(pekko://[email protected]:2551, Up)] is reachable.]
2025-09-05 12:33:35,783 WARN [org.apa.pek.clu.sha.ShardRegion] (fortress-cluster-pekko.actor.default-dispatcher-5) session: Trying to register to coordinator at [ActorSelection[Anchor(pekko://[email protected]:2551/), Path(/system/sharding/sessionCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(pekko://[email protected]:2551, Up)] is reachable.]
不用怀疑自己, 这里面网关推送的代码大体上是没有错误的; 其实关键就是内部的 ClusterSharding 参数配置的 Role.
在 pekko 官方文档当中的配置有个关键地方说明:
# 文档地址: https://pekko.apache.org/docs/pekko/current/typed/cluster-sharding.html
Cluster sharding init should be called on every node for each entity type.
Which nodes entity actors are created on can be controlled with roles.
init will create a ShardRegion or a proxy depending on whether the node’s role matches the entity’s role.
每个节点都应该为每种实体类型调用集群分片初始化.
可以通过角色来控制在哪些节点上创建 Actor Entity.
init 将根据节点的角色是否与实体的角色匹配来创建一个 ShardRegion 或 ShardRegionProxy
这句话粗看没什么问题, 但是仔细看 初始化的时候根据节点的角色与实体的角色匹配, 也就是这里的分片集群是要手动去指定角色才能被动态分配节点.
那么之前在 Cluster 项目的 application.properties 配置已经明确追加的角色:
# 这里明明指定了 pekko.cluster.roles 角色组了
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
这时候可能留意到, 这里设置的是 pekko.cluster 的相关配置, 而不是 pekko.cluster.sharding.
这里一直都是底层 pekko.cluster 驱动配置, 需要的是 pekko.cluster.sharding.role 角色配置,
那么这时候随便配个角色:
# 这里明明指定了 pekko.cluster.roles 角色组了
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
# 追加集群分片的角色
fortress.actor.settings.pekko.cluster.sharding.role=fortress-manager
还是依旧报错无法传递数据, 这里是另外关键点: cluster.sharding 是基于 cluster, 可选角色是不是要在 cluster.roles 选?
这里改动一下配置, 在 pekko.cluster.roles 追加上我们定义的角色:
# 这里明明指定了 pekko.cluster.roles 角色组了
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
fortress.actor.settings.pekko.cluster.roles.3=fortress-manager
# 追加集群分片的角色
fortress.actor.settings.pekko.cluster.sharding.role=fortress-manager
启动 cluster 和 websocket 之后, 数据传输成功:
# 集群端接收到数据
2025-09-05 13:37:42,858 INFO [io.for.clu.ses.SessionManager] (fortress-cluster-pekko.actor.default-dispatcher-6) Received C2SConnectedEvent: sessionId: "10001"
timestamp: 1757050662796
ip: "127.0.0.1"
这里面运行的方法必须仔细观看官方文档才能发现, 否则你编写半天的集群可能最后直接没办法正常运行.
附录
cluster 和 websocket 项目最后集群参与代码可以参考下面内容处理.
Cluster 项目
application.properties 配置文件:
# Actor 集群配置
# 参考: pekko-cluster_2.13-1.1.5.jar!\reference.conf 内部配置重载
fortress.actor.name=fortress-cluster
# 直接角色配置项声明即可
fortress.actor.settings.pekko.cluster.roles.0=fortress-cluster
fortress.actor.settings.pekko.cluster.roles.1=fortress-game
fortress.actor.settings.pekko.cluster.roles.2=fortress-center
fortress.actor.settings.pekko.cluster.roles.3=fortress-manager
# 追加集群分片的角色
fortress.actor.settings.pekko.cluster.sharding.role=fortress-manager
# 重载 pekko 配置项目 - 基础 Actor 设置
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2551
# Actor 序列化处理
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
# 重载 pekko 配置项目 - 集群节点 Actor(注意:自己最好在首个节点)
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
#fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
# 重载 pekko 配置项目 - 集群 Actor 决策配置
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority
ClusterActorSystem 全局系统创建:
package io.fortress.cluster;
import io.fortress.cluster.config.ClusterActorSystemConfiguration;
import io.fortress.cluster.session.SessionManager;
import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.actor.ModuloMessageExtractor;
import io.fortress.common.protobuf.Events;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.apache.pekko.cluster.sharding.ShardCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* io.fortress.cluster.ClusterActorSystem.java
* <p>
* 加载全局初始化 ActorSystem
*/
@ApplicationScoped
public class ClusterActorSystem {
/**
* 日志对象
*/
final Logger logger = LoggerFactory.getLogger(ClusterActorSystem.class);
/**
* 加载配置
*/
@Inject
ClusterActorSystemConfiguration configuration;
/**
* 获取消息提取器列表
*/
public List<ActorMessageExtractor.Extractor> getActorExtractor() {
return List.of(
// 简单提取两个事件
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
},
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
}
);
}
/**
* 生成全局唯一Actor系统
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// 无论成功失败都关闭 Quarkus 系统
Quarkus.asyncExit();
return null;
}));
// 确认是否加入集群, 加入集群的同时需要启动集群分片
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(() -> {
logger.info("Joined Cluster: {}", system.name());
ClusterSharding clusterSharding = ClusterSharding.get(system);
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
// todo: 创建集群分片业务, 后面这里可能要生成动态的配置加载
// 打印关键信息
logger.info("ClusterSharding Role: {}", settings.role());
// 设定会话集群对象: Session
ActorMessageExtractor extractor = new ActorMessageExtractor(5, getActorExtractor(), "session-");
ActorRef sessionActorRef = clusterSharding.start(
"session",
SessionManager.props(),
settings,
extractor
);
logger.info("Create Cluster Sharding: {}", sessionActorRef.path().address());
// 对于游戏服务端来说, 除了玩家自身会话之外, 可能还有涉及到其他类型分片业务
// - clusterSharding.start("chat",....) // 聊天系统
// - clusterSharding.start("rank",....) // 排行系统
// - clusterSharding.start("battle",....) // 战斗系统
// - clusterSharding.start("achieve",....) // 成就系统
});
// 离开集群的时候的回调, 这里可以通知该集群下面的分片做些处理
cluster.registerOnMemberRemoved(() -> {
logger.info("Removed Cluster: {}", system.name());
});
return system;
}
}
SessionManager 节点 Actor:
package io.fortress.cluster.session;
import io.fortress.common.protobuf.Events;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Props;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
/**
* io.fortress.cluster.session.SessionManager.java
* <p>
* 会话集群Actor
*/
public class SessionManager extends AbstractActor {
public static Props props() {
return Props.create(SessionManager.class);
}
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(Events.C2SConnectedEvent.class, event -> {
logger.info("Received C2SConnectedEvent: {}", event.toString());
}).match(Events.S2CDisconnectEvent.class, event -> {
logger.info("Received S2CDisconnectEvent: {}", event.toString());
}).build();
}
}
WebSocket 项目
application.properties 配置文件:
# HTTP请求接口
quarkus.http.host=127.0.0.1
quarkus.http.port=8082
quarkus.http.root-path=/fortress
# 日志打印
quarkus.log.category."io.fortress".level=DEBUG
# Actor配置
# 参考: pekko-cluster_2.13-1.1.5.jar!\reference.conf 内部配置重载
fortress.actor.name=fortress-cluster
# 直接角色配置项声明即可
fortress.actor.settings.pekko.cluster.roles.0=fortress-websocket
# 重载 pekko 配置项目 - 基础 Actor 设置
fortress.actor.settings.pekko.actor.provider=cluster
fortress.actor.settings.pekko.remote.artery.enabled=on
fortress.actor.settings.pekko.remote.artery.canonical.hostname=127.0.0.1
fortress.actor.settings.pekko.remote.artery.canonical.port=2550
# Actor 序列化处理
fortress.actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
fortress.actor.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto
# 重载 pekko 配置项目 - 集群节点 Actor(注意:自己最好在首个节点)
fortress.actor.settings.pekko.cluster.seed-nodes.0=pekko://${fortress.actor.name}@127.0.0.1:2551
#fortress.actor.settings.pekko.cluster.seed-nodes.1=pekko://${fortress.actor.name}@127.0.0.1:2552
# 重载 pekko 配置项目 - 集群 Actor 决策配置
fortress.actor.settings.pekko.cluster.downing-provider-class=org.apache.pekko.cluster.sbr.SplitBrainResolverProvider
fortress.actor.settings.pekko.cluster.split-brain-resolver.active-strategy=keep-majority
WebSocketActorSystem 全局系统创建:
package io.fortress.websocket;
import io.fortress.common.actor.ActorMessageExtractor;
import io.fortress.common.protobuf.Events;
import io.fortress.websocket.config.WebSocketActorSystemConfiguration;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
/**
* io.fortress.websocket.WebSocketActorSystem.java
* <p>
* 加载全局初始化 ActorSystem
*/
@ApplicationScoped
public class WebSocketActorSystem {
/**
* 日志对象
*/
final Logger logger = LoggerFactory.getLogger(WebSocketActorSystem.class);
/**
* 加载 ActorSystem 配置
*/
@Inject
WebSocketActorSystemConfiguration configuration;
/**
* 获取消息提取器列表
*/
public List<ActorMessageExtractor.Extractor> getActorExtractor() {
return List.of(
// 简单提取两个事件
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SConnectedEvent event) {
return event.getSessionId();
}
return null;
}
},
new ActorMessageExtractor.Extractor() {
@Override
public String getEntityId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
@Override
public String getShardId(Object message) {
if (message instanceof Events.C2SDisconnectEvent event) {
return event.getSessionId();
}
return null;
}
}
);
}
/**
* 生成全局唯一Actor系统
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem createActorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", configuration.name());
configuration.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));
// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
ActorSystem system = configuration.createActorSystem((terminatedTry -> {
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
// 无论成功失败都关闭 Quarkus 系统
Quarkus.asyncExit();
return null;
}));
// 确认是否加入集群, 加入集群的同时需要启动集群分片
Cluster cluster = Cluster.get(system);
cluster.registerOnMemberUp(() -> {
logger.info("Joined Cluster: {}", system.name());
ClusterSharding sharding = ClusterSharding.get(system);
// 需要初始化连接节点服务, 这里其实就相当于启动本地代理
// 这里先直接手动处理连接代理, 后续再编写本地监听器
ActorMessageExtractor extractor = new ActorMessageExtractor(5, getActorExtractor(), "session-");
ActorRef sessionRef = sharding.startProxy(
"session",
Optional.empty(), // 以远程集群什么角色参与, 代理端其实不需要怎么指定
extractor
);
logger.info("Joined Cluster Session: {}", sessionRef.path().address());
// 测试发送个数据包连通
Events.C2SConnectedEvent event = Events.C2SConnectedEvent
.newBuilder()
.setSessionId("10001")
.setTimestamp(System.currentTimeMillis())
.setIp("127.0.0.1")
.build();
sessionRef.tell(event, ActorRef.noSender());
});
// 离开集群的时候的回调, 这里可以通知该集群下面的分片做些处理
cluster.registerOnMemberRemoved(() -> {
logger.info("Removed Cluster: {}", system.name());
});
return system;
}
}