MeteorCat / H5游戏服务端(一)

Created Fri, 19 Jan 2024 13:59:21 +0800 Modified Wed, 29 Oct 2025 23:24:59 +0800

搭建环境

对于 H5 游戏来说没有太多代码业务热更新的情况, 所以 skynet 挂载 lua 实现业务热更新的场景可以删减; Websocket 协议内部数据发包已经确定好, 每条消息都是获取取用就行( 不需要类似 int32[status] + int32[length] + data[bytes] 二进制消息封包 ).

虽然 Websocket 数据相比于 TCP/UDP 层面来说性能不高( 链接时带上了 HTTPS 头的数据包头信息 ), 但是其方便调试特性可以方便做调试从而所见即所得( 包括现在本地建立 html 文件就能编写客户端, 并且 PostMan 之类调试客户端也支持这种方式 )

所以采用 H5 作为游戏服务端入门来说相对简单, 不用担心过多繁重概念带来的心智负担就可以出成品( 当然后续进阶则需要从底层开始了解 ).

对于服务端入门来说最好采用 WebSocket + JSON 来负载游戏传输, 虽然业界普遍采用 TCP/UDP + GoogleProtoBuf 二进制传输, 但是二进制序列化数据排错和字节位封包/大小端网络序列处理的困难都是直接劝退掉大部分新人开发.

这里采用 Java11+SpringBoot3 开发, 因为 Java 的库比较成熟可以直接引入调用, Spring 内部也集成容器对象方便管理.

这里先引入 ActorWebsocket 库处理:


<dependencies>
    <!-- Websocket 库 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Actor 库 -->
    <dependency>
        <groupId>com.meteorcat.spring.boot</groupId>
        <artifactId>actor-spring-boot-starter</artifactId>
        <version>1.0.15-SNAPSHOT</version>
    </dependency>
</dependencies>

这里 Actor 自己编写集成的工具库, 具体可以 查看代码库

这里的 Actor 工具库需要自己导出版本库并 mvn 安装到本地代码库

生成配置

先编写好 Actor 加载配置:

import com.meteorcat.spring.boot.starter.ActorConfigurer;
import com.meteorcat.spring.boot.starter.ActorEventContainer;
import com.meteorcat.spring.boot.starter.ActorEventMonitor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;


/**
 * Actor 配置类
 */
@Configuration
public class ActorConfig {

    /**
     * Spring运行时
     */
    private final ApplicationContext context;


    /**
     * 构造方法引入, 新版本不再采用 @Autowired 或者 @Resource 注解
     *
     * @param context 运行时
     */
    public ActorConfig(ApplicationContext context) {
        this.context = context;
    }


    /**
     * 配置 Actor 加载
     *
     * @return ActorEventContainer
     */
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public ActorEventContainer searchActor() {
        ActorEventContainer container = new ActorEventContainer(new ActorEventMonitor(5));
        container.setIdleThreads(1); // 预留线程处理

        // 加载目前编写逻辑 Actor
        Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
        for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
            ActorConfigurer configurer = clazz.getValue();
            for (Integer value : configurer.values()) {
                container.put(value, configurer);
            }
        }
        return container;
    }
}

之后定义 Websocket 的配置:

import com.meteorcat.mix.WebsocketApplication;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

/**
 * Websocket加载配置
 */
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {

    /**
     * 访问路径
     */
    @Value("${websocket.server.path:/}")
    private String serverPath;

    /**
     * 传输数据缓存大小
     */
    @Value("${websocket.buffer.max.size:8192}")
    private Integer bufferMaxSize;


    /**
     * 待机主动中断时间
     */
    @Value("${websocket.idle.timeout:600000}")
    private Long idleTimeout;


    /**
     * 允许跨域地址
     */
    @Value("${websocket.allowed.origins:*}")
    private String allowOrigins;

    /**
     * 默认加载的服务
     */
    private final WebsocketApplication handler;


    /**
     * 构筑方法
     * @param handler Websocket 实例
     */
    public WebsocketConfig(WebsocketApplication handler) {
        this.handler = handler;
    }

    /**
     * 注册运行时句柄
     *
     * @param registry 注册器
     */
    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        if (handler == null) {
            throw new RuntimeException("failed by WebSocketHandler: WebSocketHandler");
        }
        registry.addHandler(handler, serverPath).setAllowedOrigins(allowOrigins);
    }


    /**
     * 全局 Servlet 的配置容器
     *
     * @return ServletServerContainerFactoryBean
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(bufferMaxSize);
        container.setMaxBinaryMessageBufferSize(bufferMaxSize);
        container.setMaxSessionIdleTimeout(idleTimeout);
        return container;
    }
}

最后编写 Websocket 的挂载实例:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.meteorcat.spring.boot.starter.ActorConfigurer;
import com.meteorcat.spring.boot.starter.ActorEventContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;

/**
 * 挂载 Websocket 服务
 * TextWebSocketHandler 是采用文本流形式处理
 * BinaryWebSocketHandler 是采用二进制流形式处理
 */
@Order
@Component
public class WebsocketApplication extends TextWebSocketHandler {


    /**
     * 用于推送的网络数据帧 - 这里后续会移出先处理成这样
     *
     * @param session 句柄
     * @param message 数据
     */
    public record MessageFrame(WebSocketSession session, TextMessage message) {

    }


    /**
     * 日志句柄
     */
    Logger logger = LoggerFactory.getLogger(WebsocketApplication.class);

    /**
     * Actor 运行时
     */
    final ActorEventContainer container;


    /**
     * 获取 Actor 运行时
     *
     * @return ActorEventContainer
     */
    public ActorEventContainer getContainer() {
        return container;
    }

    /**
     * 玩家目前的登录状态 - 采用线程安全
     */
    final Map<WebSocketSession, Integer> online = new ConcurrentHashMap<>();

    /**
     * 玩家登录 UID 标识 - 采用线程安全
     */
    final Map<WebSocketSession, Long> users = new ConcurrentHashMap<>();


    /**
     * Json 解析器
     */
    final ObjectMapper mapper = new ObjectMapper();


    /**
     * 消息队列 - 数据采用线程安全处理
     */
    final Queue<MessageFrame> messages = new ConcurrentLinkedDeque<>();


    /**
     * 切换玩家会话状态
     *
     * @param session 会话
     * @param state   状态
     */
    public void setState(WebSocketSession session, Integer state) {
        if (online.containsKey(session)) {
            online.put(session, state);
        }
    }

    /**
     * 设置玩家ID
     *
     * @param session 会话
     * @param uid     Long
     */
    public void setUid(WebSocketSession session, Long uid) {
        if (users.containsValue(uid)) {
            for (Map.Entry<WebSocketSession, Long> user : users.entrySet()) {
                if (user.getValue().equals(uid)) {
                    users.remove(user.getKey());
                    break;
                }
            }
        }
        users.put(session, uid);
    }


    /**
     * 获取玩家ID
     *
     * @param session 会话
     * @return Uid
     */
    public Long getUid(WebSocketSession session) {
        return users.get(session);
    }


    /**
     * 构造方法
     *
     * @param container Actor events
     */
    public WebsocketApplication(ActorEventContainer container) {
        this.container = container;
        // 启动时候线程池追加定时服务
        // 当数据队列为空的时候可以休眠下, 当 push 功能可以考虑继续唤醒定时任务
        this.container.scheduleAtFixedRate(() -> {
            // 获取消息队列数据
            MessageFrame frame = messages.poll();
            if (frame != null && frame.session.isOpen()) {
                try {
                    // 推送数据
                    frame.session.sendMessage(frame.message);
                } catch (IOException e) {
                    logger.warn(e.getMessage());
                    throw new RuntimeException(e);
                }
            }
        }, 0, 1000L, TimeUnit.MILLISECONDS);
    }


    /**
     * Established
     *
     * @param session handler
     * @throws Exception Error
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        logger.debug("Established = {}", session);
        online.put(session, 0);
    }


    /**
     * Closed
     *
     * @param session handler
     * @param status  close state
     * @throws Exception Error
     */
    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) throws Exception {
        logger.debug("Close = {},{}", session, status);
        online.remove(session);
    }


    /**
     * 采用JSON数据接收 { "value": 100, args:{ data.... } }
     *
     * @param session handler
     * @param message text
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        if (message.getPayloadLength() <= 0) {
            return;
        }

        // json: { "value": 100, args:{ data.... } }
        JsonNode json = mapper.readTree(message.asBytes());
        if (!json.isObject()) {
            return;
        }

        // json.value
        JsonNode valueNode = json.get("value");
        if (valueNode == null || !valueNode.isInt()) {
            return;
        }

        // container value
        Integer value = valueNode.intValue();
        ActorConfigurer configurer = container.get(value);
        if (configurer == null) {
            return;
        }

        // json.args
        JsonNode args = json.get("args");
        args = args.isObject() ? args : null;

        // forward configurer
        configurer.invoke(value, online.get(session), this, session, args);
    }


    /**
     * 推送消息给队列处理
     *
     * @param session 会话
     * @param value   响应协议值
     * @param args    响应JSON
     * @throws IOException Error
     */
    public void push(WebSocketSession session, Integer value, Map<String, Object> args) throws IOException {
        Map<String, Object> response = new HashMap<>() {{
            put("value", value);
            put("args", args);
        }};
        push(session, mapper.writeValueAsString(response));
    }

    /**
     * 推送消息给队列处理
     *
     * @param session 会话
     * @param text    数据
     */
    public void push(WebSocketSession session, String text) {
        messages.add(new MessageFrame(session, new TextMessage(text)));
    }
}

目前这里就差不多完成网络传输服务, 之后就是业务代码编写处理, 首当其冲就是每个网络服务先处理的 回显 服务.

回显服务

所有网络服务端每次测试都会编写的 echo(回显) 服务, 把客户端的推送的数据返回过去:

import com.fasterxml.jackson.databind.JsonNode;
import com.meteorcat.mix.WebsocketApplication;
import com.meteorcat.spring.boot.starter.ActorConfigurer;
import com.meteorcat.spring.boot.starter.ActorMapping;
import com.meteorcat.spring.boot.starter.EnableActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;

/**
 * 加载单元测试服务
 * EnableActor 声明这个类是挂载内存的实例
 * ActorConfigurer 是必须要集成的 Actor 集成类
 * 内部集成线程安全处理和消息队列, 也允许被 Container 扫描出这个类并且标识为 Actor
 */
@EnableActor(owner = TestLogic.class)
public class TestLogic extends ActorConfigurer {

    /**
     * 日志对象
     */
    final Logger logger = LoggerFactory.getLogger(TestLogic.class);

    /**
     * 服务启动时候加载初始化, ActorConfigurer 的初始化回调
     * 一般可以启动服务时候加载 xlsx/csv 表到类当中或者数据库加载游戏参数
     *
     * @throws Exception Error
     */
    @Override
    public void init() throws Exception {
        super.init();
        logger.info("测试单元启动");
    }

    /**
     * 服务退出时候退出运行, ActorConfigurer 的退出回调
     * 一般都是整个进程退出会调用的, 用于退出之前保存写入那些用户数据
     *
     * @throws Exception Error
     */
    @Override
    public void destroy() throws Exception {
        super.destroy();
        logger.info("测试单元退出");
    }

    /**
     * 回显服务, 这里就是自己编写业务代码
     * ActorMapping 就是声明 Actor 的入口, value 必须是全局唯一, state 则是允许访问的权限.
     * 默认 state 空代表允许被所有客户端直接访问
     * Example: { "value":10,"args":{ "text":"hello.world" } }
     * 
     * @param runtime 运行时
     * @param session 会话
     * @param args    参数
     */
    @ActorMapping(value = 10)
    public void echo(WebsocketApplication runtime, WebSocketSession session, JsonNode args) throws IOException {
        // 现在不需要在 Actor 内部处理推送,只需要移交消息队列推送
        if (args != null) {
            runtime.push(session, args.toString());
        }
    }
}

Ok, 通过 Postman 就能测试提交回显服务:

// 推送数据内容
{ "value":10,"args":{ "text":"hello.world" } }

目前已经搭建出基础网络服务端功能, 后续就是以下功能:

  • 登录认证, 登录授权/心跳包维持链接
  • 聊天频道, 世界频道消息等
  • 玩家模块, 加载登录玩家数据

其中 登录认证玩家模块 是必须的, 聊天频道 在目前国内比较受限所以尽量避免开发, 如果必须则要切记一定要开发黑名单关键字系统.

参考资料

网上对于游戏服务端资料相对比较少, 所以很多人视游戏服务端为洪水猛兽; 其实主要多学多问多看底层代码, 不要怕犯错而要去总结错误和思考错误基本上能够自己找出答案.

就像是这个教程采用 Websocket+JSON 内部传输性能问题, 后续如果 Websocket 连接每次都要走带来大量HTTP头可以直接采用 Netty 这种 TCP/UDP 来传输数据; 同时包括 Text+Json 在传输过程多余的逗号/花括号问题, 可以采用 Bytes+Protobuf 序列化二进制压缩数据转发, 没什么问题是无法完全解决的.