MeteorCat / Actor消息队列化

Created Thu, 18 Jan 2024 14:48:26 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
2177 Words

Actor消息队列化

这里需要先查看些基础内容:

这里说明下怎么集成 SpringBoot 涉及自己的 Actor 消息传递问题, 这里先集成需要的库:


<dependencies>

    <!-- Websocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Actor -->
    <dependency>
        <groupId>com.meteorcat.spring.boot</groupId>
        <artifactId>actor-spring-boot-starter</artifactId>
        <version>1.0.15-SNAPSHOT</version>
    </dependency>

    <!-- Hotfix -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <!-- Configuration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test Unit -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

这里设计个纯文本交互游戏服务端, 主要是没有美术素材所以直接设计文本放置游戏, 这样用文本方便描述效果.

加载 WebSocket 配置处理:


@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {

    /**
     * 访问路径
     */
    @Value("${websocket.server.path:/}")
    private String serverPath;

    /**
     * 传输数据缓存大小
     */
    @Value("${websocket.buffer.max.size:8192}")
    private Integer bufferMaxSize;


    /**
     * 待机主动中断时间
     */
    @Value("${websocket.idle.timeout:600000}")
    private Long idleTimeout;


    /**
     * 允许跨域地址
     */
    @Value("${websocket.allowed.origins:*}")
    private String allowOrigins;

    /**
     * 默认加载的服务
     */
    private final WebsocketApplication handler;

    public WebsocketConfig(WebsocketApplication handler) {
        this.handler = handler;
    }

    /**
     * 注册运行时句柄
     *
     * @param registry 注册器
     */
    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        if (handler == null) {
            throw new RuntimeException("failed by WebSocketHandler: WebSocketHandler");
        }
        registry.addHandler(handler, serverPath).setAllowedOrigins(allowOrigins);
    }


    /**
     * 全局 Servlet 的配置容器
     *
     * @return ServletServerContainerFactoryBean
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(bufferMaxSize);
        container.setMaxBinaryMessageBufferSize(bufferMaxSize);
        container.setMaxSessionIdleTimeout(idleTimeout);
        return container;
    }
}

之后就是 Actor 配置:


@Configuration
public class ActorConfig {
    private final ApplicationContext context;


    public ActorConfig(ApplicationContext context) {
        this.context = context;
    }

    /**
     * 配置 Actor 加载
     * @return ActorEventContainer
     */
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public ActorEventContainer searchActor() {
        ActorEventContainer container = new ActorEventContainer(new ActorEventMonitor(5));
        container.setIdleThreads(1); // 预留线程处理
        Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
        for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
            ActorConfigurer configurer = clazz.getValue();
            for (Integer value : configurer.values()) {
                container.put(value, configurer);
            }
        }
        return container;
    }
}

最后附加上 WebSocket 服务:

/**
 * 挂载 Websocket 服务
 */
@Order
@Component
public class WebsocketApplication extends TextWebSocketHandler {


    public class MessageFrame {
        final WebSocketSession session;

        final TextMessage message;


        public MessageFrame(WebSocketSession session, TextMessage message) {
            this.session = session;
            this.message = message;
        }

        public TextMessage getMessage() {
            return message;
        }

        public WebSocketSession getSession() {
            return session;
        }
    }


    /**
     * 日志句柄
     */
    Logger logger = LoggerFactory.getLogger(WebsocketApplication.class);

    /**
     * Actor 运行时
     */
    final ActorEventContainer container;


    /**
     * 获取 Actor 运行时
     *
     * @return ActorEventContainer
     */
    public ActorEventContainer getContainer() {
        return container;
    }

    /**
     * 玩家目前的登录状态 - 采用线程安全
     */
    final Map<WebSocketSession, Integer> online = new ConcurrentHashMap<>();

    /**
     * 玩家登录 UID 标识 - 采用线程安全
     */
    final Map<WebSocketSession, Long> users = new ConcurrentHashMap<>();


    /**
     * Json 解析器
     */
    final ObjectMapper mapper = new ObjectMapper();


    /**
     * 切换玩家会话状态
     *
     * @param session 会话
     * @param state   状态
     */
    public void setState(WebSocketSession session, Integer state) {
        if (online.containsKey(session)) {
            online.put(session, state);
        }
    }

    /**
     * 设置玩家ID
     *
     * @param session 会话
     * @param uid     Long
     */
    public void setUid(WebSocketSession session, Long uid) {
        if (users.containsValue(uid)) {
            for (Map.Entry<WebSocketSession, Long> user : users.entrySet()) {
                if (user.getValue().equals(uid)) {
                    users.remove(user.getKey());
                    break;
                }
            }
        }
        users.put(session, uid);
    }


    /**
     * 获取玩家ID
     *
     * @param session 会话
     * @return Uid
     */
    public Long getUid(WebSocketSession session) {
        return users.get(session);
    }


    /**
     * 构造方法
     *
     * @param container Actor events
     */
    public WebsocketApplication(ActorEventContainer container) {
        this.container = container;
    }


    /**
     * Established
     *
     * @param session handler
     * @throws Exception Error
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        logger.debug("Established = {}", session);
        online.put(session, 0);
    }


    /**
     * Closed
     *
     * @param session handler
     * @param status  close state
     * @throws Exception Error
     */
    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) throws Exception {
        logger.debug("Close = {},{}", session, status);
        online.remove(session);
    }


    /**
     * 采用JSON数据接收 { "value": 100, args:{ data.... } }
     * @param session handler
     * @param message text
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        if (message.getPayloadLength() <= 0) {
            return;
        }

        // json: { "value": 100, args:{ data.... } }
        JsonNode json = mapper.readTree(message.asBytes());
        if (!json.isObject()) {
            return;
        }

        // json.value
        JsonNode valueNode = json.get("value");
        if (valueNode == null || !valueNode.isInt()) {
            return;
        }

        // container value
        Integer value = valueNode.intValue();
        ActorConfigurer configurer = container.get(value);
        if (configurer == null) {
            return;
        }

        // json.args
        JsonNode args = json.get("args");
        args = args.isObject() ? args : null;

        // forward configurer
        configurer.invoke(value, online.get(session), this, session, args);
    }
}

目前服务已经挂起了, 这里编写测试单元测试测试下是否能够运行( TestLogic ):

/**
 * 加载单元测试服务
 */
@EnableActor(owner = TestLogic.class)
public class TestLogic extends ActorConfigurer {

    /**
     * 日志对象
     */
    final Logger logger = LoggerFactory.getLogger(TestLogic.class);

    /**
     * 服务启动时候加载初始化
     *
     * @throws Exception Error
     */
    @Override
    public void init() throws Exception {
        super.init();
        logger.info("测试单元启动");
    }

    /**
     * 服务退出时候退出运行
     *
     * @throws Exception Error
     */
    @Override
    public void destroy() throws Exception {
        super.destroy();
        logger.info("测试单元退出");
    }

    /**
     * 回显服务
     *
     * @param runtime 运行时
     * @param session 会话
     * @param args    参数
     */
    @ActorMapping(value = 10)
    public void echo(WebsocketApplication runtime, WebSocketSession session, JsonNode args) throws IOException {
        if (args != null) {
            session.sendMessage(new TextMessage(args.toString()));
        }
    }
}

最后测试运行 JSON 用 WebSocket 推送:

{
  "value": 10,
  "args": {
    "text": "hello.world"
  }
}

之后回显会直接返回以下内容:

{
  "text": "hello.world"
}

那么单个游戏服务已经挂起, 所以之后就以此为例处理.

消息队列化

虽然能够看到目前推送是正常的, 但是实际上推送问题并不是那么简单的, 因为目前推送数据是无序推送的情况, 会带来以下问题:

  • 增加 Actor 内部处理逻辑, Actor 本身自己只负责处理, Actor 推送客户端数据不该浪费业务线程占用.
  • 多人帧同步游戏会连续推送( 比如多人塔防游戏 )不断步进推送对象位置, 推送数据没有做队列的情况会出现多人的某个人出现后一帧先于前一帧推送过来.
  • 如果客户端的 WebSocket 中断, Actor 的异常需要在私有业务线程的 Actor 抛出, 只有抛出之后才能解除当前 Actor 业务锁.

比较粗浅的就是这几种情况, 要尽量减少各自 Actor 的占用时间才能达到游戏服务端最高效情况.

Actor 内部运行时带有线程锁, 所以 Actor 尽可能将逻辑业务分散并且减少占用时间

所以这里改造 WebsocketApplication 把推送业务移交到网络外层去处理, 首先是数据帧定义:

/**
 * 用于推送的网络数据帧
 */
public class MessageFrame {

    /**
     * 句柄
     */
    final WebSocketSession session;

    /**
     * 数据
     */
    final TextMessage message;


    public MessageFrame(WebSocketSession session, TextMessage message) {
        this.session = session;
        this.message = message;
    }

    public TextMessage getMessage() {
        return message;
    }

    public WebSocketSession getSession() {
        return session;
    }
}

之后重新修改 WebSocketApplication 功能, 追加数据推送队列:

/**
 * 挂载 Websocket 服务
 */
@Order
@Component
public class WebsocketApplication extends TextWebSocketHandler {
    // 其他代码, 略

    /**
     * 消息队列 - 数据采用线程安全处理
     */
    final Queue<MessageFrame> messages = new ConcurrentLinkedDeque<>();


    /**
     * 构造方法
     *
     * @param container Actor events
     */
    public WebsocketApplication(ActorEventContainer container) {
        this.container = container;

        // 启动时候线程池追加定时服务, 这里的延迟实际上可以优化
        // 当数据队列为空的时候可以休眠下, 当 push 功能可以考虑继续唤醒定时任务
        this.container.scheduleAtFixedRate(() -> {
            // 获取消息队列数据
            MessageFrame frame = messages.poll();
            if (frame != null && frame.getSession().isOpen()) {
                try {
                    // 推送数据
                    frame.getSession().sendMessage(frame.getMessage());
                } catch (IOException e) {
                    logger.warn(e.getMessage());
                    throw new RuntimeException(e);
                }
            }
        }, 0, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * 推送消息给队列处理
     *
     * @param session 会话
     * @param value   响应协议值
     * @param args    响应JSON
     * @throws IOException Error
     */
    public void push(WebSocketSession session, Integer value, Map<String, Object> args) throws IOException {
        Map<String, Object> response = new HashMap<>() {{
            put("value", value);
            put("args", args);
        }};
        push(session, mapper.writeValueAsString(response));
    }

    /**
     * 推送消息给队列处理
     *
     * @param session 会话
     * @param text    数据
     * @throws IOException Error
     */
    public void push(WebSocketSession session, String text) throws IOException {
        messages.add(new MessageFrame(session, new TextMessage(text)));
    }
}

追加完这些消息队列功能之后, 就重构之前 TestLogic 的服务, 该由外部运行时来推送:

/**
 * 加载单元测试服务
 */
@EnableActor(owner = TestLogic.class)
public class TestLogic extends ActorConfigurer {

    /**
     * 日志对象
     */
    final Logger logger = LoggerFactory.getLogger(TestLogic.class);

    /**
     * 服务启动时候加载初始化
     *
     * @throws Exception Error
     */
    @Override
    public void init() throws Exception {
        super.init();
        logger.info("测试单元启动");
    }

    /**
     * 服务退出时候退出运行
     *
     * @throws Exception Error
     */
    @Override
    public void destroy() throws Exception {
        super.destroy();
        logger.info("测试单元退出");
    }

    /**
     * 回显服务
     *
     * @param runtime 运行时
     * @param session 会话
     * @param args    参数
     */
    @ActorMapping(value = 10)
    public void echo(WebsocketApplication runtime, WebSocketSession session, JsonNode args) throws IOException {
        // 现在不需要在 Actor 内部处理推送,只需要移交消息队列推送
        if (args != null){
            runtime.push(session,args.toString());
        }
    }
}

Ok, 现在再测试回显功能, 现在就是完全采用队列化推送数据功能了.