MeteorCat / 构建游戏网关(四)

Created Sun, 20 Apr 2025 19:05:30 +0800 Modified Wed, 29 Oct 2025 23:24:45 +0800
1821 Words

作为单机 Actor 结构目前已经实现简单的转发功能, 但是需要注意对于客户端消息来说有以下区分:

  • 请求(request): 客户端主动推送给服务端消息包
  • 响应(response): 服务端主动推送给客户端消息包

需要区分 长连接短连接 区别, 短连接(HTTP) 要求从发起到响应必须双向(请求必须要带响应); 而长连接(TCP|UDP) 是非双向, 也就是可能只有请求而没有响应或者干脆只有响应.

我们编写的简单 Actor 虽然完成基本将客户端消息请求功能, 但是目前没有合适的响应功能, 而没有合适响应功能也就没办法实现我们之前 服务端心跳包 需求.

但是这时候不要先急着去实现, 而是要回头思考下: 为什么消息格式能够被正确转发?

消息传递

ActorRef<T> 得出这里面推送的消息结构是泛型, 也就是可能本身也就是 Object 对象, 而 createReceive 的回调就更加明显展示消息转化流程:

public <M extends T> ReceiveBuilder<T> onMessage(final Class<M> type, final Function<M, Behavior<T>> handler) {
    OptionVal.Some var10000 = org.apache.pekko.util.OptionVal.Some..MODULE$;
    org.apache.pekko.util.OptionVal..MODULE$.None();
    // 关键的转化流程, 实际上就是按照传入的动态类型转化写入匹配路由
    Case var3 = new Case(type, (Predicate) null, handler);
    this.messageHandlers_$eq((List) this.messageHandlers().$plus$colon(var3));
    return this;
}

这个方法往下就能看到 class Case<BT, MT> implements Product, Serializable 转化类, 其中关键内容方法已经展示出来了:

// 判断是否可以转化
public boolean canEqual(final Object x$1) {
    return x$1 instanceof Case;
}

// 是否匹配类型
public boolean equals(final Object x$1) {
    // 判断转化, 代码略
    return false;
}

那么基本上已经可以知道消息怎么回事, 消息必须要通过 Serializable 序列化处理, 之后通过 Case<BT, MT> 挂载请求信号|消息表来获取消息匹配处理.

注意: 序列化传递消息其实很低效, 因为对于消息传输会携带大量没有意义的冗余数据, 所以正式项目会采用自己编写序列化方法

查看了官方文档看到以下配置, 就知道默认会将数据序列化之后放入 ActorMailBox:

pekko {
    serializers {
        java = "org.apache.pekko.serialization.JavaSerializer"
        primitive-string = "org.apache.pekko.serialization.StringSerializer"
        # ......
    }

    serialization-bindings {
        "java.io.Serializable" = java
        "java.lang.String" = primitive-string
        # .......
    }
}

这里是 pekko 包内默认 reference.conf 配置选项, 已经揭示了数据序列化传输的过程, 那么按照这样我们自己编写可序列化的结构类可以吗? 那么来测试下编写自己的序列化类:

// src/main/java/com/meteorcat/fusion/message/IMessage.java
import java.io.Serializable;

/**
 * 抽象的消息接口
 * 相当于所有消息必须实现 IMessage 接口才能用于传输
 *
 * @param <T> 消息内容类型
 */
public interface IMessage<T> extends Serializable {

    /**
     * 消息ID
     *
     * @return int
     */
    int getId();

    /**
     * 消息内容
     *
     * @return Object
     */
    T getMessage();
}

// src/main/java/com/meteorcat/fusion/message/WebSocketMessage.java
import lombok.Builder;
import lombok.ToString;

/**
 * 自定义的序列化消息结构
 */
@Builder
@ToString
public class WebSocketMessage implements IMessage<String> {

    /**
     * 消息ID
     */
    private int id;

    /**
     * 消息结构JSON
     */
    private String message;


    @Override
    public int getId() {
        return id;
    }

    @Override
    public String getMessage() {
        return message;
    }
}

这里就是我们自己实现的通用序列化消息的结构对象, 之后就是替换 Actor 系统的具体类型:

import com.meteorcat.fusion.message.IMessage;
import com.meteorcat.fusion.message.WebSocketMessage;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.slf4j.Logger;

/**
 * 简单的 Actor 服务
 */
public class ActorSupervisor extends AbstractBehavior<IMessage<String>> {

    /**
     * 日志句柄
     */
    final Logger log = getContext().getLog();

    /**
     * 构造方法
     *
     * @param context Actor上下文
     */
    public ActorSupervisor(ActorContext<IMessage<String>> context) {
        super(context);
    }

    /**
     * 构建回调匹配
     *
     * @return Receive
     */
    @Override
    public Receive<IMessage<String>> createReceive() {
        return newReceiveBuilder()
                .onMessage(WebSocketMessage.class, this::onCustomMessage)
                .build();
    }

    /**
     * 自定义结构对象
     *
     * @param msg 消息
     * @return Behavior
     */
    private Behavior<IMessage<String>> onCustomMessage(WebSocketMessage msg) {
        log.info("Struct : {}!", msg);
        return this;
    }
}

不再以 String.class 作为传递对象改由让自定义结构作为传输载体, 最后 WebSocket 修改下类型:

/**
 * Text类型的WebSocket句柄
 */
@Slf4j
@Component
public class WebSocketApplication extends TextWebSocketHandler {

    /**
     * Actor全局系统
     * 修改为自定义结构来传递
     */
    final ActorSystem<IMessage<String>> system;

    /**
     * 构建方法
     *
     * @param objectMapper JSON解析器
     */
    @Autowired
    public WebSocketApplication(ActorSystem<IMessage<String>> system, ObjectMapper objectMapper) {
        this.system = system;
        this.objectMapper = objectMapper;
    }

    /**
     * 消息回调
     * 以 JSON:{ id: number, data: {} } 来做消息传递
     */
    @Override
    protected void handleTextMessage(
            @NonNull WebSocketSession session,
            @NonNull TextMessage message
    ) throws Exception {
        final String payload = message.getPayload();
        if (payload.isEmpty()) {
            return;
        }

        // 确认必须带 Object 并且格式为 { id: number, data: {} }
        final JsonNode node = objectMapper.readTree(payload);
        if (!node.isObject() || !node.has("id") || !node.has("data")) {
            return;
        }

        // 提取数据
        final JsonNode id = node.get("id");
        final JsonNode data = node.get("data");
        if (!id.isNumber() || !data.isObject()) {
            return;
        }

        // 自定义序列化结构对象
        WebSocketMessage protocol = WebSocketMessage
                .builder()
                .id(id.asInt())
                .message(data.toString())
                .build();

        system.tell(protocol);
    }
}

执行之后用客户端测试推送 JSON 格式消息给 Actor 返回数据:

2025-04-23T13:38:22.964+08:00  INFO 9672 --- [nio-8080-exec-1] c.m.f.websocket.WebSocketApplication     : Established: 9807a6c4-1d8c-aa7e-dc19-d2e83017979c
2025-04-23T13:38:23.675+08:00  INFO 9672 --- [nio-8080-exec-2] c.m.f.websocket.WebSocketApplication     : frame received(100): {"flag":1001}
2025-04-23T13:38:23.687+08:00  INFO 9672 --- [lt-dispatcher-3] c.m.fusion.actor.ActorSupervisor         : Struct : WebSocketMessage(id=100, message={"flag":1001})!

自定义的消息结构已经完成, 那么接下来编写响应对象功能.

消息响应

SpringBoot 的消息响应相对来说没问题困难, 直接登录时候记录在线会话映射然后 Actor 获取全局实例对象就可以推送.

而我们定义 ActorSupervisorActorSystem 就需要传递 WebSocketHandler 来让其能够接触到 WebSocket 单例:

/**
 * 简单的 Actor 服务
 */
public class ActorSupervisor extends AbstractBehavior<IMessage<String>> {

    /**
     * WebSocket数据层, 也就是外部需要构建的时候传递过来的全局句柄
     */
    final WebSocketHandler webSocketHandler;

    /**
     * 构造方法
     *
     * @param context Actor上下文
     */
    public ActorSupervisor(ActorContext<IMessage<String>> context, WebSocketHandler webSocketHandler) {
        super(context);
        this.webSocketHandler = webSocketHandler;
    }
}

那么现在需要拆分 ActorProperties(Actor属性) + ActorConfiguration(Actor配置) 区分, ActorProperties 主要用于加载外部 yaml 等配置文件属性, ActorConfiguration 则是对内做初始化全局对象.

// src/main/java/com/meteorcat/fusion/config/ActorProperties.java
import lombok.Data;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * 全局 Actor 属性
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "fusion.actor")
public class ActorProperties {

    /**
     * Actor系统名称
     */
    private String name;


    /**
     * Actor类型
     */
    private Class<? extends AbstractBehavior<?>> behavior;
}

// src/main/java/com/meteorcat/fusion/config/ActorConfiguration.java
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;


/**
 * Actor配置中心
 */
@Configuration
public class ActorConfiguration {


    /**
     * Actor属性
     */
    final ActorProperties properties;

    /**
     * WebSocket配置
     */
    final WebSocketConfiguration webSocketConfiguration;

    @Autowired
    public ActorConfiguration(ActorProperties properties, WebSocketConfiguration webSocketConfiguration) {
        this.properties = properties;
        this.webSocketConfiguration = webSocketConfiguration;
    }


    /**
     * 实例句柄
     */
    @Bean
    public ActorSystem<?> factory() {
        return ActorSystem.create(
                Behaviors.setup((ctx) -> {

                    // 加载全局WebSocket配置
                    final WebSocketHandler webSocketHandler = webSocketConfiguration
                            .getContext()
                            .getBean(webSocketConfiguration.getProperties().getHandler());

                    // 获取Actor句柄
                    AbstractBehavior<?> instance = properties.getBehavior().getConstructor(
                            ActorContext.class,
                            WebSocketHandler.class
                    ).newInstance(ctx, webSocketHandler);

                    return instance.unsafeCast();
                }),
                properties.getName()
        );
    }
}