MeteorCat / Akka Actor服务

Created Thu, 17 Apr 2025 22:29:39 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
12722 Words

Akka Actor服务

这里按照官方说明直接采用多项目 Maven + JDK17 设置, 基础的父根目录框架名为 fusion-framework

需要注意其实还有热更新方案等考虑, 但是这里基于 websocket 对于服务热更要求不高所以直接跳过

另外还需要注意: akka2.6.x 之后的版本转向闭源付费, 仅允许在开发和非生产系统中免费使用, 如果为了规避商业行为清尽量采用 2.6.x 版本二次开发.

或者采用后续开源版本: Pekko

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.meteorcat.fusion</groupId>
    <artifactId>fusion-framework</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <!-- 全局属性 -->
    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.3.10</spring-boot.version>
        <maven-compiler.version>3.13.0</maven-compiler.version>
        <lombok.version>1.18.36</lombok.version>

        <!-- 2023.0.x 又名 Leyton	SpringBoot 3.3.x, 3.2.x -->
        <spring-cloud.version>2023.0.2</spring-cloud.version>

        <!-- akka3 Actor -->
        <akka3.version>2.6.21</akka3.version>
        <scala.binary.version>2.13</scala.binary.version>
        <logback.version>1.5.18</logback.version>
    </properties>


    <!-- 子模块定义 -->
    <modules>
        <module>fusion-actor</module> <!-- Actor集群项目 -->
    </modules>


    <!-- 管理子项目的依赖 -->
    <dependencyManagement>
        <dependencies>
            <!-- LOMBOK组件 -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
                <version>${lombok.version}</version>
            </dependency>

            <!-- Akka 为SLF4J 后端 -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>${logback.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>


            <!-- WebSocket依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>


            <!-- springboot依赖 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
            </dependency>

            <!-- springCloud依赖  -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Akka3Actor -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-bom_${scala.binary.version}</artifactId>
                <version>${akka3.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>
    </dependencyManagement>


    <!-- 配置子项目第三方库源 -->
    <repositories>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>


        <repository>
            <id>netflix-candidates</id>
            <name>Netflix Candidates</name>
            <url>https://artifactory-oss.prod.netflix.net/artifactory/maven-oss-candidates</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>

        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>


        <repository>
            <id>akka-repository</id>
            <name>Akka library repository</name>
            <url>https://repo.akka.io/maven</url>
        </repository>
    </repositories>

    <!-- 配置公共的插件管理等 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

这里构建一个子项目演示下怎么构建个单独 Actor, 首先 pom.xml 的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>fusion-actor</artifactId>

    <!-- 继承框架 -->
    <parent>
        <groupId>com.meteorcat.fusion</groupId>
        <artifactId>fusion-framework</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- 第三方依赖 -->
    <dependencies>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- Actor LogBack -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>

        <!-- Actor -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
        </dependency>


    </dependencies>

    <!-- 打包配置 -->
    <build>
        <plugins>

            <!-- 这里利用 springframework 工具打包 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.meteorcat.fusion.FusionGameActorApplication</mainClass>
                    <skip>false</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

创建个 Actor 对象用于通知内部 Actor 对象:

package com.meteorcat.fusion.actors.echo;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

/**
 * Echo 服务装载
 */
public class EchoBehavior extends AbstractBehavior<EchoBehavior.Message> {


    /**
     * Actor 消息
     */
    public record Message(String content, ActorRef<Message> reply) {

    }


    /**
     * 初始化
     *
     * @param context Actor上下文
     */
    public EchoBehavior(ActorContext<Message> context) {
        super(context);
    }

    /**
     * 静态注册对象
     *
     * @return Behavior
     */
    public static Behavior<EchoBehavior.Message> create() {
        return Behaviors.setup(EchoBehavior::new);
    }


    /**
     * 构建回调
     *
     * @return Receive
     */
    @Override
    public Receive<Message> createReceive() {
        return newReceiveBuilder()
                .onMessage(Message.class, this::onEcho)
                .build();
    }


    /**
     * 回调内部
     *
     * @param msg 消息
     * @return Behavior
     */
    private Behavior<Message> onEcho(Message msg) {
        getContext().getLog().info("ECHO : {}!", msg.content);

        // 返回结果让其他 Actor 接收
        if (msg.reply != null) {
            msg.reply.tell(new Message(msg.content, getContext().getSelf()));
        }
        return this;
    }
}

最后就是项目入口对象:

package com.meteorcat.fusion;

import akka.actor.typed.ActorSystem;
import com.meteorcat.fusion.actors.echo.EchoBehavior;

import java.io.IOException;

/**
 * 游戏项目启动入口
 */
public class FusionGameActorApplication {
    /**
     * 启动项目方法
     *
     * @param args 启动方法
     */
    public static void main(String[] args) {

        final ActorSystem<EchoBehavior.Message> actorSystem = ActorSystem.create(
                EchoBehavior.create(),
                "echo"
        );

        actorSystem.tell(new EchoBehavior.Message("hello.world", null));

        try {
            System.out.println(">>> Press ENTER to exit <<<");
            int ignore = System.in.read();
        } catch (IOException ignored) {
        } finally {
            actorSystem.terminate();
        }
    }
}

Actor管理器

一般来说, 构建父节点可以细分化在启动子节点维护新服务, 而父节点就是升级为 monitor(任务管理器) 的监控单位, 这时候父节点就是保证子节点服务正确运行并且在出错时进行服务重启还原.

这里面的概念和 erlang 拆分服务差不多, 就是 monitor(1)-worker(N) 网状扩散出来.

每个 Actor 系统都是很大的数据结构且可能会占用多个线程, 所以必须要控制好静态 Actor 数量避免创建过多 actor

同样在 Akka 之中也是含有和 erlang 一样的监管策略:

  • OneForOneStrategy: 单个子服务只会影响自身重启, 比如当 battle-100 服务奔溃重启只会影响自身子服务
  • AllForOneStrategy: 用于子服务联系紧密的服务, 要求单个子服务崩溃则该父节点下面所有子服务都要重启一遍

这里假设经由 authority 授权的时候, 需要挂载玩家存档数据到当前节点数据内存上; 可以通过推送 player 父节点动态创建其子节点经由 OneForOneStrategy 策略来挂载服务器内存, 假设需要强行下线和服务器维护的时候可以简单推送 player 父节点让所有子节点下线即可.

Akka 当中都是由 ActorRef 来做 Actor 引用对象用于互相 Actor 之间的消息投递, 另外除了引用还是 路径 差别, 常见的路径定义如下:

"akka://my-sys/user/service-a/worker1"                   // 纯本地路径
"akka.tcp://[email protected]:5678/user/service-b" // TCP远程路径
"akka.udp://[email protected]:5678/user/service-c" // TCP远程路径

Actor 创建之后默认都会定义以下路径:

  • /user: 所有由用户创建的顶级actor的监管者, 用 ActorSystem.actorOf 创建的 actor 在其节点之下
  • /system: 所有由系统创建的顶级actor的监管者, 如日志监听器或由配置指定在actor系统启动时自动部署的actor
  • /deadLetters: 死信actor, 所有发往已经终止或不存在的actor的消息会被重定向到这里(以尽最大努力为基础:消息丢失不可避免)
  • /temp: 所有系统创建的短时actor的监管者, 例如那些在ActorRef.ask的实现中用到的actor
  • /remote: 人造虚拟路径, 用来存放所有其监管者是远程actor引用的actor

网状节点状态如下:

node
  |
  |----- /system(获取整体信息系统节点)
  |        |
  |        |------ logger
  |        |------ .......
  |----- /user
  |        |----- authority(登录授权) 
  |        |----- player(玩家挂载,动态扩展子节点)
  |        |        |----- user-1001
  |        |        |----- user-1003
  |        |        |----- .......(玩家节点统一由player管理)
  |        |----- battle(跨服等共享战斗服务)
  |        |----- ......
  |----- /deadLetters(丢失的消息记录, 消息100%不丢失不太可能)
  |----- /temp(临时节点数据)
  |----- /remote(远程节点)

大部分游戏过程其实就是玩家的 自娱自乐, 也就是玩家针对 player 自己参与策划玩法对自己数据做 curd, 但是也有涉及到跨节点直接共享战斗情况, 所以这种情况会也是会独立构建 battle 节点处理这种情况.

Akka 采用强制父子节点配置, 所以当你通过创建 ActorSystem 的时候其实已经帮你构建好了 monitor

这里创建 OneForOneStrategy 的管理器:

///  authority/AuthoritySupervisor.java 授权管理器类
package com.meteorcat.fusion.actors.authority;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
import lombok.Getter;
import scala.concurrent.duration.Duration;

import static akka.actor.SupervisorStrategy.*;


/**
 * 授权Actor管理器
 * 采用 OneForOneStrategy 静态方案
 */
@Getter
public class AuthoritySupervisor extends AbstractActor {

    /**
     * 工作 Actor
     */
    private final ActorRef worker = getContext().actorOf(
            AuthorityWorker.props(),
            "authorityWorker"
    );

    /**
     * 设置管理器启动规则, 最多5s之内重启3次
     */
    @Getter
    private static SupervisorStrategy strategy = new OneForOneStrategy(
            3,
            Duration.create("5 second"),
            new Function<Throwable, SupervisorStrategy.Directive>() {
                @Override
                public SupervisorStrategy.Directive apply(Throwable throwable) {
                    if (throwable instanceof ArithmeticException) {
                        return resume(); // 忽略策略
                    } else if (throwable instanceof NullPointerException) {
                        return restart(); // 重启策略
                    } else if (throwable instanceof IllegalArgumentException) {
                        return stop(); // 关闭策略
                    } else {
                        return escalate(); // 忽略
                    }
                }
            }
    );

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(any -> {
                    // 转发给子节点, 并且不然其传入 Sender
                    // 如果需要让其能够交互则需要按照以下方式处理
                    // worker.tell(any, getSelf());
                    worker.tell(any, ActorRef.noSender());
                })
                .build();
    }
}


/// authority/AuthorityWorker.java 授权工程进程类
package com.meteorcat.fusion.actors.authority;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

/**
 * Authority工作线程
 */
public class AuthorityWorker extends AbstractActor {

    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);


    /**
     * 静态返回 Actor 示例
     *
     * @return Props
     */
    public static Props props() {
        return Props.create(AuthorityWorker.class);
    }

    /**
     * 启动回调
     */
    @Override
    public void preStart() {
        log.info("Starting Actor Worker({}) : {}", getSelf().path(), this.hashCode());
    }


    /**
     * 关闭回调
     */
    @Override
    public void postStop() {
        log.info("Stopping Actor Worker({}) : {}", getSelf().path(), this.hashCode());
    }

    /**
     * Actor 消息回调
     *
     * @return Receive
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, s -> {
                    log.info("Received String : {}", s);
                })
                .match(Long.class, l -> {
                    log.info("Received Long : {}", l);
                })
                .match(Integer.class, i -> {
                    log.info("Received Integer : {}", i);
                })
                .build();
    }
}


/// 服务启动入口
package com.meteorcat.fusion;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.meteorcat.fusion.actors.authority.AuthoritySupervisor;

public class FusionGameServiceApplication {
    public static void main(String[] args) {

        // 创建 Actor 服务
        final ActorSystem system = ActorSystem.create("FusionGameService");

        // 追加 Authority 管理器
        final Props authority = Props.create(AuthoritySupervisor.class);
        final ActorRef authorityRef = system.actorOf(authority);

        // 这里提交消息即可
        authorityRef.tell(100L, ActorRef.noSender());

    }
}

这里就是创建 OneForOneStrategy 的对象, 可以按照以上样例思考下创建 player 动态构建多 Actor 列表; 以上就是构建单个服务器的 Actor, 但是现代系统已经不满足 单Actor 现在需要将服务分布扩展到多个物理(虚拟)服务器.

集群化

游戏等强交互服务不可能单独部署到单机服务上, 需要将游戏服务负载到不同服务器设备上以缓解大量计算工作, 而 Akka 提供自带的集群功能可以直接使用, 这里需要知道以下概念:

  • node(节点): 集群的最基础逻辑成员, 物理计算机上可能有多个节点, 由 hostname:port:uid 元组定义
  • cluster(集群): 通过成员服务连接在一起的一组节点node
  • leader(领导): 集群中充当领导者的单个节点用来选举, 管理集群 聚合(convergence)成员(membership) 状态转换

官方说明很复杂但是基本上离不开最终的主要原理: 选举领导机制, 关于这个机制很复杂建议其他方面机制了解.

集群参与 状态(state):

  • joining:节点正在加入集群时的状态。
  • weekly up:配置了akka.cluster.allow-weakly-up-members=on时,启用的状态。
  • up:集群中节点的正常状态。
  • leaving/exiting:优雅的删除节点时,节点的状态。
  • down:标记为已下线的状态。
  • removed:墓碑状态,表示已经不再是集群的成员。

集群的 行为(action):

  • join:加入集群。
  • leave:告知节点优雅的离开集群。
  • down:标记集群为已下线。

这里主要概括就是在不同设备挂载 Akka 服务, 加入(Join) 构建成不同成员节点从而把服务在不同设备计算, 其中通过选举出某个节点进行委派计算任务从而有效将计算功能分布到不同服务器设备上.

然后需要大概概念了解下就行了, 后续在官方集群案例里面你就会看的很迷糊, 这里了解完之后还有架构部署差别:

  • 服务注册(application) * N - 工作程序(worker) * N
  • 注册工作程序一体 * N

官方推荐的是把服务注册和工作进程抽离分开, 多台种子节点分布部署集群:

akka {
  loglevel = debug
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      # 可以看到这里端口赋予 0 让其随机分配
      canonical.port = 0
    }
  }
  cluster {
    # 其他服务器当中的集群地址, 如果正式的话 127.0.0.1 需要分布处理
    seed-nodes = [
      "akka://[email protected]:25251",
      "akka://[email protected]:25252"]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    # 集群时候千万不要使用该配置, 自动关闭会导致集群单例直接重复创建
    # auto-down-unreachable-after = 10s
  }
}

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

另外的集群服务(如 25251/25252 端口种子服务)则另外重新启动 worker 项目编写具体业务逻辑

官方建议采用第三方服务(zookeeper,spring-cloud)发现节点链接转发到内部服务之中, 但是为了更好节约利用服务器资源会让服务节点和种子节点合为一体而让外部做TCP|WebSocket|HTTP负载均衡:

akka {
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      # 可以看到这里端口赋予 2551, 代表这里不仅作为服务发现还作为业务节点
      canonical.port = 2551
    }
  }
  cluster {
    # 可以看到首个服务节点为
    seed-nodes = [
      "akka.tcp://[email protected]:2551",
      "akka.tcp://[email protected]:2552"]
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
    
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

官方样例

后续启动多个种子节点只需要把 akka.remote.artery.canonicalseed-nodes 首个节点修改为自身节点信息即可.

需要说明 角色+Hostname+端口 才能作为唯一标识, 然后每个启动服务配置大概如下:

akka {
  # 设置日志等级
  loglevel = debug
  actor {
    # 数据序列化转发, 这里需要查看官方支持的序列化转发方法
    # 不过一般采用明文原生转发即可, 一般直接二进制流之后让各自 Actor 自身去解析
    allow-java-serialization = "off"
  
    # 设置集群
    provider = "cluster"
  }
  remote {
    artery {
      # 按照正式|测试服环境分配的内网监听 Hostname
      canonical.hostname = "127.0.0.1"
      # 假设目前设定网关集群, 那么应该启动的为 12551 端口
      canonical.port = 12551
    }
  }
  cluster {
    # 启动系统时候的默认携带角色
    # 用于分配节点的时候给当前节点附带上角色信息, 在集群的时候十分关键的配置项
    roles = [
        "gateway"
        #"global",
        #"console",
        #"player",
        #"world"
    ]
  
    # 所有种子节点信息
    # 这里 127.0.0.1 可能是本地也可能是远程服务
    # 并且需要注意游戏网关绝对要处于首位确保先被启动的服务
    # 必须要保证首个节点启动挂载, 首个节点为最高优先选举节点
    # fusion 代表系统归属名称, 可以理解为集群中心名称
    seed-nodes = [
      "akka://[email protected]:12551",
      "akka://[email protected]:12552",
      "akka://[email protected]:12553",
      "akka://[email protected]:12554",
      "akka://[email protected]:12555",
      "akka://[email protected]:12556",
      "akka://[email protected]:12557"
    ]
    
    # 关闭掉自动 down 功能
    auto-down-unreachable-after = "off"
    
    # 监控同一JVM中注册和管理多个Akka集群节点的JMXMBean实例, 方便调试节点
    jmx.multi-mbeans-in-same-jvm = "on"
  }
}


# metrics 暂时不需要
# Enable metrics extension in akka-cluster-metrics.
# akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
# akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

注意: 节点过多也不利于管理且增大出错的概率, 所以尽可能控制好节点数量

还有一点就是动态和静态加载问题, 实际上更加推荐动态读取 Memory|DB 内部配置方便被 运维后台 配置更新.

强关联的数据留意好动态加载配置, 因为一旦奔溃的话是服务连锁一起大范围崩溃, 所以如果可以尽量采用确定静态配置项而非动态

之后现在就是编写 gateway 启动入口, 这就是目前单个游戏网关节点:

package com.meteorcat.fusion;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;

/**
 * 游戏网关
 */
public class GameGatewayApplication {


    /**
     * 入口方法
     *
     * @param args 参数
     */
    public static void main(String[] args) {
        // 启动Actor中心系统
        final ActorSystem system = ActorSystem.create("fusion");
        final LoggingAdapter log = system.log();
        log.info("Creating actor system: {}", system.name());


        // 创建集群监听
        final ActorRef listener = system.actorOf(GameGatewayMonitor.props());
    }


    /**
     * 简单网关
     */
    public static class GameGatewayMonitor extends AbstractActor {

        /**
         * 日志句柄
         */
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);


        /**
         * 集群句柄
         */
        final Cluster cluster = Cluster.get(getContext().getSystem());

        /**
         * 静态获取 Props 对象
         *
         * @return Props
         */
        public static Props props() {
            return Props.create(GameGatewayMonitor.class);
        }

        /**
         * 订阅集群事件
         */
        @Override
        public void preStart() {
            cluster.subscribe(
                    getSelf(),
                    ClusterEvent.initialStateAsEvents(),
                    ClusterEvent.MemberEvent.class,
                    ClusterEvent.UnreachableMember.class
            );
        }

        /**
         * 退出集群
         */
        public void postStop() {
            cluster.unsubscribe(getSelf());
        }

        /**
         * 回调请求
         *
         * @return Receive
         */
        @Override
        public Receive createReceive() {
            // todo: 可以自定义序列化结构转发消息

            // ClusterEvent 带有大量流程事件可以拦截监听, 可以按照所需去拦截处理
            return receiveBuilder()
                    .match(ClusterEvent.MemberUp.class, memberUp -> {
                        // 集群加入
                        log.info("Member Up: {}", memberUp.member());
                    })
                    .match(ClusterEvent.UnreachableMember.class, unreachableMember -> {
                        // 集群无法投递
                        log.info("Member Unreachable: {}", unreachableMember.member());
                    })
                    .match(ClusterEvent.MemberRemoved.class, memberRemoved -> {
                        // 集群退出
                        log.info("Member Removed: {}", memberRemoved.member());
                    })
                    .match(ClusterEvent.MemberEvent.class, memberEvent -> {
                        // 其他集群事件
                        log.info("Member Event: {}", memberEvent.member());
                    })
                    .matchAny(this::unhandled) // 其他事件不处理
                    .build();
        }
    }
}

这里 createReceive 回调只针对成员简单回调事件做推送, 暂时没有做消息交互的相关功能, 最后打印内容关键字显示:

# 已经选举出首个服务节点
Leader is moving node [akka://[email protected]:12551] to [Up]

现在网关已经架设完成, 现在需要外部推送到网关形成 RPC 服务中心, 可以考虑结合上面的集群考虑怎么设计数据传输层.

如果仅仅想做个负载请求转发的 RPC 网关, 目前的以上功能差不多可以满足条件, 后续就会进入更加复杂的集群概念.

还有 akka集群单例 功能, 这个功能参照下文档就能理解, 其实就是集群构建永久唯一节点分享

集群分片和单例

刚刚示范怎么去启动简单集群网关, 但是没办法满足复杂 Actor 节点转发请求, 这种方式我更加喜欢称为 同名节点; 当然你可以自动手动编写 Actor分片分区 功能, 而 Akka 内部已经集成了 Cluster Sharding 模块来处理; 到这里其实已经算 akka 比较高级的部分, 所以有些概念没办法面面俱到的概括出来.

Cluster Sharding 会额外增加服务器开销, 如果业务简单且计算密集的话可以采用简单 Cluster 处理, 不要盲目选择akka分区分片

集群分片用于需要大量设备分布运算的情况, 如果所需计算设备不多且只需少量 Actor 做运算的情况, 推荐采用 集群单例(Cluster Singleton) 而非直接采用 集群分片(Cluster Sharding).

Akka 的分区分片需要变动一些使用方式:

# 第三方库变动
akka-cluster -> akka-cluster-sharding

# 类型变动
Cluster -> ClusterSharding

# 不再以 subscribe|unsubscribe 做数据消费模式
# 以 ClusterSharding.get(this).start 启动分区代理(启动模式)
# 以 ClusterSharding.get(this).startProxy 启动分区代理(仅代理模式)

集群分片在集群当中追加了 实体(entity) 的标识符概念, 实现集群分片的节点被统称为 实体Actor(entity actor); 每个实体actor只运行在某个进程, 消息发送到该实体actor让其分发到分片出来的实体.

如果采用集群分片需要标记有特定角色的节点组启动, 实体是指由集群分片管理的 Actor

上面所说的 MessageExtractor 其实就是分配调度实体actor目标对象, 可以去自己手动构建分片算法

/// 这里就是简单的 ShardRegion.MessageExtractor 分片算法实现

import akka.cluster.sharding.ShardRegion;

/**
 * 集群分片算法实现
 */
public static class MessageExtractor implements ShardRegion.MessageExtractor {


    /**
     * 默认消息结构接口, 用于外部扩展
     */
    public interface Message {
    }

    /**
     * 分片消息结构接口, 用于实现消息结构来传递
     *
     * @param <T>
     */
    public interface ShardMessage<T> extends Message {
        T getId();
    }

    /**
     * 取模算法的最大分片数量
     */
    private final int numberOfShards;

    /**
     * 构造方法
     *
     * @param numberOfShards 最大分片数量
     */
    public MessageExtractor(int numberOfShards) {
        this.numberOfShards = numberOfShards;
    }


    /**
     * 分片实体标识符
     *
     * @param message 数据消息
     * @return String
     */
    @Override
    public String entityId(Object message) {
        if (message instanceof ShardMessage<?> msg) {
            // 要求传递的消息必须继承 ShardMessage<T>
            return msg.getId().toString();
        } else {
            return String.format("Unknown message type: %s", message.toString());
        }
    }

    /**
     * 分片实体消息
     * 可以对消息进行 hook,对消息做前置处理
     *
     * @param message 传递消息
     * @return Object
     */
    @Override
    public Object entityMessage(Object message) {
        return message;
    }


    /**
     * 计算消息应该调度分配到哪个分片
     *
     * @param message 消息内容
     * @return String
     */
    @Override
    public String shardId(Object message) {
        if (message instanceof ShardMessage<?> msg) {
            // 取模算法
            long msgId = Long.parseLong(msg.getId().toString());
            long shardId = Math.abs(msgId % numberOfShards);
            return Long.toString(shardId);
        } else {
            return String.format("Unknown message type: %s", message.toString());
        }
    }
}

以上就是手动实现集群分片算法过程, 用来获取消息id然后解构出消息取模投递到指定的消息实体之中, 但现在还不能改造 gateway.

接下来就是另外的概念: 启动和代理启动, ClusterSharding.get(this).startClusterSharding.get(this).startProxy 差别.

// 启动模式主要的2个扩展方法
start(final String typeName, final Props entityProps, final ClusterShardingSettings settings, final ShardRegion.MessageExtractor messageExtractor);

start(final String typeName, final Props entityProps, final ShardRegion.MessageExtractor messageExtractor);

// 代理启动主要的3个扩展方法
startProxy(final String typeName, final Optional<String> role, final ShardRegion.MessageExtractor messageExtractor);

startProxy(final String typeName, final Option<String> role, final PartialFunction<Object, Tuple2<String, Object>> extractEntityId, final Function1<Object, String> extractShardId);

startProxy(final String typeName, final Option<String> role, final Option<String> dataCenter, final PartialFunction<Object, Tuple2<String, Object>> extractEntityId, final Function1<Object, String> extractShardId);

startProxy(final String typeName, final Optional<String> role, final Optional<String> dataCenter, final ShardRegion.MessageExtractor messageExtractor);
  • typeName: 分片类型名称, 对于 代理(proxy) 来说这个是角色标识字符串, 对于 启动(start) 代表定义分片实体类型名称
  • Optional<String> role: 分片所属角色, 对于 代理(proxy) 来说就是请求目标角色, 对于 启动(start) 代表定义分片协调者角色
  • MessageExtractor: 分片数据逻辑, 用来把消息分配到指定分片数据, 这是需要集成 ShardRegion.MessageExtractor 去实现内部分片路由
  • PartialFunction-extractEntityId|Function1-extractShardId: MessageExtractor 的函数化实现
  • entityProps: 跟谁分片一起启动的实体属性, 也就是启动之后需要一起启动创建的 actor

集群模式下, role 参数必须与集群配置中的角色一致, 否则协调器可能无法正常启动: Trying to register to coordinator

对于启动者来说, typerole 代表创建和生成分片节点所属类型和角色; 对于代理来说,typerole 代表连接的分片对象

是否携带 entityProps 参数就是启动和代理启动最大区别:

  • start: 当分片系统启动的时候, 会在当前 actor 直接注册并创建 实体(entity)
  • startProxy: 分片系统启动时候, 只会挂起转发服务而不会在节点下创建实体, 也就是只做消息转发而不做其他多余操作

为什么要区分这两种启动模式? 这是为了把功能和业务隔离, 利用 startProxy 做代理中转到内部另外启动的实体对象, 可以避免如果实体内部业务逻辑出现严重错误时不会影响到内部启动 actor, 从而给实体对象重启服务的机会; 另外还有集群节点后续规模增大是采用多个服务器部署功能, 在本地挂起代理接口能够方便从本地直接转发到远程集群上面.

注意: 如果分片出来的实体业务很复杂的情况, 不要采用 start 而是需要构建 startProxy 转发到自定义实体对象.

而外部客户端则可以在链接集群之后采用, 直接初始化获取远程地址来推送:

import akka.cluster.sharding.ClusterSharding;

// 代理挂起而不会做多余操作, 客户端不必关心 entity 启动而只需要转发即可
ActorRef address = ClusterSharding.get(system).startProxy(
   "gateway", // 连接的分片
    Optional.of("gateway"), // 连接的角色
    new MessageExtractor(3)
);

这里最好停下梳理下分片消息传递流程:

  1. 假设玩家传入消息 (请求)req
  2. 传入消息 req网关(gateway) 分片程序
  3. gateway 拿到消息 id 取模计算获取到是要推送到 分片(shard-1)
  4. gateway 目前并不清楚 shard-1 位置, 需要去 调度器(ShardCoordinator) 查询所在位置
  5. ShardCoordinator 返回 gateway:shard-1实体(entity-2) 地址
  6. gateway:shard-1 发现内部没有 entity-2 就直接创建子实体对象, 并将 req 消息转发给 entity-2
  7. req 消息最后发送给 gateway:shard-1:entity-2 分片对象并处理

ShardCoordinator 也是关键概念, 它作为一个集群单例运行用于调配决策选择分片消息最终发送到分片地址

这里需要用代码重构下网关服务才能看到最后的服务流程形态:

package com.meteorcat.fusion;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardCoordinator;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;

import java.util.Optional;

/**
 * 游戏网关
 */
public class GameGatewayApplication {


    /**
     * 入口方法
     *
     * @param args 参数
     */
    public static void main(String[] args) {
        // 启动Actor系统 - fusion
        final ActorSystem system = ActorSystem.create("fusion");
        final LoggingAdapter log = system.log();
        log.info("Creating actor system: {}", system.name());


        // 创建集群监听
        final ActorRef listener = system.actorOf(GameGatewayShardMonitor.props(
                "gateway",
                "gateway",
                new ShardCoordinator.LeastShardAllocationStrategy(1, 3)
        ));
    }


    /**
     * 网关消息对象
     */
    public static class GameGatewayMessage {

        /**
         * 默认消息结构接口, 用于外部扩展
         */
        public interface Message {
        }

        /**
         * 分片消息结构接口, 用于实现消息结构来传递
         *
         * @param <T>
         */
        public interface ShardMessage<T> extends Message {
            T getId();
        }


        /**
         * 分片数据状态切换的消息架构
         */
        public static class ShardHandoffMessage implements Message {

        }

    }


    /**
     * 集群分片算法实现
     */
    public static class GameGatewayMessageExtractor implements ShardRegion.MessageExtractor {


        /**
         * 取模算法的最大分片数量
         */
        private final int numberOfShards;

        /**
         * 构造方法
         *
         * @param numberOfShards 最大分片数量
         */
        public GameGatewayMessageExtractor(int numberOfShards) {
            this.numberOfShards = numberOfShards;
        }


        /**
         * 分片实体标识符
         *
         * @param message 数据消息
         * @return String
         */
        @Override
        public String entityId(Object message) {
            if (message instanceof GameGatewayMessage.ShardMessage<?> msg) {
                // 要求传递的消息必须继承 ShardMessage<T>
                return msg.getId().toString();
            } else {
                return String.format("Unknown message type: %s", message.toString());
            }
        }

        /**
         * 分片实体消息
         * 可以对消息进行 hook 处理
         *
         * @param message 传递消息
         * @return Object
         */
        @Override
        public Object entityMessage(Object message) {
            return message;
        }


        /**
         * 计算消息应该调度分配到哪个分片
         *
         * @param message 消息内容
         * @return String
         */
        @Override
        public String shardId(Object message) {
            if (message instanceof GameGatewayMessage.ShardMessage<?> msg) {
                // 取模算法
                long msgId = Long.parseLong(msg.getId().toString());
                long shardId = Math.abs(msgId % numberOfShards);
                return Long.toString(shardId);
            } else {
                return String.format("Unknown message type: %s", message.toString());
            }
        }
    }


    /**
     * 集群分片下的实体对象
     */
    public static class GameGatewayEntity extends AbstractActor implements Creator<GameGatewayEntity> {

        /**
         * 日志句柄
         */
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        /**
         * 父级Actor
         */
        final AbstractActor root;

        /**
         * 构造方法
         *
         * @param root 父级Actor
         */
        public GameGatewayEntity(AbstractActor root) {
            this.root = root;
        }

        /**
         * 静态获取 Props 对象
         *
         * @return Props
         */
        public static Props props(AbstractActor root) {
            return Props.create(GameGatewayEntity.class, (Creator<GameGatewayEntity>) () -> new GameGatewayEntity(root));
        }

        /**
         * 回调处理方法
         *
         * @return Receive
         */
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, message -> log.info("websocket message: {}", message))
                    .matchAny(this::unhandled) // 其他事件不处理
                    .build();
        }

        @Override
        public GameGatewayEntity create() {
            return null;
        }
    }

    /**
     * 集群分片网关
     */
    public static class GameGatewayShardMonitor extends AbstractActor {

        /**
         * 日志句柄
         */
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);


        /**
         * 集群句柄
         */
        final ClusterSharding cluster = ClusterSharding.get(getContext().getSystem());


        /**
         * 集群类型名称
         */
        final String typeName;


        /**
         * 集群角色
         */
        final String role;


        /**
         * 分片调度器
         */
        final ShardCoordinator.ShardAllocationStrategy strategy;


        /**
         * 分片集群初始化地址
         */
        Optional<ActorRef> sharding = Optional.empty();


        /**
         * 构造方法
         *
         * @param typeName 类型名称
         * @param role     角色
         * @param strategy 分片调度器
         */
        public GameGatewayShardMonitor(String typeName, String role, ShardCoordinator.ShardAllocationStrategy strategy) {
            this.typeName = typeName;
            this.role = role;
            this.strategy = strategy;
        }


        /**
         * 静态获取 Props 对象
         *
         * @return Props
         */
        public static Props props(String typeName, String role, ShardCoordinator.ShardAllocationStrategy strategy) {
            return Props.create(GameGatewayShardMonitor.class, (Creator<GameGatewayShardMonitor>) () -> new GameGatewayShardMonitor(typeName, role, strategy));
        }

        /**
         * 启动回调
         */
        @Override
        public void preStart() {
            // 设置分片配置
            ClusterShardingSettings settings = ClusterShardingSettings
                    .create(getContext().getSystem())
                    .withRole(role);


            // 启动分片
            ActorRef ref = cluster.start(
                    typeName, // 分片类型
                    GameGatewayEntity.props(this), // 启动时候挂起的实体
                    settings, // 分片启动配置
                    new GameGatewayMessageExtractor(3), // 分片算法
                    strategy, // 实体启动策略
                    new GameGatewayMessage.ShardHandoffMessage() // 异常切换消息
            );
            this.sharding = Optional.of(ref);
        }

        /**
         * 退出回调
         */
        public void postStop() {

        }

        /**
         * 回调请求
         *
         * @return Receive
         */
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchAny(this::unhandled) // 其他事件不处理
                    .build();
        }
    }

}

这里的流程就是创建 GameGatewayShardMonitor 作为网关集群分片功能, 里面涵盖了消息结构|算法调度|动态Actor构建方法; 一般来说分片集群适合在计算极为频繁的服务, 比如游戏对战和数据同步等情况而千万不要生成太多分片.

网关的配置文件如下:

akka {
  # 设置日志等级
  loglevel = info
  actor {
    # 数据序列化转发, 这里需要查看官方支持的序列化转发方法
    # 不过一般采用明文原生转发即可, 一般直接二进制流之后让各自 Actor 自身去解析
    allow-java-serialization = "off"

    # 设置集群
    provider = "cluster"
  }
  remote {
    artery {
      # 按照正式|测试服环境分配的内网 Hostname
      canonical.hostname = "127.0.0.1"
      # 网关服务, 这里设定绑定服务端口
      canonical.port = 12551
    }
  }
  cluster {

    # 目前服务所属的角色
    roles = [
        "gateway"
    ]

    # 所有种子节点信息
    seed-nodes = [
      "akka://[email protected]:12551",

      # websocket 服务节点
      "akka://[email protected]:12558",
    ]

    # 关闭掉自动 down 功能
    auto-down-unreachable-after = "off"

    # 监控同一JVM中注册和管理多个Akka集群节点的JMXMBean实例, 方便调试节点
    jmx.multi-mbeans-in-same-jvm = "on"
  }
}

集群分片过多会带来性能降低且不好定位问题, 建议一般分片服务需要结合远程服务器和服务器硬件来测试得出合理分配数值.

网关分片服务已经生成, 之后就是需要客户端测试挂起本地代理来转发给到集群让其实体拦截消息来处理, 这里简单编写个 websocket 客户端处理消息转发, 首先是引入第三方库:

<!-- 第三方依赖 -->
<dependencies>
    <!-- Actor Cluster -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
    </dependency>

    <!-- SpringWebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

启动类功能:

package com.meteorcat.fusion;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.socket.config.annotation.EnableWebSocket;

/**
 * 启用SpringWebsocket
 */
@EnableWebSocket
@SpringBootApplication
public class GameWebSocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(GameWebSocketApplication.class, args);
    }
}

之后创建转发服务:

package com.meteorcat.fusion;

import akka.actor.*;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import lombok.Getter;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.Optional;


/**
 * 集群转发器, 启动的时候托管给 SpringBoot 单例化
 */
@Getter
@Order(-1)
@Component
public class GameGatewayForwarder implements ApplicationRunner {


    /**
     * Actor句柄
     */
    private ActorSystem system;

    /**
     * 日志句柄
     */
    private LoggingAdapter log;


    /**
     * 自身管理器对象
     */
    private ActorRef that;


    /**
     * 启动入口
     *
     * @param args 参数
     */
    @Override
    public void run(ApplicationArguments args) {
        this.system = ActorSystem.create("fusion"); // 访问地址: akka://[email protected]:12558
        this.log = this.system.log(); // 写入日志管理器
        // 连接集群分片 type = gateway, role = gateway 的服务
        this.that = this.system.actorOf(GameWebSocketMonitor.props("gateway", "gateway"));
        this.log.info("GameGatewayApplication context has been initialized");
    }


    /**
     * 推送消息
     *
     * @param message 消息
     * @param sender  请求来源
     */
    public void tell(final Object message, final ActorRef sender) {
        this.that.tell(message, sender);
    }

    /**
     * 推送消息
     *
     * @param message 消息
     */
    public void tell(final Object message) {
        this.tell(message, that);
    }


    /**
     * 网关消息对象
     */
    public static class GameGatewayMessage {

        /**
         * 默认消息结构接口, 用于外部扩展
         */
        public interface Message {
        }

        /**
         * 分片消息结构接口, 用于实现消息结构来传递
         *
         * @param <T>
         */
        public interface ShardMessage<T> extends Message {
            T getId();
        }


    }


    /**
     * 集群分片算法实现
     */
    public static class GameGatewayMessageExtractor implements ShardRegion.MessageExtractor {


        /**
         * 取模算法的最大分片数量
         */
        private final int numberOfShards;

        /**
         * 构造方法
         *
         * @param numberOfShards 最大分片数量
         */
        public GameGatewayMessageExtractor(int numberOfShards) {
            this.numberOfShards = numberOfShards;
        }


        /**
         * 分片实体标识符
         *
         * @param message 数据消息
         * @return String
         */
        @Override
        public String entityId(Object message) {
            if (message instanceof GameGatewayMessage.ShardMessage<?> msg) {
                // 要求传递的消息必须继承 ShardMessage<T>
                return msg.getId().toString();
            } else {
                // todo:这里是错误建议打印看看再思考怎么修改
                return String.format("Unknown message type: %s", message.toString());
            }
        }

        /**
         * 分片实体消息
         * 可以对消息进行 hook 处理
         *
         * @param message 传递消息
         * @return Object
         */
        @Override
        public Object entityMessage(Object message) {
            return message;
        }


        /**
         * 计算消息应该调度分配到哪个分片
         *
         * @param message 消息内容
         * @return String
         */
        @Override
        public String shardId(Object message) {
            if (message instanceof GameGatewayMessage.ShardMessage<?> msg) {
                // 取模算法
                long msgId = Long.parseLong(msg.getId().toString());
                long shardId = Math.abs(msgId % numberOfShards);
                return Long.toString(shardId);
            } else {
                // todo:这里是错误建议打印看看再思考怎么修改
                return String.format("Unknown message type: %s", message.toString());
            }
        }
    }


    /**
     * 网关管理器
     */
    @Getter
    public static class GameWebSocketMonitor extends AbstractActor {

        /**
         * 日志句柄
         */
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);


        /**
         * 集群句柄
         */
        final ClusterSharding cluster = ClusterSharding.get(getContext().getSystem());

        /**
         * 集群类型名称
         */
        final String typeName;


        /**
         * 集群角色
         */
        final String role;

        /**
         * 分片集群初始化地址
         */
        Optional<ActorRef> sharding = Optional.empty();

        /**
         * 构造方法
         *
         * @param typeName 类型
         * @param role     角色
         */
        public GameWebSocketMonitor(String typeName, String role) {
            this.typeName = typeName;
            this.role = role;
        }

        /**
         * 静态获取 Props 对象
         *
         * @return Props
         */
        public static Props props(String typeName, String role) {
            return Props.create(GameWebSocketMonitor.class, (Creator<GameWebSocketMonitor>) () -> new GameWebSocketMonitor(typeName, role));
        }


        /**
         * 加入集群消息
         */
        @Override
        public void preStart() {
            // 启动分片代理
            ActorRef ref = cluster.startProxy(
                    typeName,
                    Optional.of(role),
                    new GameGatewayMessageExtractor(3)
            );
            sharding = Optional.of(ref);
        }

        /**
         * 退出集群
         */
        public void postStop() {

        }

        /**
         * 回调请求
         *
         * @return Receive
         */
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, message -> {
                        // 拦截 String 消息转发给分片数据
                        sharding.ifPresent(actorRef -> actorRef.tell(message, getSender()));
                    })
                    .matchAny(this::unhandled) // 其他事件不处理
                    .build();
        }
    }
}

创建 WebSocket 实例让 SpringBoot 负责管理:

package com.meteorcat.fusion;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * 设置文本流 WebSocket
 * 之所以采用 TEXT 主要方便 Postman 之类客户端调试
 */
@Slf4j
@Component
public class GameWebSocketHandler extends TextWebSocketHandler {

    /**
     * 网关句柄
     */
    final GameGatewayForwarder gateway;


    /**
     * 会话列表
     */
    final Map<String, WebSocketSession> sessions = new HashMap<>();

    /**
     * 追加网关初始化
     *
     * @param gateway 网关对象
     */
    @Autowired
    public GameWebSocketHandler(GameGatewayForwarder gateway) {
        this.gateway = gateway;
    }

    /**
     * Established - 会话连接回调
     *
     * @param session 会话句柄
     * @throws Exception 异常错误
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        sessions.put(session.getId(), session);
    }


    /**
     * Closed - 会话连接关闭回调
     *
     * @param session 会话句柄
     * @param status  关闭状态
     * @throws Exception 异常错误
     */
    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) throws Exception {
        sessions.remove(session.getId());
    }

    /**
     * Error - 会话错误异常回调
     *
     * @param session   会话句柄
     * @param exception 回调错误
     * @throws Exception 异常错误
     */
    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {

    }

    /**
     * Text - 接收到文本数据回调
     * 如果二进制数据则是 handleBinaryMessage 回调
     *
     * @param session Websocket
     * @param message data
     * @throws Exception Error
     */
    @Override
    protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) throws Exception {
        String payload = message.getPayload();
        if (payload.isBlank()) return;
        log.info("received websocket message: {}", payload);

        // 转发给 actor
        gateway.tell(payload);
    }
}

编写 WebSocket 配置:

package com.meteorcat.fusion;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

@Configuration
public class GameWebSocketConfig implements WebSocketConfigurer {

    /**
     * 访问路径
     */
    @Value("${websocket.server.path:/}")
    private String serverPath;

    /**
     * 传输数据缓存大小
     */
    @Value("${websocket.buffer.max.size:8192}")
    private Integer bufferMaxSize;


    /**
     * 待机主动中断时间
     */
    @Value("${websocket.idle.timeout:600000}")
    private Long idleTimeout;


    /**
     * 允许跨域地址
     */
    @Value("${websocket.allowed.origins:*}")
    private String allowOrigins;

    /**
     * 全局运行句柄
     */
    private final AbstractWebSocketHandler handler;


    @Autowired
    public GameWebSocketConfig(AbstractWebSocketHandler handler) {
        this.handler = handler;
    }


    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, serverPath).setAllowedOrigins(allowOrigins);
    }
}

WebSocket 的集群配置文件:

akka {
  # 设置日志等级
  loglevel = info
  actor {
    # 数据序列化转发, 这里需要查看官方支持的序列化转发方法
    # 不过一般采用明文原生转发即可, 一般直接二进制流之后让各自 Actor 自身去解析
    allow-java-serialization = "off"

    # 设置集群
    provider = "cluster"
  }
  remote {
    artery {
      # 按照正式|测试服环境分配的内网 Hostname
      canonical.hostname = "127.0.0.1"
      # 网关服务, 这里设定绑定服务端口
      canonical.port = 12558
    }
  }
  cluster {
    # 所有角色信息
    roles = [
        "websocket"
    ]

    # 所有种子节点信息
    seed-nodes = [
      "akka://[email protected]:12551",
      "akka://[email protected]:12558"
    ]

    # 关闭掉自动 down 功能
    auto-down-unreachable-after = "off"

    # 监控同一JVM中注册和管理多个Akka集群节点的JMXMBean实例, 方便调试节点
    jmx.multi-mbeans-in-same-jvm = "on"
  }
}

最后启动两个服务, 利用 postman 之类方便客户端调试发送请求即可.

# websocket 服务打印
c.meteorcat.fusion.GameWebSocketHandler  : received websocket message: test3333333333

# gateway 服务打印
com.meteorcat.fusion.GameGatewayApplication$GameGatewayEntity -- websocket message: test3333333333

这里仅仅作为示例来演示, 如果要构建高性能的游戏服务需要去重新优化和构建服务才能到达可用的地步, 同时还要让服务支持热更新支持的技术才能满足些简单的H5游戏服务端需求.

注意: 这里仅仅作为实例, 直接 String 默认是支持被默认序列化传递, 但是如果自定义结构要转发就需要利用自定义或者第三方序列化处理器

定时器

如果作为高性能网络服务, 除了合理的调度模型之外, 最关键的就是 定时器(timer) 功能来维持系统稳定运行:

  • 心跳保持: 长连接从应用方面维护网络会话持续
  • 数据同步: 需要定时将内存数据同步给客户端
  • 延迟推送: 在指定时间后推送给客户端消息

以游戏服务为例, 按照流程来说只要玩家完成授权就要周期性(15~30s)定时给客户端推送一个包确认从而让访问连接保持活动.

也有通过客户端不断推送通知服务端, 让服务端获知目前玩家依旧在线

Akka 单中有两套方案分别是对应不同的应用场景:

  • scheduler: 简单调度器, 随着系统 akka.actor.Scheduler 附带, 用于内部自己的定时调度
  • actor timers: 单独调度定时调度服务, 可以说单独启动的另外 Actor 对象作为定时器服务来管理

如果消息是作为简单的延迟单次通知就可以直接采用 ActorSystem 内置的 scheduler 模块:

# 调度定时任务, 它将在 50 毫秒之后调用 testActor 发送 "foot" 消息  
system.scheduler()
      .scheduleOnce(Duration.ofMillis(50), testActor, "foo", system.dispatcher(), null);

# 调度一个Runnable, 它将在 50 毫秒后执行传入的 Runnable 
system.scheduler()
      .scheduleOnce(
        Duration.ofMillis(50),
        new Runnable() {
          @Override
          public void run() {
                testActor.tell(System.currentTimeMillis(), ActorRef.noSender());
          }
        },
    system.dispatcher()
);

千万注意: 不要再定时任务内调度本身自己作为 actor 对象功能(this|system|actor), 而是直接给自己发个信号唤起自己 Receive

这种方法十分简单易用, 但是如果在做大规模的集群上面却不怎么好用; 首先 scheduler 是挂靠单独某个 system, 这也代表如果 systemdown 时候附着在该系统下面所有 scheduler 也会一起被取消.

如果是简单 actor 你会感觉没问题, 本身简单 actor 系统奔溃就应该连带 scheduler, 但涉及到集群就出现问题了:

  1. 玩家(player) 是集群分片, 玩家登录之后存档挂载其中
  2. 场景(world) 也是集群分片, 带有家园系统用于定义对家园作物做产值生产(比如体力值+1,金币+1 这种收益操作)
  3. 场景 利用 scheduler 挂载定时服务来计算, 但是因为某些不可控原因导致场景奔溃重启
  4. 场景 重启导致定时器全部失效, 这时候没办法再次重新针对用户执行定时任务

当然你也可以编写场景重启还原逻辑, 让重启之后定时任务重新还原; 不过为了避免业务干扰到定时任务的情况, 最好把 定时器(timer) 提取到单独 actor 来负责利用 Receive 触发业务监听.

基于需要实现定时器情况, 官方其实也提供 Actor Timer 抽象功能:

import akka.actor.AbstractActorWithTimers;

import java.time.Duration;

/**
 * 全局定时调度 Actor
 */
public static class ScheduleTimer extends AbstractActorWithTimers {

    /**
     * 定时器标识
     */
    private static Object TICK_KEY = "TickKey";

    /**
     * 首次唤起
     */
    private static final class FirstTick {
    }


    /**
     * 再次调起
     */
    private static final class SecondTick {
    }

    /**
     * 构造方法
     */
    public ScheduleTimer() {
        // 启动的时候构建定时器, 在 500 毫秒之后唤起 TICK_KEY 为名的 FirstTick 类型事件
        getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));
    }


    /**
     * Receive 回调
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        FirstTick.class,
                        message -> {
                            // do something useful here
                            getTimers().startTimerWithFixedDelay(TICK_KEY, new Tick(), Duration.ofSeconds(1));
                        }
                )
                .match(
                        Tick.class,
                        message -> {
                            // do something useful here
                        }
                )
                .build();
    }

}

之后和常规 actor 挂载到系统即可, 使用过程和常规 actor 一般推送信号然后等待回调 hook 即可.

这里还引申出规则: actor 触发本身就有的功能方法千万不要用 this 直接执行, 而是需要 actor 邮箱机制继续投递回来执行

集群单例

现在回过头来说明如何在集群当中构建唯一单例节点, 集群单例可以在普通集群和分片集群当中适用, 虽然官方说有的简单的节点比较少的集群功能可以采用 集群单例 处理而不用复杂的 集群分片, 但是 集群单例集群分片 并不是互斥关系而是可以直接交替一起使用.

对于在线游戏服务端作为示例来说:

  • 对外网关服务是集群当中唯一对象就可以使用 集群单例(不过为了容错也可能网关采用集群分片来分发对内消息)
  • 玩家服务交互比较频繁需要 集群分片 将服务分配给服务器运算
  • 场景服务也是需要同步大量地图状态|产出更新, 所以也需要 集群分片 合理分配同步任务
  • GM(Game Management) 需要让外部可以直接做 运维(关服|开服)|运营(玩家下线|推送邮件) 等操作, 请求量不多可以采用 集群单例

当然 集群单例 也有潜在问题需要注意:

  • 全集群唯一节点, 也代表如果请求量级达到一定程度就会带来性能瓶颈问题
  • 集群当中的单例节点出现意外情况而重启迁移并不是实时, 而是需要几秒时间才能恢复完成

所以能够看到 集群单例 最好应用于那些请求量级中等偏下水平, 且业务简单不会容易异常出错的业务节点.

单例节点需要采用第三方库来引入扩展:

<!-- Maven -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_${scala.binary.version}</artifactId>
</dependency>

官方提供 SupervisorActor 具体实践, 这里:

import akka.actor.AbstractActor;
import akka.actor.AbstractActor.Receive;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;

public class SupervisorActor extends AbstractActor {
    final Props childProps;
    final SupervisorStrategy supervisorStrategy;
    final ActorRef child;

    SupervisorActor(Props childProps, SupervisorStrategy supervisorStrategy) {
        this.childProps = childProps;
        this.supervisorStrategy = supervisorStrategy;
        this.child = getContext().actorOf(childProps, "supervised-child");
    }

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return supervisorStrategy;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(msg -> child.forward(msg, getContext())).build();
    }
}

之后按照 ClusterSingletonManager 来初始化单例节点:

import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.singleton.ClusterSingletonManager;
import akka.cluster.singleton.ClusterSingletonManagerSettings;


public void init() {
    system.actorOf(
            ClusterSingletonManager.props(
                    Props.create(
                            SupervisorActor.class, () -> new SupervisorActor(props, supervisorStrategy)),
                    PoisonPill.getInstance(),
                    ClusterSingletonManagerSettings.create(getContext().system())),
            name = name);
}

这里就是理想状态的 管理器(Monitor) 定义, 通过启动 SupervisorActor 单例节点当作管理器, 如果构建的节点用户量不多情况用该模型更加符合现实当中的可理解模型, 问题在于 用户量不多不产生瓶颈.

对于集群单例来说只是原来集群 Props.create 追加一层 ClusterSingletonManager.props 构建而已, 实际应用起来和常规集群没什么差别.

update: 在多次考虑之后还是感觉网关不该设置成 集群单例, 而是应该采用 集群分片 自定义消息调度到不同集群分片

建议如果定义集群单例的话, 学习上面的 SupervisorActor 来构建 监督者 - 工作者 消息转发传递模型

Java 热更

其实 Java 本身已经提供了热更新技术用于正式上线达到不停机业务更新的功能, 只需要设置代理 java agent 和编写代理类即可.

# java 的 启动选项之中带有指定代理参数

# 类代理库设置
-agentlib:<库名>[=<选项>]
            加载本机代理库 <库名>, 例如 -agentlib:jdwp
            另请参阅 -agentlib:jdwp=help
           
# 类代理库路径       
-agentpath:<路径名>[=<选项>]
            按完整路径名加载本机代理库

# 代理 jar 包
-javaagent:<jar 路径>[=<选项>]
            加载 Java 编程语言代理, 请参阅 java.lang.instrument
            
            
# 假设游戏网关已经打包出 fusion-gateway.jar 包
# 这时候需要编写代理功能类, 打包成 gateway-agent.jar
# 之后利用代理类启动我们自己编写的 jar 包
java -javaagent:/game/gateway-agent.jar -jar /game/fusion-gateway.jar

# 上面就是标准的代理挂起 Jar 过程, 如果这时候假设需要 ReloadFile.java 热更
# 首先需要把 ReloadFile.java 编译成 class 文件:
javac ReloadFile.java

# 然后指定 agent 程序来更新当前 jvm 的同名类
java -javaagent:/game/gateway-agent.jar ReloadFile.class

代理 javaagent 需要自己去编写打包生成代理包, 这个包只要包含两个方法二选一实现:

public static void premain(String agentArgs, Instrumentation inst);

public static void premain(String agentArgs);

方法优先加载 String agentArgs, Instrumentation inst 签名方法, 之后再去加载 String agentArgs 签名方法

需要注意该代理类需要在 resources 目录下新建 MANIFREST.MF 声明 Jar 包信息, 这里只需要填写代理包相关信息即可:

Manifest-Version: 1.0
Can-Redefine-Classes: true
Can-Retransform-Classes: true
Premain-Class: FusionGatewayAgent

最后一行必须留空, 具体的包配置作用如下( 其他配置网上检索就行了, 关键配置是下面提到的 ):

  • Can-Redefine-Classes: (true)表示能重定义此代理所需的类, 默认值为 false
  • Can-Retransform-Classes: (true)表示能重新转换此代理所需的类, 默认值为 false
  • Premain-Class: 代理入口的类名, 该类必须实现 premain 方法

也可以用 maven 插件配置, 这样就不用手动创建 MANIFREST.MF 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>fusion-agent</artifactId>

    <!-- 继承框架 -->
    <parent>
        <groupId>com.meteorcat.fusion</groupId>
        <artifactId>fusion-framework</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>


    <!-- 打包配置 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.4.2</version>
                <configuration>
                    <archive>
                        <!--自动添加META-INF/MANIFEST.MF -->
                        <manifest>
                            <addClasspath>true</addClasspath>
                        </manifest>

                        <!-- 代理信息 -->
                        <manifestEntries>
                            <Premain-Class>com.meteorcat.fusion.FusionGatewayAgent</Premain-Class>
                            <Agent-Class>com.meteorcat.fusion.FusionGatewayAgent</Agent-Class>
                            <Can-Redefine-Classes>true</Can-Redefine-Classes>
                            <Can-Retransform-Classes>true</Can-Retransform-Classes>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>

    </build>
</project>

最后编写代理功能类:

package com.meteorcat.fusion;

import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.IllegalClassFormatException;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;

/**
 * 网关代理类
 */
public class FusionGatewayAgent {

    /**
     * 启动入口
     *
     * @param agentArgs 代理参数
     * @param inst      代理句柄
     */
    public static void premain(String agentArgs, Instrumentation inst) {
        inst.addTransformer(new GatewayClassFileTransformer(), true);
    }


    /**
     * 网关类更新器
     */
    public static class GatewayClassFileTransformer implements ClassFileTransformer {


        /**
         * 更新方法
         */
        @Override
        public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
            // className 就是加载类名, 含有目录嵌套, 格式如下:
            // akka/actor/ActorRef
            System.out.println("Gateway hot reload file = " + className);

            // todo: 这里就是具体热更新逻辑
            // todo: 可以考虑本地映射目录, 首次加载判断和目录之中对应 xxx.class 是否存在
            // todo: 不存在该目录文件直接跳过, 存在并且保存 md5 map 是否匹配
            // todo: 不匹配直接更新 jvm 类数据, 匹配则跳过
            return classfileBuffer;
        }
    }
}

执行打包命令之后会生成最终我们需要的代理 jar 包: fusion-agent-1.0-SNAPSHOT.jar

该名称成 fusion-agent.jar 就可以直接用了, 之前在集群分片网关实体响应修改成这样:

// GameGatewayEntity 类对象

/**
 * 测试热更新
 *
 * @param message 消息
 */
public void stringReceive(String message) {
    log.info("websocket message: {}", message);
}

/**
 * 回调处理方法
 *
 * @return Receive
 */
@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(String.class, this::stringReceive)
            .matchAny(this::unhandled) // 其他事件不处理
            .build();
}

打包获取 jar 包之后准备执行代理:

# 启动不停机的代理集群分片网关服务
java -javaagent:/game/fusion-agent.jar -jar /game/fusion-gateway.jar