Actor框架配置
这里需要先查看 基础Actor原理
之前编写基础的 Actor 框架, 其中忘了解释 RPC(Remote Procedure Call,远程过程调用) 概念, 它允许计算机程序跨网络使用过程或函数.
RPC 使得一个计算机程序能够调用另一个计算机程序的函数, 就像在本地执行一样.
虽然
RPC常用在分布式计算当中, 但是在游戏服务端当中也是很常见的存在
还有关于 Actor 设计优缺点问题, 先是设计的优点:
- 可扩展性: 可以利用网关来转发到对应其他计算服务来分布发送给其他服务器处理任务
- 容错性: 每个 Actor 都是独自声明的模块, 单个 Actor 内部系统奔溃并不会干扰到其他 Actor 对象
- 并发性: 每个 Actor 都有各自线程池负责调度, 多个 Actor 对象可以被线程同时运行
- 模块化: 每个 Actor 都可以被视为一个独立的模块, 互相之间可以通过内部推送消息队列转发任务
但是也带来了缺点:
- 复杂性: 相比其他同步架构, Actor 引入线程池/消息队列/互不干扰/读写锁设计等, 在设计过程当中带来更多心智负担.
- 性能损耗: Actor 引入了消息队列和读写锁, 相比传统加锁之后执行多出了少许性能占用
- 测试难度: 实现很多概念带来的就是很难对其进行测试调试, 如果 IDE 不对其支持断点调试等操作会让人抓狂( skynet 的业务调试就是限定Linux/Unix调试, 对 Lua 调试也是只能手动输出分析 ).
- 不可拒绝: Actor 只负责接收消息且不会去管什么数据是否正确, 对于 Actor 来说只管取出数据转发而不管数据对不对.
还有 线程分配 的概念, 常规来说现代服务器采用默认配置为 双核四线程 为最低配置,
也就是低于这个配置不考虑什么多线程开发问题( 单核双线程的环境可能连开发调试都成问题 ).
日常开发底层网络架构的时候需要以最低 4线程 做基础开发, 而其中需要预留以下线程做功能:
主线程 - 1: 一般就是主要启动线程防止线程退出, 默认Spring框架启动的时候就已经启动该线程.定时|网络线程 - 1: 用于全局处理交换任务通知的线程, 一般业务线程或者网络挂起监听等用来管理业务线程A消息延时或者立即推送到业务线程B.业务线程 - N:Actor对象的业务进程, 用于唤醒所有Actor当中各自的队列处理, 如果服务器全部当作唯一服务那么预留两个线程之后剩下线程可以分配给Actor处理.
上面都是需要忘记补充的内容, 只有了解上面那些相关知识点才能后续扩展属于自己 Actor 对象.
任务调度
之前编写过内部的线程池用于调度任务, 但是建议还是自己编写封装好调度线程池功能, 这样才能比较充分裁剪定义业务.
首先就是需要封装时间回调功能 ( Event ):
/**
* 自定义事件
*/
public interface Event extends Runnable {
/**
* 获取任务ID
* @return Long
*/
Long getTaskId();
/**
* 设置任务ID
* @param id Long
*/
void setTaskId(Long id);
}
之后就是多线程调度管理器, 用于线程调度池管理( EventMonitor ):
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 事件调度管理器
*/
public class EventMonitor extends ScheduledThreadPoolExecutor {
/**
* 原子量用于获取调用事件id
*/
private AtomicLong id = new AtomicLong(0);
/**
* 构造方法
*
* @param corePoolSize 线程数
*/
public EventMonitor(int corePoolSize) {
super(corePoolSize);
}
/**
* 构造方法
*
* @param corePoolSize 线程数
* @param threadFactory 线程构造工厂
*/
public EventMonitor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
/**
* 构造方法
*
* @param corePoolSize 线程数
* @param handler 异常句柄
*/
public EventMonitor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, handler);
}
/**
* 构造方法
*
* @param corePoolSize 线程数
* @param threadFactory 线程构造工厂
* @param handler 异常句柄
*/
public EventMonitor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
}
/**
* 线程直接调用
*
* @param event 事件
*/
public void execute(Event event) {
event.setTaskId(id.incrementAndGet());
super.execute(event);
}
/**
* 线程定时调用
*
* @param event 事件
* @param delay 延迟
* @param unit 时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> schedule(Event event, long delay, TimeUnit unit) {
event.setTaskId(id.incrementAndGet());
return super.schedule(event, delay, unit);
}
/**
* 线程定时调用, 这里定时要求周期调用是基于当前唤醒时间之后的调用, 不管任务是否已经完成
*
* @param event 事件
* @param initialDelay 初始化延迟
* @param period 延迟时间
* @param unit 时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Event event, long initialDelay, long period, TimeUnit unit) {
event.setTaskId(id.incrementAndGet());
return super.scheduleAtFixedRate(event, initialDelay, period, unit);
}
/**
* 线程定时调用, 这里定时要求周期调用是基于当前任务唤起完成之后的时间点, 以此为延迟误差
*
* @param event 事件
* @param initialDelay 初始化延迟
* @param delay 延迟时间
* @param unit 时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Event event, long initialDelay, long delay, TimeUnit unit) {
event.setTaskId(id.incrementAndGet());
return super.scheduleWithFixedDelay(event, initialDelay, delay, unit);
}
/**
* 任务休眠停止
*/
@Override
public void shutdown() {
super.shutdown();
}
/**
* 判断任务是否休眠
* @return boolean
*/
@Override
public boolean isShutdown() {
return super.isShutdown();
}
/**
* 直接休眠限制任务
* @return List
*/
@Override
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
}
这里就是自定义编写线程池管理器, 后续都是基于这个做多线程管理.
状态标识
客户端和服务端有时候请求服务是需要状态做标识的, 默认 None(全开放) 代表对全部开放,
而同时还有 Authorized(已经验证过登录状态) 等多种复杂状态.
常见于游戏某些服务必须
Actor已经登录过才能识别出玩家ID情况, 所以在@ActorMapping参数需要追加状态类型.
这里基于性能考虑等情况, 最后采用 int 做类型值而不采用 enum 处理( enum 无法做继承成扩展,
如果独立成框架没办法在外部处理扩展 ); 最后处理下类型 @ActorMapping 注解:
import java.lang.annotation.*;
/**
* 启用 Actor.* 相关注册功能
* 这里绑定 Integer = Method 做绑定映射, 让外部可以直接通过 int 类型访问调用到 Actor 方法
*/
@Inherited
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ActorMapping {
/**
* 声明Actor请求值
*
* @return int
*/
int value() default -1;
/**
* 允许的请求状态, 默认空状态标识可以对外全部开放, 这里允许多个值映射
* @return ActorState
*/
int[] state() default {};
}
这里 state 就是用 int 数组映射入口状态, 默认 [] 代表允许所有状态, 如果有登录状态 Authorized = 1
就限定 @ActorMapping(value = 1111, state = {1}) 即可, 这里展示玩家模块:
/**
* 玩家信息 Actor, 所有涉及到资源扣款等时间都是 Actor 来修改而不是其他地方直接修改
*/
@EnableActor(owner = PlayerInfoLogic.class)
public class PlayerInfoLogic extends ActorConfigurer {
/**
* 日志句柄
*/
final Logger logger = LoggerFactory.getLogger(PlayerInfoLogic.class);
/**
* 提交登录信息
*/
@ActorMapping(value = 1110)
public void login() {
logger.info("Login !!!!!!!");// 请求登录
}
/**
* 已经登录完成
*/
@ActorMapping(value = 1111, state = {1})
public void printLoginId() {
logger.info("Hello.World !!!!!!!"); // 确认已登录
}
}
这里的 state 可以定义成全局常量码表方便来统一管理全部状态, 之后就是处理下 ActorConfigurer 工具类:
import org.springframework.lang.Nullable;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Actor服务
* 多线程消息队列推送, 这里采用 @Nullable 处理 null, 默认实例化的时候不会申请多余内存
*/
public abstract class ActorConfigurer {
/**
* 这里修改为可为 null, 初始化类不会直接实例化对象, 节省实例化的空间占用
*/
@Nullable
private Map<Integer, ActorFuture> futures;
/**
* 记录内部全部指令码对象, 默认为 null 只有在使用初始化
*/
@Nullable
private List<Integer> ops;
/**
* 链表队列, 默认为 null 只有在使用初始化
*/
@Nullable
private Queue<ActorEvent> events;
/**
* 读写锁
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* 读取锁
*/
private final Lock readLock = lock.readLock();
/**
* 写入锁
*/
private final Lock writeLock = lock.writeLock();
/**
* 类实例化之后的初始化方法
*/
public void init() throws Exception {
Class<? extends ActorConfigurer> configurer = this.getClass();
EnableActor enableActor = configurer.getAnnotation(EnableActor.class);
if (enableActor == null || !enableActor.owner().getName().equals(configurer.getName())) {
throw new ClassNotFoundException(String.format("Not Implemented @EnableActor(own = %s)", configurer.getName()));
}
// 初始化所有记录操作码
Method[] methods = configurer.getMethods();
if (ops == null) {
ops = new ArrayList<>(methods.length);
}
// 初始化所有事件
if (futures == null) {
futures = new HashMap<>(methods.length);
}
// 检索出所有Actor入口
for (Method method : methods) {
ActorMapping mapping = method.getAnnotation(ActorMapping.class);
if (mapping != null) {
// 写入操作码和方法
Integer op = mapping.value();
int[] status = mapping.state();
List<Integer> state = new ArrayList<>(status.length);
for (int v : status) {
state.add(v);
}
ops.add(op);
futures.put(op, new ActorFuture(op, this, method, state));
}
}
}
/**
* 类析构之后的调用方法
*/
public void destroy() throws Exception {
if (futures != null) {
for (Map.Entry<Integer, ActorFuture> future : futures.entrySet()) {
System.out.printf("Unload ActorMapping = %s%n", future.getValue().method().getName());
remove(future.getKey());
}
}
}
/**
* 清除目前挂载 Actor 入口
*
* @param key 操作码
*/
public void remove(Integer key) {
if (futures != null) {
futures.remove(key);
}
if (ops != null) {
ops.remove(key);
}
}
/**
* 唤醒代码块
*
* @param op 操作码
* @param args 操作参数
*/
public void execute(Integer op, Object... args) throws Exception {
if (futures != null) {
ActorFuture future = futures.get(op);
if (future != null) {
future.execute(args);
}
}
}
/**
* 获取目前装载的所有操作码
*
* @return List<Integer>
*/
public List<Integer> ops() {
if (ops == null) {
Class<? extends ActorConfigurer> configurer = this.getClass();
EnableActor enableActor = configurer.getAnnotation(EnableActor.class);
if (enableActor == null) {
ops = new ArrayList<>(0);
return ops;
}
Method[] methods = configurer.getMethods();
ops = new ArrayList<>(methods.length);
for (Method method : methods) {
ActorMapping mapping = method.getAnnotation(ActorMapping.class);
if (mapping != null) {
ops.add(mapping.value());
}
}
}
return ops;
}
/**
* 获取目前所有的代码块
*
* @return Map<Integer, ActorFuture>
*/
public Map<Integer, ActorFuture> futures() {
if (futures == null) {
futures = new HashMap<>(0);
}
return futures;
}
/**
* 推送事件
*
* @param op 特征码
* @param args 参数
*/
public void push(Integer op, Integer state, Object... args) {
writeLock.lock(); // 写入锁开启
if (events == null) {
events = new LinkedList<>();
}
Object[] data = new Object[args.length];
System.arraycopy(args, 0, data, 0, args.length);
events.add(new ActorEvent(op, state, data));
writeLock.unlock(); // 写入锁让出
}
/**
* 唤起消息队列执行, 内部千万小心抛出异常避免导致线程死锁
*/
public void disposer() {
if (events == null || futures == null) return;
readLock.lock(); // 读取锁开启
if (events.isEmpty()) {
readLock.unlock(); // 读取锁让出
return;
}
ActorEvent event = events.poll();
ActorFuture future = futures.get(event.op());
if (future == null) {
readLock.unlock(); // 读取锁让出
return;
}
List<Integer> status = future.getStatus();
if (status.isEmpty() || status.contains(event.getState())) {
try {
future.invoke(filter(event.getArgs()));
} catch (Exception exception) {
throw new RuntimeException(exception);
} finally {
readLock.unlock();
}
} else {
readLock.unlock();
}
}
}
这里需要处理下 ActorFuture|ActorEvent 追加了 state 用来判断是否允许被访问:
/**
* 推送事件数据, 后续扩展可以放置于此
*
* @param op 操作码
* @param state 允许访问状态
* @param data 推送数据
*/
public record ActorEvent(Integer op, Integer state, Object[] data) {
}
/**
* Actor 封装执行代码段, 追加了 `state` 调用
*
* @param op
* @param instance
* @param method
* @param state
*/
public record ActorFuture(Integer op, Object instance, Method method, List<Integer> state) {
/**
* 唤醒执行方法
*
* @param args 传入参数
*/
public void execute(Object... args) throws Exception {
method.invoke(instance, args);
}
}
目前已经处理好 Actor 相关功能, 之后就是编写配置器模板和容器类处理, 但是稍等下来定时唤醒方法一直占用 CPU 其实能耗也高,
这里查看 skynet 当中的线程定时器运作方法:
//skynet_start.c
static void *
thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER);
for (;;) {
skynet_updatetime();
skynet_socket_updatetime();
CHECK_ABORT
wakeup(m,m->count-1);
// 这里采用线程定时处理, 2500微秒才唤醒触发
usleep(2500);
if (SIG) {
signal_hup();
SIG = 0;
}
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}
所以 ActorContainer 容器对象, 不在直接简单遍历线程挂起多线程去同时抢占:
// 自己定义的事件管理器
import com.app.ford.event.EventMonitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Actor全局容器用于后续扩展, 可以考虑采用 HashMap 或者 TreeMap, 这里采用 HashMap
* 后续扩展容器只需要修改该类即可, 不需要在外部处理, 这里内部先不做扩展实现
*/
public class ActorContainer extends HashMap<Integer, ActorConfigurer> {
/**
* 调用线程池
*/
final EventMonitor monitor;
final List<ActorConfigurer> configurers;
final int nThreads;
/**
* 初始化方法
*
* @param nThreads 线程池数量
*/
public ActorContainer(int nThreads) {
this.nThreads = nThreads;
this.monitor = new EventMonitor(nThreads);
this.configurers = new ArrayList<>();
}
/**
* 初始化方法
*
* @param nThreads 线程池数量
* @param capacity 内部唯一Actor句柄初始化数量
*/
public ActorContainer(int nThreads, int capacity) {
this.nThreads = nThreads;
this.monitor = new EventMonitor(nThreads);
this.configurers = new ArrayList<>(capacity);
}
/**
* 初始化方法
*
* @param count 内部初始化数据容量
* @param nThreads 线程池数量
* @param capacity 内部唯一Actor句柄初始化数量
*/
public ActorContainer(int count, int nThreads, int capacity) {
super(count);
this.nThreads = nThreads;
this.monitor = new EventMonitor(nThreads);
this.configurers = new ArrayList<>(capacity);
}
/**
* 初始化方法
*
* @throws Exception 异常
*/
public void init() throws Exception {
// 过滤所有唯一对象并写入对象
forEach((op, configurer) -> {
if (!configurers.contains(configurer)) {
configurers.add(configurer);
}
});
// 所有唯一对象初始化
for (ActorConfigurer configurer : configurers) {
configurer.init();
}
// 初始化之后就是驱动调用线程
disposer();
}
/**
* 析构方法
*
* @throws Exception 异常
*/
public void destroy() throws Exception {
monitor.shutdown();// 停止所有任务
for (ActorConfigurer configurer : configurers) {
configurer.destroy();
}
}
/**
* 采用线程池调用
*
* @param op 操作码
* @param args 参数
*/
public void execute(Integer op, Object... args) {
if (containsKey(op)) {
ActorConfigurer configurer = get(op);
monitor.execute(() -> {
try {
configurer.execute(op, args);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
/**
* 驱动线程池调用内部方法
*/
public void disposer() {
for (int i = 0; i < nThreads; i++) {
long idx = i + 1;
// 延迟唤醒, 错开其他相对线程指定延迟, 基本上建议线程唤起之间间隔唤起
monitor.scheduleAtFixedRate(() -> {
// 休眠跳过所有任务
if (monitor.isShutdown()) {
return;
}
// 唤起所有内部 Actor 的任务
forEach((op, configurer) -> {
try {
configurer.disposer();
} catch (Exception e) {
e.printStackTrace();
}
});
}, 0, idx * 100 + 1000, TimeUnit.MILLISECONDS);
}
}
}
最后挂载 @Bean 对象的最终方法( ActorConfig ):
/**
* 编写测试 Actor 加载类
*/
@Configuration
public class ActorConfig {
private final ApplicationContext context;
public ActorConfig(ApplicationContext context) {
this.context = context;
}
/**
* 全局挂载的 Actor 对象
*
* @return String
*/
@Bean(initMethod = "init", destroyMethod = "destroy")
public ActorContainer searchActor() {
ActorContainer configurers = new ActorContainer(4);// 四个线程处理
Map<String, ActorConfigurer> classes = context.getBeansOfType(ActorConfigurer.class);
for (Map.Entry<String, ActorConfigurer> clazz : classes.entrySet()) {
// 初始化所有的 Actor
ActorConfigurer configurer = clazz.getValue();
// 加载所有操作码
List<Integer> ops = configurer.ops();
for (Integer op : ops) {
configurers.put(op, configurer);
}
}
return configurers;
}
}
之后就是 WebSocket 挂载功能调整下:
/**
* 全局 WebSocket 挂载服务
* BinaryWebSocketHandler 代表了二进制传输数据
* TextWebSocketHandler 代表了文本流传输数据
*/
@Order
@Component
public class WebSocketApplication extends TextWebSocketHandler {
/**
* 日志库
*/
Logger logger = LoggerFactory.getLogger(WebSocketApplication.class);
/**
* 静态全局会话对象, 这里就是新加入的对象
*/
static final Map<WebSocketSession, Integer> sessions = new HashMap<>();
/**
* 设置会话状态
* @param session 会话句柄
* @param state 数据状态
*/
public void setSessionState(WebSocketSession session, Integer state) {
if (sessions.containsKey(session)) {
sessions.put(session, state);
}
}
/**
* JSON解析库
*/
ObjectMapper mapper = new ObjectMapper();
/**
* 全局 Actor 容器
*/
final ActorContainer container;
/**
* 构造方法
*
* @param container 系统注入容器
*/
public WebSocketApplication(ActorContainer container) {
this.container = container;
}
/**
* Established - 会话连接回调
*
* @param session 会话句柄
* @throws Exception 异常错误
*/
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
logger.debug("Established = {}", session);
sessions.put(session, 0);// 初始化会话状态
}
/**
* Closed - 会话连接关闭回调
*
* @param session 会话句柄
* @param status 关闭状态
* @throws Exception 异常错误
*/
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) throws Exception {
logger.debug("Close = {},{}", session, status);
sessions.remove(session);// 清除会话状态
}
/**
* Error - 会话错误异常回调
*
* @param session 会话句柄
* @param exception 回调错误
* @throws Exception 异常错误
*/
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
logger.debug("Error = {},{}", session, exception.toString());
sessions.remove(session);// 清除会话状态
}
/**
* Text - 接收到文本数据回调
* 如果二进制数据则是 handleBinaryMessage 回调
*
* @param session Websocket
* @param message data
* @throws Exception Error
*/
@Override
protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
logger.debug("Message = {},{}| Thread = {}", session, message.getPayload(), Thread.currentThread().getId());
// 获取提交内容
String payload = message.getPayload();
if (payload.isBlank()) return;
// 解析内容要求格式为 { op: int, args: object } 传递
JsonNode json = mapper.readTree(payload);
if (!json.isObject()) {
logger.error("传递数据格式错误");
return;
}
// 确认数据格式存在
if (!json.has("op") || !json.has("args")) {
logger.error("传递数据格式错误");
return;
}
// 判断传递的格式类型正确
JsonNode opNode = json.get("op");
JsonNode argsNode = json.get("args");
if (!opNode.isInt() || !argsNode.isObject()) {
logger.error("传递数据格式错误");
return;
}
// 推送队列
Integer op = opNode.asInt();
ActorConfigurer configurer = container.get(op);
// 将状态和会话推送给 Actor
configurer.push(op, sessions.get(session), session, sessions);
}
}
这里是比较粗糙的处理方式, 后续再细化处理处理, 之后就是测试登录权限功能( PlayerInfoLogic ):
/**
* 玩家信息 Actor, 所有涉及到资源扣款等时间都是 Actor 来修改而不是其他地方直接修改
*/
@EnableActor(owner = PlayerInfoLogic.class)
public class PlayerInfoLogic extends ActorConfigurer {
/**
* 日志句柄
*/
final Logger logger = LoggerFactory.getLogger(PlayerInfoLogic.class);
final WebSocketApplication websocket;
public PlayerInfoLogic(WebSocketApplication websocket) {
this.websocket = websocket;
}
/**
* 获取登录信息
*/
@ActorMapping(value = 1110)
public void login(WebSocketSession session, Map<WebSocketSession, Integer> sessions) {
logger.info("login !!!!!!!{}", Thread.currentThread().getId());// 请求登录
websocket.setSessionState(session, 1);// 切换状态 state = 1
}
/**
* 已经登录完成
*/
@ActorMapping(value = 1111, state = {1})
public void printLoginId(WebSocketSession session, Map<WebSocketSession, Integer> sessions) {
logger.info("Hello.World !!!!!!!"); // 确认已登录, 直接打印
}
}
重新跑下以下 WebScoket 请求内容推送以下 JSON 调试下处理确认能被调用:
// 推送登录
{ "op": 1110, "args":{}}
// 确认登录, 如果没有登录该请求是没有响应的
{ "op": 1111, "args":{}}
现在基本的调度功能已经准备的七七八八了, 至此基本上可以跑个基本的 Actor 服务器框架, 之后就是对所有功能细化处理.
这里编写简单 组件库 引入调用