之前已经展示封装成功能 Actor, 可以看到我们封装类和 skynet 类似,
在官方 vert-x 文档可以看到比较简单的 WebSocket 集成设置:
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.WebSocket;
public class WebSocketServerExample {
public static void main(String[] args) {
// 创建Vertx实例
Vertx vertx = Vertx.vertx();
// 创建Http服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(8080);
// 创建Http服务器
HttpServer server = vertx.createHttpServer(options);
// 监听HTTP请求并升级为WebSocket连接
server.webSocketHandler(webSocket -> {
System.out.println("WebSocket连接已建立");
// 处理接收到的WebSocket消息
webSocket.handler(buffer -> {
String message = buffer.toString();
System.out.println("收到消息: " + message);
// 发送响应消息
webSocket.writeTextMessage("你发送的消息是: " + message);
});
// 处理WebSocket连接关闭事件
webSocket.closeHandler(v -> {
System.out.println("WebSocket连接已关闭");
});
});
// 启动Http服务器
server.listen();
}
}
所以看起来对于基本的 WebSocket 服务器只需要传入两个参数:
Vertx: 运行时HttpServerOptions: Http服务器配置
那么很简单集成抽象实现:
/**
* WebSocket的Actor管理器
*/
public class WebSocketSupervisor extends AbstractSupervisor<IEvent<?>> {
/**
* VertX运行时
*/
final Vertx vertx;
/**
* Http启动配置
* <p>
* 用于重启服务器的时候释放掉 httpServer 句柄并重新构建
*/
final HttpServerOptions options;
/**
* 构造方法,不允许外部实例化
*
* @param context Actor运行时
* @param vertx VertX运行时
* @param options Http配置信息
* @param capacity 子Actor容量
*/
private WebSocketSupervisor(ActorContext<IEvent<?>> context, Vertx vertx, HttpServerOptions options, int capacity) {
super(context, capacity);
this.vertx = vertx;
this.options = options;
}
/**
* 构造方法, 不允许外部实例化
*
* @param context Actor运行时
* @param vertx VertX运行时
* @param options Http配置信息
*/
private WebSocketSupervisor(ActorContext<IEvent<?>> context, Vertx vertx, HttpServerOptions options) {
super(context);
this.vertx = vertx;
this.options = options;
}
/**
* 静态构建
*/
public static Behavior<IEvent<?>> create(Vertx vertx, HttpServerOptions options) {
return Behaviors.setup(ctx -> new WebSocketSupervisor(ctx, vertx, options));
}
/**
* 静态构建
*/
public static Behavior<IEvent<?>> create(Vertx vertx, HttpServerOptions options, Consumer<WebSocketSupervisor> creator) {
return Behaviors.setup(ctx -> {
final var owner = new WebSocketSupervisor(ctx, vertx, options);
creator.accept(owner);
return owner;
});
}
// todo:其他扩展功能
}
以上暂时仅搭建空白的服务框架, 但是外部只需要直接传入 VertX 相关配置就可以运行:
final var vertx = Vertx.vertx();
final var options = new HttpServerOptions()
.setHost("127.0.0.1") // 监听hostname
.setPort(8088) // 监听端口
.setSsl(false); // 是否启用 https
// 创建 WebSocket运行时, 启动之后这里会提示说消息没有被拦截 `unhandled` , 这是正常的现象
// to Actor[pekko://websocket/user] was unhandled. [1] dead letters encountered
final var runtime = WebSocketSupervisor.create(vertx, options);
final var actor = ActorSystem.create(
runtime,
"websocket");
这是否就开始规划功能了, VertX 需要 开启 和 关闭 功能来给 开服|停服 通知:
/**
* WebSocket的Actor管理器
*/
public class WebSocketSupervisor extends AbstractSupervisor<IEvent<?>> {
/**
* Http服务句柄
*/
HttpServer server = null;
/**
* 启动WebSocket服务, 用于开服启动
*/
public void startHttpServer() {
final var logger = getContext().getLog();
if (server != null) {
logger.warn("websocket server already started");
return;
}
logger.info("websocket server started");
// 创建HTTP服务,
server = vertx.createHttpServer(options);
server.webSocketHandler(webSocket -> {
// todo:至此就是会话层的关系
// 消息回调
webSocket.frameHandler(frameHandler -> {
// 这里测试下触发关服, 只需要客户端发送 'stop' 关键字就自动关闭
// 这里其实是错误示范, 因为这种关于 actor 操作应该放入内部的消息队列, 也就是 tell 让其按照顺序去执行
// 直接不走 Actor 关闭服务器会导致分离出来的子 actor 消息推送强制中断.
if (frameHandler.isText()) {
final var msg = frameHandler.textData();
if (msg.equals("stop")) {
stopHttpServer();
}
}
});
webSocket.closeHandler(closeHandler -> {
logger.warn("websocket handler closed");
});
});
// 启动监听
server.listen().onFailure(throwable -> {
server.close();
server = null;
logger.error("websocket start failed", throwable);
});
}
/**
* 停服推送
*/
public void stopHttpServer() {
final var logger = getContext().getLog();
if (server == null) {
logger.warn("websocket service not started");
return;
}
// 所有子 Actor 通知关闭
final var ctx = getContext();
for (final var worker : workers.values()) {
ctx.stop(worker);
}
server.close();
}
}
这里初始化构建启动运行时的方式如下:
/**
* 启动应用入口
*/
public class WebSocketApp {
/**
* 消息回调方法
* 这里面的 OnStart | OnStop 是自定义的事件类
*/
private static void init(WebSocketSupervisor supervisor) {
var builder = supervisor.getReceiveBuilder();
// 启动服务
builder.onMessage(OnStart.class, (event) -> {
System.out.println("WebSocket started");
supervisor.startHttpServer();
return supervisor;
});
// 关闭服务
builder.onMessage(OnStop.class, (event) -> {
System.out.println("WebSocket stopped");
supervisor.stopHttpServer();
return Behaviors.same();
});
}
/**
* 核心入口方法
*/
public static void main(String[] args) {
final var vertx = Vertx.vertx();
final var options = new HttpServerOptions()
.setHost("127.0.0.1") // 监听hostname
.setPort(8088) // 监听端口
.setSsl(false); // 是否启用 https
// 创建 WebSocket运行时
final var runtime = WebSocketSupervisor.create(vertx, options, WebSocketApp::init);
final var actor = ActorSystem.create(
runtime,
"websocket");
actor.tell(new OnStart());
}
}
可以用 Postman 这种客户端直接测试连接, 推送 stop 字符串就会把启动的 websocket 服务关闭;
注意这里的 OnStart | OnStop 是我自己自定义的事件类用来拦截消息的.
这就是我们目前跑起来支持 actor 的 websocket 网关程序, 虽然简单但是基本网络传输功能已经设计完成;
接下来就是封装 Session(会话) 层面的子 actor, 用于 代理(Agent) 客户端交互请求.
关键词:
代理者, 用于代理websocket的Open|Close|Message|Error推送到actor对象中心让其自己拦截处理
这里可以暂停一下思考下后续的会话功能类应该怎么去封装, 另外还需要说明下 Actor 内部的回调拦截 ReceiveBuilder:
// 创建拦截器
builder = ReceiveBuilder.create();
// 拦截器当中比较常用的两个函数:
builder.onMessage(...); 细致化消息拦截回调
builder.onSignal(...); 简单的消息信号处理
// 这两种函数最明显的差别就是类型规则声明
// 在 actor 的 message 之中, 需要去自定义传递消息内容类型, 并且后续消息必须继承或者派生该消息类
// 有时候我们不想要设计通用功能接口适应 message 的时候, 可以采用实现 org.apache.pekko.actor.typed.Signal 类来做消息信号结构
// 之前我们采用自己定义 IEvent 就是需要这样比较复杂的派生, 要求对消息自定义被通用衍生继承实现
Signal 方式其实只需要实现接口就行, 样例如下:
import org.apache.pekko.actor.typed.Signal;
/**
* record 是 Java11 新特性, 用于作为数据容器类
*/
public record OnStart(Long timestamp) implements Signal {
// 做些内部自己数据操作功能
}
Signal 机制其实更加符合作为二次封装功能, 因为不需要强制声明传输数据类型:
// 这里看到具体实现的 ReceiveBuilder 构建器, 这里需要注意: 实际类型 T 需要强制声明
ReceiveBuilder<T> builder = ReceiveBuilder.create();
// 类似于 ? 未知声明, 也是 Java 新特性
ReceiveBuilder<?> builder = ReceiveBuilder.create();
// 如果采用 onMessage 编写回调拦截必须强制对 T 类型进行衍生
ReceiveBuilder<?> builder = ReceiveBuilder.create();
builder.onMessage(OnOpenEvent.class,(event)->{
// 异常, 因为回调传入的 event 没办法匹配出传入声明类型
// Inferred type 'M' for type parameter 'M' is not within its bound; should extend 'T'
return null;
});
// 而采用 onSignal 就没有这么多顾虑, 不需要明确指定衍生类型
ReceiveBuilder<?> builder = ReceiveBuilder.create();
builder.onSignal(OnOpenEvent.class,(event)->{
// 直接能够获取 event 对应类型
return null;
});
这是内部的 actor 需要注意的点, 有的官方文档没有提到所以需要说明下, 接下来就是继续后面的 子actor 部分.
首先需要先定义个回调抽象接口, 需要涵盖以下接口暴露给下层外部 Actor( 其实就是定义状态机当中的所有状态 ):
OnOpen: 开启会话OnClose: 关闭会话OnText: 文本回调OnBinary: 二进制回调OnPing: Ping回调OnPong: Pong回调OnError: 错误回调
其实相对来说接口比较好设计, 但是需要思考怎么和 Actor 工作模型集合在一起:
/**
* 简单的会话回调, 这里的 T 是为了方便后续底层衍生
*/
public interface WebSocketBehavior<T> {
/**
* 会话连接时候的回调
*
* @param socket WebSocket 会话句柄
* @return Actor 状态机的状态
*/
Behavior<T> onOpen(ServerWebSocket socket, ActorRef<?> supervisor);
/**
* 推送Open信号给Actor
*
* @param socket WebSocket 会话句柄
* @param supervisor 推送的 actor 句柄
*/
void tellOnOpen(ServerWebSocket socket, ActorRef<?> supervisor);
/**
* Text内容时候的回调
*
* @param socket 会话对象
* @param message 数据内容
* @return Actor 状态机的状态
*/
Behavior<T> onTextMessage(ServerWebSocket socket, String message);
/**
* 推送Text内容给Actor
*
* @param socket 会话对象
* @param message 数据内容
*/
void tellOnTextMessage(ServerWebSocket socket, String message);
/**
* Binary内容时候的回调
*
* @param socket 会话对象
* @param buffer 数据内容
* @return Actor 状态机的状态
*/
Behavior<T> onBinaryMessage(ServerWebSocket socket, Buffer buffer);
/**
* 推送Binary内容给Actor
*
* @param socket 会话对象
* @param buffer 数据内容
*/
void tellOnBinaryMessage(ServerWebSocket socket, Buffer buffer);
/**
* 关闭连接时候的回调
*
* @param socket 会话对象
* @param code 关闭时候的状态码
* @param reason 关闭时候的消息
* @return Actor 状态机的状态
*/
Behavior<T> onClose(ServerWebSocket socket, Short code, String reason);
/**
* 推送关闭连接给 actor
*
* @param socket 会话对象
* @param code 关闭时候的状态码
* @param reason 关闭时候的消息
*/
void tellOnClose(ServerWebSocket socket, Short code, String reason);
/**
* 会话异常时候的回调
*
* @param socket 会话对象
* @param error 异常错误
* @return Actor 状态机的状态
*/
Behavior<T> onError(ServerWebSocket socket, Throwable error);
/**
* 推送会话异常给 actor
*
* @param socket 会话对象
* @param error 异常错误
*/
void tellOnError(ServerWebSocket socket, Throwable error);
/**
* 用于给底层实现消息拦截器
*
* @param builder 消息拦截器句柄
* @return 最后再封装的的消息拦截器
*/
default ReceiveBuilder<T> onInit(ReceiveBuilder<T> builder) {
return builder;
}
/**
* 默认封装起来的的消息拦截器
*
* @return 消息拦截器
*/
default Receive<T> receive() {
ReceiveBuilder<T> builder = ReceiveBuilder.create();
builder.onSignal(OnOpenEvent.class, (signal) -> this.onOpen(signal.socket))
.onSignal(OnTextMessageEvent.class, (event) -> this.onTextMessage(event.socket, event.message))
.onSignal(OnBinaryMessageEvent.class, (event) -> this.onBinaryMessage(event.socket, event.buffer))
.onSignal(OnCloseEvent.class, (event) -> this.onClose(event.socket, event.code, event.reason))
.onSignal(OnErrorEvent.class, (event) -> this.onError(event.socket, event.error))
;
return onInit(builder).build();
}
/**
* 开启连接信号
*
* @param socket
*/
record OnOpenEvent(
ServerWebSocket socket
) implements Signal {
}
/**
* 文本传输信号
*
* @param socket
* @param message
*/
record OnTextMessageEvent(
ServerWebSocket socket,
String message
) implements Signal {
}
/**
* 二进制传输信号
*
* @param socket
* @param buffer
*/
record OnBinaryMessageEvent(
ServerWebSocket socket,
Buffer buffer
) implements Signal {
}
/**
* 关闭连接信号
*
* @param socket
* @param code
* @param reason
*/
record OnCloseEvent(
ServerWebSocket socket,
Short code,
String reason
) implements Signal {
}
/**
* 连接异常信号
*
* @param socket
* @param error
*/
record OnErrorEvent(
ServerWebSocket socket,
Throwable error
) implements Signal {
}
}
这样定义接口的好处就是具体实现类不需要关心底层处理细节, 只需要将实现类转为 Actor 对象并实现该接口,
在构建 ActorSystem 的时候引入即可.
如果学习过 Erlang 的 Actor 设计, 后续基本上就能知道该怎么去处理, 基本上就是 Supervisor + Worker 架构设计;
很多时候需要借鉴不同开发语言和思想, 从而实现自己心目当中所需的功能.
可以按照自己思路来编, 但是写的时候会出现异常( 第三方库用到了跨线程操作 ):
No message is currently processed by the actor, but ActorContext was called from Thread
后续需要说明的问题: 第三方 Actor 异步处理与多线程处理