MeteorCat / SpringBoot+WebSocket构建Actor

Created Fri, 05 Jan 2024 20:40:38 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
6595 Words

SpringBoot+WebSocket构建 Actor

Actor 是种业务驱动的设计模式, 将业务封装成各自业务代码私有领域封装在内部而不对外暴露执行; 具体就是将业务功能类任务挂载在内存之中, 等待被唤起调用触发通知内部回调触发.

skyent 作为游戏服务端内部就是采用 actor 模式处理功能业务.

这里基于 JavaSpringBoot 内部工具, 可以实现出简单的 Actor 模式, 但是首先必须要对以下内容有所了解:

  • 泛型与反射
  • 自定义注解

注意虽然这里是 WebSocket+Actor, 但是这两种是相互独立出来不会耦合在一起, 也就是 WebSocket 仅仅作为组件, 后续可以集成 netty 来用 tcp/udp 替换传输层.

这里参考下 skynet 挂载服务流程, 查看 actor 是怎么挂载上去的:

local skynet = require("skynet")

--- 声明出 Actor 内部指令
local CMD = {}

--- 封装 Actor.init() 方法
function CMD.init()
    skynet.error("initialization")
end

--- 启用挂载出初始化服务
skynet.start(function()
    -- 注册 Actor 内部指令
    skynet.dispatch("lua", function(session, address, cmd, ...)
        -- 唤醒回调内部指令
        local f = CMD[cmd]
        if f then
            -- 确认指令存在就转发调用
            f(...)
        end
    end)
end)

上面可以看出注册 Actor 方法, 而在 Java 当中实现相当于比较简单点, 但是需要转变思路:

  1. 生成 @EnableActor 用于将某个类对象注册成挂载内存的 Actor 唯一对象, 作用对象在 class 当中.
  2. 生成 @ActorMapping 用于将启动 Actor 对象调用内部方法声明, 用于捆绑协议功能作用对象在 method 当中.
  3. ActorConfigurer 用于实现被继承实现启动时候扫描加载出 Actor 对象.
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
 * 启用 Actor 挂载注解
 * 这里采用 Spring 内置的 @Component 合并该注解
 * 因为继承了 @Component 注解所以该对象默认会在启动时候在程序生成实例
 */
@Inherited
@Documented
@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableActor {

    /**
     * 声明实现的所有者, 用于添加注解声明集成相关配置扩展类
     * 比如: @EnableActor(owner = TestLogic.class)
     *      public class TestLogic extends ActorConfigurer
     * 以上就是具体测试调用
     * @return ? extends ActorConfigurer
     */
    Class<? extends ActorConfigurer> owner();
}

之后就是绑定类内部方法的注解 @ActorMapping, 给类内部方法挂载:



import java.lang.annotation.*;

/**
 * 启用 Actor.* 相关注册功能
 * 这里绑定 Integer = Method 做绑定映射, 让外部可以直接通过 int 类型访问调用到 Actor 方法
 * 默认 -1 代表了不被外部暴露的方法, 用于进程内调用: 建议小于 0 是只能是被进程内部调用, 而大于 0 则是直接暴露给客户端调用
 */
@Inherited
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ActorMapping {

    /**
     * 声明Actor请求值
     *
     * @return int
     */
    int value() default -1;
}

这里就是最基础的两个注解, 用于分别给类和方法附加上去声明, 这里可以标识写个测试逻辑:

/**
 * 测试 Actor 对象, 这里 ActorConfigurer 就是集成大量内置的调用工具集
 */
@EnableActor(owner = TestLogic.class)
public class TestLogic extends ActorConfigurer {


    /**
     * 挂载 Actor 方法
     * @param msg 消息内容
     */
    @ActorMapping(value = 1000)
    public void print(String msg) {
        System.out.printf("Message = %s |Thread = %s\r\n", msg, Thread.currentThread().getId());
    }
}

这里就是相当简单 Actor 挂载方法, 只传入了 String 截获出信息, 关键就是 EnableActor.owner 对象做强限制要求必须实现 ActorConfigurer, 这个类就是内部集成所有对外装载功能用于被外部获取, 这里先编写最基础的功能:

import org.springframework.lang.Nullable;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;

/**
 * Actor 加载类
 */
public abstract class ActorConfigurer {


    /**
     * 采用红黑树方式结构保存 Actor 所有调用方法
     */
    private final Map<Integer, ActorFuture> futures = new TreeMap<>();

    /**
     * 记录内部全部指令码对象
     */
    @Nullable
    private List<Integer> opValues;


    /**
     * 初始化检索类所有入口方法
     */
    public void load() throws ClassNotFoundException {
        // 检索实现了启用Actor注解
        Class<? extends ActorConfigurer> configurer = this.getClass();
        EnableActor enableActor = configurer.getAnnotation(EnableActor.class);
        if (enableActor == null) {
            throw new ClassNotFoundException("Not Implemented @EnableActor");
        }

        // 初始化所有记录操作码
        Method[] methods = configurer.getMethods();
        if (opValues == null) {
            opValues = new ArrayList<>(methods.length);
        }

        // 检索出所有Actor入口
        for (Method method : methods) {
            ActorMapping mapping = method.getAnnotation(ActorMapping.class);
            if (mapping != null) {
                Integer op = mapping.value();
                opValues.add(op);
                futures.put(op, new ActorFuture(op, this, method));
            }
        }
    }


    /**
     * 清除目前挂载 Actor 入口
     *
     * @param key 操作码
     */
    public void unload(Integer key) {
        if (futures.containsKey(key)) {
            futures.remove(key);
            if (opValues != null) {
                opValues.remove(key);
            }
        }
    }


    /**
     * 唤醒代码块
     *
     * @param op   操作码
     * @param args 操作参数
     * @throws InvocationTargetException 类目标调用异常
     * @throws IllegalAccessException    没有访问权限的异常
     */
    public void execute(Integer op, Object... args) throws InvocationTargetException, IllegalAccessException {
        ActorFuture future = futures.get(op);
        if (future != null) {
            future.execute(args);
        }
    }


    /**
     * 获取目前装载的所有操作码
     *
     * @return Optional<List < Integer>>
     */
    public Optional<List<Integer>> getOpValues() {
        return opValues == null ? Optional.empty() : Optional.of(opValues);
    }


    /**
     * 获取目前所有的代码块
     *
     * @return Map<Integer, ActorFuture>
     */
    public Map<Integer, ActorFuture> getFutures() {
        return this.futures;
    }
}

这里 ActorFuture 则是将类内部代码块数据封装的类:

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


/**
 * Actor 封装执行代码段
 * @param op
 * @param instance
 * @param method
 */
public record ActorFuture(Integer op, Object instance, Method method) {


    /**
     * 唤醒执行方法
     *
     * @param args 传入参数
     * @throws InvocationTargetException 类目标调用异常
     * @throws IllegalAccessException    没有访问权限的异常
     */
    public void execute(Object... args) throws InvocationTargetException, IllegalAccessException {
        method.invoke(instance, args);
    }

}

至此目前已经实现了简单 Actor 装载对象, 对于 Actor 的内部封装已经差不多完成, 之后就是需要外部装载检索出所有对象并且装载到内存之中:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/**
 * 编写测试 Actor 加载类
 */
@Configuration
public class ActorConfig {

    private final ApplicationContext context;


    public ActorConfig(ApplicationContext context) {
        this.context = context;
    }


    /**
     * 测试加载所有 Actor
     *
     * @return String
     */
    @Bean
    public Map<Integer, ActorConfigurer> searchActor() throws ClassNotFoundException, InvocationTargetException, IllegalAccessException {
        Map<Integer, ActorConfigurer> configurers = new TreeMap<>();
        Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
        for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
            System.out.printf("Load = %s\r\n", clazz.getKey());

            // 初始化所有的 Actor
            ActorConfigurer configurer = clazz.getValue();
            configurer.load();

            // 加载所有操作码
            Optional<List<Integer>> ops = configurer.getOpValues();
            if (ops.isPresent()) {
                for (Integer op : ops.get()) {
                    configurers.put(op, configurer);
                }
            }

            // 测试调用之前内部指令方法
            configurer.execute(1000, "Hello.World");
        }

        System.out.printf("Actors = %s\r\n", configurers);
        return configurers;
    }
}

之后就能看到指令转发到 testLogic 对象的 1000 操作码方法 之中, 这时候就已经实现自己的 单线程 Actor 模型 功能了.

注: 作为单线程性能无法调用现代CPU多线程的运算能力, 也就会出现当某个函数执行时间过长的时候就会出现全局卡顿等待函数执行完成才会跑完后续业务逻辑, 思考多人操作同个逻辑的时候会造成什么情况?

挂载 WebSocket 服务

按照其他之前 教程 挂载服务, 这里编写个 JSON 功能来设计转发出到 Actor 功能, 之前 Map<Integer, ActorConfigurer> 需要再次封装成 ActorContainer 对象做内部数组

/**
 * Actor全局容器用于后续扩展, 可以考虑采用 HashMap 或者 TreeMap, 这里采用 HashMap
 * 后续扩展容器只需要修改该类即可, 不需要在外部处理, 这里内部先不做扩展实现
 */
public class ActorContainer extends HashMap<Integer, ActorConfigurer> { }

之后处理下全局挂载的对象 ActorConfig :

/**
 * 编写测试 Actor 加载类
 */
@Configuration
public class ActorConfig {

    private final ApplicationContext context;


    public ActorConfig(ApplicationContext context) {
        this.context = context;
    }


    /**
     * 全局挂载的 Actor 对象
     *
     * @return String
     */
    @Bean
    public ActorContainer searchActor() throws ClassNotFoundException, InvocationTargetException, IllegalAccessException {
        ActorContainer configurers = new ActorContainer();
        Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
        for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
            System.out.printf("Load = %s\r\n", clazz.getKey());

            // 初始化所有的 Actor
            ActorConfigurer configurer = clazz.getValue();
            configurer.load();

            // 加载所有操作码
            Optional<List<Integer>> ops = configurer.getOpValues();
            if (ops.isPresent()) {
                for (Integer op : ops.get()) {
                    configurers.put(op, configurer);
                }
            }
        }
        return configurers;
    }
}

最后挂载上 WebSocket 处理转发功能, 先利用 JSON 来接收数据转发到内部:


/**
 * 全局 WebSocket 挂载服务
 * BinaryWebSocketHandler 代表了二进制传输数据
 * TextWebSocketHandler 代表了文本流传输数据
 */
@Order
@Component
public class WebSocketApplication extends TextWebSocketHandler {

    /**
     * 日志库
     */
    Logger logger = LoggerFactory.getLogger(WebSocketApplication.class);


    /**
     * JSON解析库
     */
    ObjectMapper mapper = new ObjectMapper();


    /**
     * 全局 Actor 容器
     */
    final ActorContainer container;

    /**
     * 构造方法
     *
     * @param container 系统注入容器
     */
    public WebSocketApplication(ActorContainer container) {
        this.container = container;
    }


    /**
     * Established - 会话连接回调
     *
     * @param session 会话句柄
     * @throws Exception 异常错误
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        logger.debug("Established = {}", session);
    }


    /**
     * Closed - 会话连接关闭回调
     *
     * @param session 会话句柄
     * @param status  关闭状态
     * @throws Exception 异常错误
     */
    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) throws Exception {
        logger.debug("Close = {},{}", session, status);
    }

    /**
     * Error - 会话错误异常回调
     *
     * @param session   会话句柄
     * @param exception 回调错误
     * @throws Exception 异常错误
     */
    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        logger.debug("Error = {},{}", session, exception.toString());
    }

    /**
     * Text - 接收到文本数据回调
     * 如果二进制数据则是 handleBinaryMessage 回调
     *
     * @param session Websocket
     * @param message data
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        logger.debug("Message = {},{}", session, message.getPayload());

        // 获取提交内容
        String payload = message.getPayload();
        if (payload.isBlank()) return;

        // 解析内容要求格式为 { op: int, args: object } 传递
        JsonNode json = mapper.readTree(payload);
        if (!json.isObject()) {
            logger.error("传递数据格式错误");
            return;
        }
        ;

        // 确认数据格式存在
        if (!json.has("op") || !json.has("args")) {
            logger.error("传递数据格式错误");
            return;
        }
        ;

        // 判断传递的格式类型正确
        JsonNode opNode = json.get("op");
        JsonNode argsNode = json.get("args");
        if (!opNode.isInt() || !argsNode.isObject()) {
            logger.error("传递数据格式错误");
            return;
        }

        // 转发给 Actor
        Integer op = opNode.asInt();
        ActorConfigurer configurer = container.get(op);
        if (configurer == null) {
            logger.error("找不到 Actor 对象");
            return;
        }

        // 转发调用
        configurer.execute(op, session, argsNode);
    }
}

之后追加新的 Actor 功能来处理对应的 WebSocket 请求:

/**
 * 测试 Actor 对象, 这里 ActorConfigurer 就是集成大量内置的调用工具集
 */
@EnableActor(owner = TestLogic.class)
public class TestLogic extends ActorConfigurer {

    final Logger logger = LoggerFactory.getLogger(TestLogic.class);

    /**
     * 挂载 Actor 方法
     *
     * @param msg 消息内容
     */
    @ActorMapping(value = 1000)
    public void print(String msg) {
        logger.warn("Message = {} |Thread = {}", msg, Thread.currentThread().getId());
    }


    /**
     * 打印 WebSocket 转发的数据
     *
     * @param session 会话句柄
     * @param args    参数对象
     */
    @ActorMapping(value = 1001)
    public void printWebsocket(@NonNull WebSocketSession session, JsonNode args) {
        logger.warn("WebSocket = {} |Message = {}", session, args);
    }
}

启动程序之后利用网页编写或者 Postman 推送以下数据就能直接查看到数据转发是否成功, 确认成功就代表 Actor 数据已经转发:

{ "op": 1001, "args": { "username":"MeteorCat","password": "MeteorCat" } }

推送确认没问题之后, 属于自己的 Actor 库已经完成, 后续就是怎么和多线程池集合让多线程拆分多个任务并行执行, 这里先查看 skynet 是如何处理模块的:

struct skynet_module {
	const char * name;
	void * module;
	skynet_dl_create create;
	skynet_dl_init init;
	skynet_dl_release release;
	skynet_dl_signal signal;
};

struct modules {
	int count;
	struct spinlock lock;
	const char * path;
	struct skynet_module m[MAX_MODULE_TYPE];
};

这里 skynet_module 相当于之前生成 @EnableActor 全局实例化类, 而 modules 则是 Actor 模块管理器, 也就是 ActorContainer 容器, 总体上无论其他语言差不多这样架构就行了.

skynet_dl_create/skynet_dl_init/skynet_dl_release/skynet_dl_signal 基本上都是回调, 用于触发创建/初始化/唤醒等操作, Java 内部节约时间都是采用构造方法, 具体可以手动编写方法之后初始化/退出时候回调.

线程池处理

Java2x 版本之后带来了虚拟线程来做任务并发执行, 但是在这之前都是采用线程池方式来处理, 所以这里采用更加通用的自定义线程池处理. 这里先说明怎么构建:

  1. 需要生成 2+N 以上的线程池进行监听 Actor, 2+N代表线程数且按照现代服务器最低配置2核心4线程, 2个为Actor业务线程, 1个网络线程和1个延时调度线程
  2. Actor 内部必须采用 @EnableActor 注解声明挂起全局业务类
  3. @EnableActor + @ActorMapping 生成服务入口, 等待被调用
  4. 服务入口不能被直接调用, 而是必须采用各自声明类的 Channel 做内部互斥锁调用
  5. 每声明 @EnableActor 代表 Actor 内部自己状态维护, 内部都有自己的消息队列调用方式是队列推送处理而非直接内部调用方法
  6. 可以把每个 @EnableActor 注解修饰类当作 ‘领主城堡’, 与外部通讯通过内部带有读写互斥锁的队列就是 ‘外交官’
  7. 2+N的线程池负责不断轮询各自队列内部对象来让其内部被唤醒调用, 比如不断判断 @EnableActor 内部消息队列是否为空, 不为空需要识别读写锁当中的 ‘读锁’ 从而 trylock(), 不能获取代表当前有任务等待下次唤醒.
  8. 这里能够看出所谓多线程实际上就是把多个 Actor 类每次调用按照线程安全方式不断被单个线程调用, 实际上就是为了让多个 Actor 平均分布维护各自的线程任务, 让繁重的多线程任务可以以单线程方式来处理.
  9. 这种 Actor 方式可以直接抽象出各自不同服务, 如 BagActor(背包), BattleActor(战斗), BuildActor(建筑), WordActor(大世界), ChatActor(全服聊天) 等, 把服务分流到不同线程各自不同让其维护自己服务.

这里简单处理个线程池处理, 当然这里面实现是有问题的, 这里仅仅作为多线程处理方式, 修改追加线程调用 ActorContainer 容器对象:


/**
 * Actor全局容器用于后续扩展, 可以考虑采用 HashMap 或者 TreeMap, 这里采用 HashMap
 * 后续扩展容器只需要修改该类即可, 不需要在外部处理, 这里内部先不做扩展实现
 */
public class ActorContainer extends HashMap<Integer, ActorConfigurer> {

    /**
     * 调用线程池
     */
    @Nullable
    final ExecutorService executor;

    public ActorContainer(@Nullable ExecutorService executor) {
        this.executor = executor;
    }


    /**
     * 采用线程池调用
     *
     * @param op   操作码
     * @param args 参数
     * @throws InvocationTargetException 类目标调用异常
     * @throws IllegalAccessException    没有访问权限的异常
     */
    public void execute(Integer op, Object... args) throws InvocationTargetException, IllegalAccessException {
        if (containsKey(op)) {
            ActorConfigurer configurer = get(op);
            if (executor == null) {
                configurer.execute(op, args);
            } else {
                executor.submit(() -> {
                    try {
                        configurer.execute(op, args);
                    } catch (InvocationTargetException | IllegalAccessException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }
}

之后修改单进程调用转为多线程唤起服务:

/**
 * 全局 WebSocket 挂载服务
 * BinaryWebSocketHandler 代表了二进制传输数据
 * TextWebSocketHandler 代表了文本流传输数据
 */
@Order
@Component
public class WebSocketApplication extends TextWebSocketHandler {
    
    /// 其他代码略

    /**
     * Text - 接收到文本数据回调
     * 如果二进制数据则是 handleBinaryMessage 回调
     *
     * @param session Websocket
     * @param message data
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        // 添加线程打印
        logger.debug("Message = {},{}| Thread = {}", session, message.getPayload(),Thread.currentThread().getId());

        // 获取提交内容
        String payload = message.getPayload();
        if (payload.isBlank()) return;

        // 解析内容要求格式为 { op: int, args: object } 传递
        JsonNode json = mapper.readTree(payload);
        if (!json.isObject()) {
            logger.error("传递数据格式错误");
            return;
        }
        ;

        // 确认数据格式存在
        if (!json.has("op") || !json.has("args")) {
            logger.error("传递数据格式错误");
            return;
        }
        ;

        // 判断传递的格式类型正确
        JsonNode opNode = json.get("op");
        JsonNode argsNode = json.get("args");
        if (!opNode.isInt() || !argsNode.isObject()) {
            logger.error("传递数据格式错误");
            return;
        }

        // 转发给 Actor
//        Integer op = opNode.asInt();
//        ActorConfigurer configurer = container.get(op);
//        if (configurer == null) {
//            logger.error("找不到 Actor 对象");
//            return;
//        }
//
//        // 转发调用
//        configurer.execute(op, session, argsNode);


        // 修改成多线程调用
        Integer op = opNode.asInt();
        container.execute(op, session, argsNode);
    }
}

重新调用推送客户端发现确实被其他业务线程唤起, 而且完美执行了这些任务但请注意 目前只是单人调试, 在多人情况会出现很大问题! 千万不要把代码用于正式环境!

在单线程服务变化为多线程驱动之后, 随之而来会带来最大问题: 消息功能调用乱序, 怎么保证用户执行有序进行, 后续展开处理怎么实现多个业务线程处理.

消息队列

目前看起来功能已经 “比较” 完善, 但真的是这样吗?

假设以下最有可能场景:

  • 玩家正在战斗等待结算金币, 然后突然另外期活动金币奖励推送过来, 那么这时候金币扣增能够保证一致吗?
  • RPG即时制战斗时候玩家A攻击玩家B发动致命一击, 此时玩家B立即回血, 怎么保证玩家B能够立刻回血避免被战斗扣血击杀?
  • 其他要求时效顺序样例……

这都是游戏服务端常见样例, 从而引出多线程致命问题: 如何保证消息有序化?

Java 当中有 java.util.concurrent.Queue.ConcurrentLinkedQueue, 使用一个无锁的环形缓冲区来存储元素,从而可以实现高性能的并发访问, 但是不具有灵活性所以没办法适应目前过程.

为什么说线程安全的 Queue 对象没办法适应 Actor 业务? 因为数组只能保证取出来数据是线程安全的, 但是具体 Actor 不止要数据线程安全还要处理过程线程安全( 总不能两者取出来 HP = 10, 扣除血量都是拿同个血量扣除吧 ); 注意这里需要知道怎么把 Object... args 可变参数封装成参数队列然后推送到内部调用.

在传统语言多线程之中都是采用 原子量线程锁 处理数据同步, 这里一般采用线程锁当中的 读写锁 来管理队列.

可能你会问为什么不直接在 execute 唤起 Actor 服务器之前就加锁保持线程调用, 如:

// 准备推给多线程任务
executor.submit(()->{
    // 加锁
    lock.lock();
    // 转发任务线程执行
    container.execute();
});

这里是有很大 bug 的, 假设目前玩家节日奖励推送 100 金币, 而正好战斗结算正好 100 金币推送到同时玩家资源模块, 线程A 正好 加锁 在公式计算获取总和 +100, 线程B 检测到锁已经加锁的时候, 这时候 线程B 任务因为 线程A 正在执行加锁而被跳过; 这里就是因为任务独占了锁导致后续任务直接被跳过, 所以为了保证消息准确到达和执行才需要消息队列在多线程当中负责其他线程接收队列数据.

这里追加个事件对线接收传递过来的数据( ActorEvent ):

/**
 * 推送事件数据, 后续扩展可以放置于此
 *
 * @param op   操作码
 * @param data 推送数据
 */
public record ActorEvent(Integer op, Object[] data) { }

之后改造下继承 ActorConfigurer 类让其支持多线程消息队列处理:

/**
 * Actor服务
 * 支持多线程消息队列推送
 */
public abstract class ActorConfigurer {
    /// 其他略
    
    /**
     * 消息链表队列
     */
    private final Queue<ActorEvent> events = new LinkedList<>();


    /**
     * 读写锁
     */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * 读取锁
     */
    private final Lock readLock = lock.readLock();

    /**
     * 写入锁
     */
    private final Lock writeLock = lock.writeLock();


    /**
     * 推送事件
     * @param op 特征码
     * @param args 参数
     */
    public void push(Integer op, Object... args) {
        writeLock.lock(); // 写入锁开启
        Object[] data = new Object[args.length];
        System.arraycopy(args, 0, data, 0, args.length);
        events.add(new ActorEvent(op, data));
        writeLock.unlock(); // 写入锁让出
    }

    /**
     * 唤起消息队列执行
     *
     * @throws InvocationTargetException 类目标调用异常
     * @throws IllegalAccessException 没有访问权限的异常
     */
    public void disposer() throws InvocationTargetException, IllegalAccessException {
        readLock.lock(); // 读取锁开启
        if (!events.isEmpty()) {
            // 提取出目前需要执行的事件
            ActorEvent event = events.poll();
            ActorFuture future = futures.get(event.op());
            if (future != null) {
                future.execute(event.data());
            }
        }
        readLock.unlock(); // 读取锁让出
    }
}

这里之所以用读写锁而不用单个互斥锁就是考虑到自调用情况( ActorMapping 当中再次调用 EnableActor 内部入口 ), 如果用单个锁会消息队列数据重复入列导致死锁情况.

这里修改完之后就是以线程池驱动 disposer 方法, 这里需要在容器类 ActorContainer 追加方法:

/**
 * Actor全局容器用于后续扩展, 可以考虑采用 HashMap 或者 TreeMap, 这里采用 HashMap
 * 现在已经可以扩展内部多线程调用方法了
 */
public class ActorContainer extends HashMap<Integer, ActorConfigurer> {
    /// 其他略

    // 运行状态
    private boolean isRunning = true;


    /**
     * 驱动线程池调用内部方法
     */
    public void disposer() {
        if (executor != null) {
            // 推送多线程任务, 这里延迟不用让CPU长期处于唤醒死循环而定时休眠唤醒
            executor.submit(() -> {
                while (isRunning){
                    // 这里遍历采用简单遍历, 可以细化处理出自己的遍历线程方法
                    // 考虑到按照 Actor 功能入口越多, Map 内部会相同 Actor 会越多, 越多入口则命中线程唤醒越容易
                    // 也就是如果单个 Actor 内部 Mapping 越多, 那么在线程池被调用次数越频繁
                    forEach((op, configurer) -> {
                        try {
                            configurer.disposer();
                        } catch (InvocationTargetException | IllegalAccessException e) {
                            // 运行态不需要直接中断运行而是直接打印错误
                            // 所有逻辑 Actor 都不应该导致干扰其他进程崩溃
                            e.printStackTrace();
                        }
                    });
                }
            });
        }
    }

    /**
     * 停止运行态
     */
    public void stop(){
        isRunning = false;
    }
}

需要在配置 ActorConfig 配置挂起多线程服务:

/**
 * 编写测试 Actor 加载类
 */
@Configuration
public class ActorConfig {
    /// 其他略
    
    /**
     * 全局挂载的 Actor 对象
     *
     * @return String
     */
    @Bean
    public ActorContainer searchActor() throws ClassNotFoundException, InvocationTargetException, IllegalAccessException {
        ActorContainer configurers = new ActorContainer(Executors.newFixedThreadPool(2));
        Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
        for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
            System.out.printf("Load = %s\r\n", clazz.getKey());

            // 初始化所有的 Actor
            ActorConfigurer configurer = clazz.getValue();
            configurer.load();

            // 加载所有操作码
            Optional<List<Integer>> ops = configurer.getOpValues();
            if (ops.isPresent()) {
                for (Integer op : ops.get()) {
                    configurers.put(op, configurer);
                }
            }
        }

        // 多线程唤醒, 这时候就是挂起服务服务运行, 这里就是刚刚的调用方法
        configurers.disposer();
        return configurers;
    }
}

@Bean 有自带 initMethod(构造)destroyMethod(析构) 用于主要初始|析构化, 为了节约理解成本不采用, 实际上可以内部类扩展自己的初始化和推出回调, 这里主要精简降低理解成本

最后挂起 WebSocket 服务跑测试样例确认线程调用即可 ( WebSocketApplication ):

/**
 * 全局 WebSocket 挂载服务
 * BinaryWebSocketHandler 代表了二进制传输数据
 * TextWebSocketHandler 代表了文本流传输数据
 */
@Order
@Component
public class WebSocketApplication extends TextWebSocketHandler {
    /// 其他略

    /**
     * Text - 接收到文本数据回调
     * 如果二进制数据则是 handleBinaryMessage 回调
     *
     * @param session Websocket
     * @param message data
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        /// 其他略
        
        // 判断传递的格式类型正确
        JsonNode opNode = json.get("op");
        JsonNode argsNode = json.get("args");
        if (!opNode.isInt() || !argsNode.isObject()) {
            logger.error("传递数据格式错误");
            return;
        }

        // 修改成多线程调用, 直接推送给 Actor 内部消息队列让内部处理
        Integer op = opNode.asInt();
        ActorConfigurer configurer = container.get(op);
        configurer.push(op, "hello", "world");// 这里测试推送数据过去
    }
}

走下 websocket 数据推送测试就可以查看到多线程调用具体流程, 这样处理作为游戏服务端 skynet 调试更加容易( idea直接调试 ), 但是因为没有脚本机制导致没办法都功能业务代码热更新( 只能热更新加载配置表数据 ).

这里处理到这里已经完成比较基础的 Actor 机制部署, 其他语言也可以基于这种方式实现自己的服务端业务系统; 总体上面来说, 因为 H5 大部分采用都是轻度项目没有太高热更新要求, 哪怕项目重启只需要客户端保持重连机制就可以满足日常简单的游戏服务端调试业务.

当然也不止游戏服务端, 其他需要长连接的数据请求访问也可以接受这种方式.