作为单机 Actor 结构目前已经实现简单的转发功能, 但是需要注意对于客户端消息来说有以下区分:
请求(request): 客户端主动推送给服务端消息包响应(response): 服务端主动推送给客户端消息包
需要区分 长连接 和 短连接 区别, 短连接(HTTP) 要求从发起到响应必须双向(请求必须要带响应);
而长连接(TCP|UDP) 是非双向, 也就是可能只有请求而没有响应或者干脆只有响应.
我们编写的简单 Actor 虽然完成基本将客户端消息请求功能, 但是目前没有合适的响应功能,
而没有合适响应功能也就没办法实现我们之前 服务端心跳包 需求.
但是这时候不要先急着去实现, 而是要回头思考下: 为什么消息格式能够被正确转发?
消息传递
从 ActorRef<T> 得出这里面推送的消息结构是泛型, 也就是可能本身也就是 Object 对象,
而 createReceive 的回调就更加明显展示消息转化流程:
public <M extends T> ReceiveBuilder<T> onMessage(final Class<M> type, final Function<M, Behavior<T>> handler) {
OptionVal.Some var10000 = org.apache.pekko.util.OptionVal.Some..MODULE$;
org.apache.pekko.util.OptionVal..MODULE$.None();
// 关键的转化流程, 实际上就是按照传入的动态类型转化写入匹配路由
Case var3 = new Case(type, (Predicate) null, handler);
this.messageHandlers_$eq((List) this.messageHandlers().$plus$colon(var3));
return this;
}
这个方法往下就能看到 class Case<BT, MT> implements Product, Serializable 转化类,
其中关键内容方法已经展示出来了:
// 判断是否可以转化
public boolean canEqual(final Object x$1) {
return x$1 instanceof Case;
}
// 是否匹配类型
public boolean equals(final Object x$1) {
// 判断转化, 代码略
return false;
}
那么基本上已经可以知道消息怎么回事, 消息必须要通过 Serializable 序列化处理,
之后通过 Case<BT, MT> 挂载请求信号|消息表来获取消息匹配处理.
注意: 序列化传递消息其实很低效, 因为对于消息传输会携带大量没有意义的冗余数据, 所以正式项目会采用自己编写序列化方法
查看了官方文档看到以下配置, 就知道默认会将数据序列化之后放入 Actor 的 MailBox:
pekko {
serializers {
java = "org.apache.pekko.serialization.JavaSerializer"
primitive-string = "org.apache.pekko.serialization.StringSerializer"
# ......
}
serialization-bindings {
"java.io.Serializable" = java
"java.lang.String" = primitive-string
# .......
}
}
这里是 pekko 包内默认 reference.conf 配置选项, 已经揭示了数据序列化传输的过程,
那么按照这样我们自己编写可序列化的结构类可以吗? 那么来测试下编写自己的序列化类:
// src/main/java/com/meteorcat/fusion/message/IMessage.java
import java.io.Serializable;
/**
* 抽象的消息接口
* 相当于所有消息必须实现 IMessage 接口才能用于传输
*
* @param <T> 消息内容类型
*/
public interface IMessage<T> extends Serializable {
/**
* 消息ID
*
* @return int
*/
int getId();
/**
* 消息内容
*
* @return Object
*/
T getMessage();
}
// src/main/java/com/meteorcat/fusion/message/WebSocketMessage.java
import lombok.Builder;
import lombok.ToString;
/**
* 自定义的序列化消息结构
*/
@Builder
@ToString
public class WebSocketMessage implements IMessage<String> {
/**
* 消息ID
*/
private int id;
/**
* 消息结构JSON
*/
private String message;
@Override
public int getId() {
return id;
}
@Override
public String getMessage() {
return message;
}
}
这里就是我们自己实现的通用序列化消息的结构对象, 之后就是替换 Actor 系统的具体类型:
import com.meteorcat.fusion.message.IMessage;
import com.meteorcat.fusion.message.WebSocketMessage;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.slf4j.Logger;
/**
* 简单的 Actor 服务
*/
public class ActorSupervisor extends AbstractBehavior<IMessage<String>> {
/**
* 日志句柄
*/
final Logger log = getContext().getLog();
/**
* 构造方法
*
* @param context Actor上下文
*/
public ActorSupervisor(ActorContext<IMessage<String>> context) {
super(context);
}
/**
* 构建回调匹配
*
* @return Receive
*/
@Override
public Receive<IMessage<String>> createReceive() {
return newReceiveBuilder()
.onMessage(WebSocketMessage.class, this::onCustomMessage)
.build();
}
/**
* 自定义结构对象
*
* @param msg 消息
* @return Behavior
*/
private Behavior<IMessage<String>> onCustomMessage(WebSocketMessage msg) {
log.info("Struct : {}!", msg);
return this;
}
}
不再以 String.class 作为传递对象改由让自定义结构作为传输载体, 最后 WebSocket 修改下类型:
/**
* Text类型的WebSocket句柄
*/
@Slf4j
@Component
public class WebSocketApplication extends TextWebSocketHandler {
/**
* Actor全局系统
* 修改为自定义结构来传递
*/
final ActorSystem<IMessage<String>> system;
/**
* 构建方法
*
* @param objectMapper JSON解析器
*/
@Autowired
public WebSocketApplication(ActorSystem<IMessage<String>> system, ObjectMapper objectMapper) {
this.system = system;
this.objectMapper = objectMapper;
}
/**
* 消息回调
* 以 JSON:{ id: number, data: {} } 来做消息传递
*/
@Override
protected void handleTextMessage(
@NonNull WebSocketSession session,
@NonNull TextMessage message
) throws Exception {
final String payload = message.getPayload();
if (payload.isEmpty()) {
return;
}
// 确认必须带 Object 并且格式为 { id: number, data: {} }
final JsonNode node = objectMapper.readTree(payload);
if (!node.isObject() || !node.has("id") || !node.has("data")) {
return;
}
// 提取数据
final JsonNode id = node.get("id");
final JsonNode data = node.get("data");
if (!id.isNumber() || !data.isObject()) {
return;
}
// 自定义序列化结构对象
WebSocketMessage protocol = WebSocketMessage
.builder()
.id(id.asInt())
.message(data.toString())
.build();
system.tell(protocol);
}
}
执行之后用客户端测试推送 JSON 格式消息给 Actor 返回数据:
2025-04-23T13:38:22.964+08:00 INFO 9672 --- [nio-8080-exec-1] c.m.f.websocket.WebSocketApplication : Established: 9807a6c4-1d8c-aa7e-dc19-d2e83017979c
2025-04-23T13:38:23.675+08:00 INFO 9672 --- [nio-8080-exec-2] c.m.f.websocket.WebSocketApplication : frame received(100): {"flag":1001}
2025-04-23T13:38:23.687+08:00 INFO 9672 --- [lt-dispatcher-3] c.m.fusion.actor.ActorSupervisor : Struct : WebSocketMessage(id=100, message={"flag":1001})!
自定义的消息结构已经完成, 那么接下来编写响应对象功能.
消息响应
SpringBoot 的消息响应相对来说没问题困难, 直接登录时候记录在线会话映射然后 Actor 获取全局实例对象就可以推送.
而我们定义 ActorSupervisor 的 ActorSystem 就需要传递 WebSocketHandler 来让其能够接触到 WebSocket 单例:
/**
* 简单的 Actor 服务
*/
public class ActorSupervisor extends AbstractBehavior<IMessage<String>> {
/**
* WebSocket数据层, 也就是外部需要构建的时候传递过来的全局句柄
*/
final WebSocketHandler webSocketHandler;
/**
* 构造方法
*
* @param context Actor上下文
*/
public ActorSupervisor(ActorContext<IMessage<String>> context, WebSocketHandler webSocketHandler) {
super(context);
this.webSocketHandler = webSocketHandler;
}
}
那么现在需要拆分 ActorProperties(Actor属性) + ActorConfiguration(Actor配置) 区分,
ActorProperties 主要用于加载外部 yaml 等配置文件属性, ActorConfiguration 则是对内做初始化全局对象.
// src/main/java/com/meteorcat/fusion/config/ActorProperties.java
import lombok.Data;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* 全局 Actor 属性
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "fusion.actor")
public class ActorProperties {
/**
* Actor系统名称
*/
private String name;
/**
* Actor类型
*/
private Class<? extends AbstractBehavior<?>> behavior;
}
// src/main/java/com/meteorcat/fusion/config/ActorConfiguration.java
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
/**
* Actor配置中心
*/
@Configuration
public class ActorConfiguration {
/**
* Actor属性
*/
final ActorProperties properties;
/**
* WebSocket配置
*/
final WebSocketConfiguration webSocketConfiguration;
@Autowired
public ActorConfiguration(ActorProperties properties, WebSocketConfiguration webSocketConfiguration) {
this.properties = properties;
this.webSocketConfiguration = webSocketConfiguration;
}
/**
* 实例句柄
*/
@Bean
public ActorSystem<?> factory() {
return ActorSystem.create(
Behaviors.setup((ctx) -> {
// 加载全局WebSocket配置
final WebSocketHandler webSocketHandler = webSocketConfiguration
.getContext()
.getBean(webSocketConfiguration.getProperties().getHandler());
// 获取Actor句柄
AbstractBehavior<?> instance = properties.getBehavior().getConstructor(
ActorContext.class,
WebSocketHandler.class
).newInstance(ctx, webSocketHandler);
return instance.unsafeCast();
}),
properties.getName()
);
}
}