最近需要部署搭建个 WebSocket 的游戏网关服务端, 考虑之后从 Java 之中选取设计方案;
而这里首先排除 Spring 全家桶系列, 因为内部集成太过冗余且对底层操作也比较麻烦,
最后敲定方案采用仅仅对网络工具方法抽象的 Vert-X.
这里 Actor 最开始考虑 akka, 但是目前最新版本已经从开源协议转为商业协议,
为了后续规避潜在的商业纠纷最后采用同源的开源替代 pekko.
而消息交换协议, 最开始在单纯二进制和 protbuf 之间考虑, 主要问题是 protobuf 协议生成的类文件对于项目侵入太严重;
所以考虑在三之后选择性能更好且通用型更强的 msgpack.
建议对 WebSocket 协议规范 rfc6455 进行学习,
因为相对于 TCP 来说底层已经做好 帧(Frame) 概念处理, 所以直接只用处理消息接收和推送即可.
对于 WebSocket 来说基本上传递数据格式已经内置数据类型:
0x1: 文本内容0x2: 二进制内容0x8: 关闭连接0x9: Ping0xA: Pong
这主要是区分好消息格式发送什么, 用来推送给 Actor 内部消息来处理不同的业务逻辑.
另外需要设计好 actor 拓扑图, 需要对 actor 功能做基本设计分布:
[ActorSystem,最基础的系统]
|
|
[Supervisor,核心管理器负责监控所有 Worker] --- [Worker:WebSocket,网络层负责协调客户端和Actor关联,同时负责动态会话创建]
| |
| |
| |- [Worker * N:(UUID) Session, 每个会话都动态创建 actor]
| |
| |- [会话Actor是不断动态创建的]......
|
|- [Proxy:Cluster,集群业务代理转发]
我这里是按照以前项目 Erlang 的 MMO 框架设计出来的基本网络架构,
每个启动游戏进程其实就是单服, 比如 微信1区 对应 192.168.1.10 单台服务器的 进程A,
微信2区 对应 192.168.1.10 单台服务器的 进程B, 以此类推出扩展手动选服的游戏服务器.
为什么要每个会话动态创建 Actor , 而不是构建单个 Actor 来读写?
官方 akka/pekko 对于 actor 要求十分明确: 状态隔离 和 任务分解.
这里可以设想下, 如果采用单 actor 作为 Socket 管理会出现什么情况:
崩溃连锁: 假设某个原因某个Session异常导致线程崩溃的时候, 所有Socket需要一起崩溃吗?请求频繁:Actor模型单个对象本身负载能力有限, 基于消息队列转发Socket消息会造成大量消息堆积和产生死信
日常可能出问题就上面这些, 其中频繁请求导致队列堆积就是造成单个
Actor占用异常的最大问题
上面就是必须思考的问题, 包括后续做业务的时候需要考虑 怎么把 Actor 功能分流缓解请求压力.
另外还有个依赖问题, 也就是比较经典的: 是 VertX 依赖 Pekko ? 还是 Pekko 依赖 VertX ?
Vert-X在类内部再启动Pekko初始化ActorSystemPekko初始化之后在ActorSystem挂载Vert-X单独服务
其实从上面就已经看出来了, 当引入 Actor-FSM 概念的时候就要明确: 所有的服务都要抽象单个或多个Actor.
Actor内部其实就是有限状态机(FSM,Finite State Machine)
还有一点就是 必须要为外部控制关闭|停止服务器做好预留接口, 这样才能直接手动在即时执行游戏维护的关闭服务器操作.
那么这样抽象出来的话, Vert-X-Actor 需要负担以下系统功能:
OnStart: 启动WebSocket服务OnStop: 关闭WebSocket服务OnRestart: 可有可无的的重启功能, 实际上就是Stop->Start启动服务OnConnect: 客户端会话连接加入时候动态创建ActorOnClose: 客户端会话连接关闭时候需要通知Actor关闭和退出OnRequest: 消息请求事件, 需要区分WebSocket有Text和Binary等类型OnResponse: 消息响应事件, 由Actor内部发起通知客户端
为了降低理解难度, 这里都是基于
本地Actor来做开发, 而不涉及集群Actor等业务拆分
这里都是提供抽象成 事件(Event) 或者 信号(Signal) 处理, 但是需要先学习概念: 依赖反转.
之所以要学习 依赖反转 概念是要学习怎么把 Vert-X 和 Pekko 集成在一起,
不要像下面一样编写出来的功能:
class WebSocketActor {
private Vertx vertx;// Vert运行时
private HttpServer server; // WebSocket服务
/**
* 构造方法
*/
public WebSocketActor() {
this.vertx = Vertx.vert();
this.server = vertx.createHttpServer();
}
}
// 直接外部启用初始化
// var listen = new WebSocketActor();
像这种功能类实例化看起来没有问题, 但其实内部扩展性是有问题的: 如果外部需要 Vertx 重用的是否该怎么办?
// 单个运行时
var vertx = Vertx.vert();
// 需要创建WebSocket应用A
var server = vertx.createHttpServer();
// 内部集成的功能类该怎么办? 没办法引用到同个 vertx 运行时
var listen = new WebSocketActor();
// 难道每个服务都创建独立不共享的 Vert 运行时吗?
// 如果把功能过于集中内部, 那么以后做功能迭代维护的时候就会很麻烦
// 而且这个功能后续可能会被频繁用到, 所以更加需要构建成通用组件来处理
所以在后续应该将 Vert-X 运行时作为组件引入, 这里需要采用 依赖反转 :
/**
* 泛型的通用事件抽象
* <p>
* 所有的事件对象都是需要继承这个抽象通用类, 就像以下示例:
* <pre>
* class StopEvent implements IEvent<Long>{
* private Long timestamp;
* public Long message(){
* return timestamp;
* }
* }
* </pre>
*
* @param <T> 消息传递类型
*/
public interface IEvent<T> {
/**
* 获取消息主体内容
*/
T message();
}
以上就是通用消息结构, 用于对消息进行统一规范化传递; 之后就是抽象出来的 Actor 管理器:
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.Props;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.actor.typed.javadsl.ReceiveBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* 抽象的 ActorSupervisor
* <p>
* 内部采用 Map 做映射管理多个 actorRef 做 worker 容器,
* 如果想要使用必须继承该抽象类从而去实现底层方法
*
* @param <T> 消息类型
*/
public abstract class AbstractSupervisor<T> extends AbstractBehavior<T> {
/**
* 工作任务 Actor 池
*/
protected final Map<String, ActorRef<T>> workers;
/**
* 拦截消息监听器
*/
protected final ReceiveBuilder<T> builder = ReceiveBuilder.create();
/**
* 默认构造方法
*
* @param context Actor上下文
*/
public AbstractSupervisor(ActorContext<T> context) {
super(context);
// 采用系统默认MAP的容量
this.workers = new HashMap<>();
}
/**
* 默认构造方法
*
* @param context Actor上下文
* @param capacity Map的元素容量
*/
public AbstractSupervisor(ActorContext<T> context, int capacity) {
super(context);
this.workers = new HashMap<>(capacity);
}
/**
* 添加Actor节点
*
* @param behavior 子Actor运行状态
* @param name 子Actor的节点名称
* @return ActorRef
*/
public ActorRef<T> spawn(Behavior<T> behavior, String name) {
final var ctx = getContext();
final var ref = ctx.spawn(behavior, name);
return workers.put(name, ref);
}
/**
* 添加Actor节点
*
* @param behavior 子Actor运行状态
* @param name 子Actor的节点名称
* @param props 子Actor的属性
* @return ActorRef
*/
public ActorRef<T> spawn(Behavior<T> behavior, String name, Props props) {
final var ctx = getContext();
final var ref = ctx.spawn(behavior, name, props);
return workers.put(name, ref);
}
/**
* 回调方法做初始化子Actor
*
* @param creator 子Actor构建器
*/
public void spawn(BiConsumer<ActorContext<T>, Map<String, ActorRef<T>>> creator) {
creator.accept(getContext(), workers);
}
/**
* 停止Actor节点
*
* @param name 子Actor的节点名称
* @return ActorRef|null
*/
public ActorRef<T> stop(String name) {
final var ctx = getContext();
final var ref = workers.get(name);
if (ref != null) {
ctx.stop(ref);
return workers.remove(name);
}
return null;
}
/**
* 获取消息拦截器
*
* @return ReceiveBuilder
*/
public ReceiveBuilder<T> getReceiveBuilder() {
return builder;
}
/**
* 默认消息拦截器构建方法
*
* @return Receive
*/
@Override
public Receive<T> createReceive() {
return builder.build();
}
}
后续就是去实现 Actor 管理器, 从而具体集成搭建第三方包功能, 这里实现个简单的 回写(echo) 服务:
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import java.util.function.Consumer;
/**
* 回写Actor管理器
*/
public final class EchoSupervisor extends AbstractSupervisor<IEvent<String>> {
/**
* 内部定义的消息传递包
*
* @param msg
*/
public record Message(
String msg
) implements IEvent<String> {
@Override
public String message() {
return msg;
}
}
/**
* 不允许外部实例化
*
* @param context Actor上下文
*/
private EchoSupervisor(ActorContext<IEvent<String>> context) {
super(context);
}
/**
* 不允许外部实例化
*
* @param context Actor上下文
* @param capacity 子Actor容器大小
*/
private EchoSupervisor(ActorContext<IEvent<String>> context, int capacity) {
super(context, capacity);
}
/**
* 静态构建
*/
public static Behavior<IEvent<String>> create() {
return Behaviors.setup(EchoSupervisor::new);
}
/**
* 静态构建
*/
public static Behavior<IEvent<String>> create(Consumer<EchoSupervisor> creator) {
return Behaviors.setup(ctx -> {
final var owner = new EchoSupervisor(ctx);
creator.accept(owner);
return owner;
});
}
}
之后就是简单入口调用实现:
/**
* 启动应用入口
*/
public class WebSocketApp {
/**
* 入口方法
*/
public static void main(String[] args) {
// 创建 echo 管理器, 并且移交 WebSocketApp::echo 静态函数来对内部消息进行 hook
final var echoSupervisor = EchoSupervisor.create(WebSocketApp::echo);
// 创建 echo 运行时的系统
final var echoActor = ActorSystem.create(
echoSupervisor,
"echo");
// 发送给 echo actor 消息让其调用事件
echoActor.tell(new EchoSupervisor.Message("Hello.World"));
}
/**
* Actor管理器回调函数
*
* @param supervisor Actor 运行管理器
*/
public static void echo(EchoSupervisor supervisor) {
final var builder = supervisor.getReceiveBuilder();
builder.onMessage(EchoSupervisor.Message.class, (event) -> {
System.out.println("Actor Message: " + event.message());
return supervisor;
});
}
}
可以看出来内部只对功能进行简单的封装, 主要业务全部是在外部拦截并且转发从而实现 高内聚低耦合 的状态,
启动执行之后就能看到具体 Actor 消息转发执行显示内容了.