Quarkus 注入集成
这也是比较好用的技巧, 之前搭建网关常常能够看到以下类型的对象注入:
/**
* WebSocket 网关
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
public class WebSocketBootstrap {
/**
* 捕获全局的 ActorSystem 注入 Bean 对象
*/
@Inject
ActorSystem system;
}
这种对象注入方式很大程度节约对象传递的功能, 而其实就是利用反射原理给对象实例化传递对应 @Inject 注解的对象实例.
反射也带来有少许性能损耗, 如果对性能要求比较偏激的情况, 后续可以仅当做了解
这时候就想这种自动注入机制能不能自动加载到 Actor 当中, 让 Props.create 生成出来的对应也支持这种特性.
Quarkus 内部已经支持这种方式处理, 这里就是实现方法:
/**
* 这里需要全局注册实例, 该工厂类能够全局捕获
*/
@ApplicationScoped
public class QuarkusActorFactory {
/**
* Quarkus 的全局 Bean 对象实例句柄
*/
@Inject
Instance<Object> cdiInstance;
/**
* 通过反射将 cdiInstance 内部的全局实例化对象注入进 Actor 之中
*/
void injectFields(Actor actor) throws IllegalAccessException {
// 从 Actor 实例的实际类型开始(避免接口/父类类型导致的字段扫描不全)
Class<?> currentClass = actor.getClass();
// 递归遍历类继承链:直到 Object 类(确保所有层级的字段都被扫描)
while (currentClass != Object.class) {
// 获取当前类的所有声明字段(包括 private,但不包括父类字段)
Field[] fields = currentClass.getDeclaredFields();
for (Field field : fields) {
// 仅处理带 @Inject 注解的字段(过滤无注入需求的字段)
if (field.isAnnotationPresent(Inject.class)) {
// 突破访问权限限制:确保 private/protected 字段也能被赋值
field.setAccessible(true);
// 从 CDI 容器获取与字段类型匹配的依赖实例(若有多个实现,需通过 Qualifier 筛选)
Object dependency = cdiInstance.select(field.getType()).get();
// 为 Actor 实例的字段赋值(完成注入)
field.set(actor, dependency);
}
}
// 切换到父类,继续扫描父类字段
currentClass = currentClass.getSuperclass();
}
}
/**
* 创建 Actor 的 Props 对象,并自动注入指定类型的依赖
* @param actorClass Actor 类
* @param dependencyTypes 需要注入的依赖类型
* @param <T> Actor 类型
* @return 配置好的 Props 对象
*/
public <T> Props createProps(Class<T> actorClass, Class<?>... dependencyTypes) {
// 从 CDI 容器获取依赖实例
Object[] dependencies = Arrays.stream(dependencyTypes)
.map(type -> cdiInstance.select(type).get())
.toArray();
// 创建 Supplier 用于实例化 Actor 并注入依赖
return Props.create(actorClass, () -> {
try {
// 1. 获取与依赖类型匹配的构造函数
Constructor<T> constructor = actor.getConstructor(dependencyTypes);
// 2. 调用构造函数创建 Actor 实例
T actorInstance = constructor.newInstance(dependencies);
// 3. 为 Actor 中带 @Inject 注解的字段补充注入(支持父类字段)
injectFields(actorInstance);
return actorInstance;
} catch (Exception e) {
throw new RuntimeException("Failed to create actor instance: " + actorClass.getName(), e);
}
});
}
/**
* 简化版创建方法,自动查找唯一的构造函数并注入所有参数
* @param actorClass Actor 类
* @param <T> Actor 类型
* @return 配置好的 Props 对象
*/
public <T> Props createProps(Class<T> actorClass) {
return Props.create(actor, () -> {
try {
// 1. 反射获取无参构造函数:突破访问权限(支持 private 构造),确保非 public 构造也能被调用
Constructor<T> constructor = actor.getDeclaredConstructor();
constructor.setAccessible(true); // 解除访问权限限制(关键:避免非 public 构造无法调用的问题)
// 2. 调用无参构造创建 Actor 实例
T actorInstance = constructor.newInstance();
// 3. 自动注入所有带 @Inject 注解的字段(递归处理父类字段)
injectFields(actorInstance);
return actorInstance;
} catch (Exception e) {
// 封装异常信息:明确异常场景(创建+注入),便于问题定位
throw new RuntimeException("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
}
});
}
}
这里就是很简单的注入对象全局处理器, 这个工厂类就可以直接生成 Props 并自动做系统注入:
/**
* WebSocket 网关
*/
@ApplicationScoped
@WebSocket(path = "/bootstrap")
public class WebSocketBootstrap {
/**
* 捕获全局的 ActorSystem 注入 Bean 对象
*/
@Inject
ActorSystem system;
/**
* 获取获取的 QuarkusActorFactory 工厂对象
*/
@Inject
QuarkusActorFactory actorFactory;
/**
* 生成支持 @Inject 注入的 Actor, 并挂载到 ActorSystem 之中
*/
public ActorRef createActor(String name) {
// 假设 CDIEchoActor 是实现好的 AbstractActor 虚类的 Actor
return system.actorOf(
actorFactory.props(CDIEchoActor.class),
name
);
}
}
这样实现了 AbstractActor 的功能类也能享受到支持 @Inject 便利, 不过因为是反射所以性能方面也有点损失;
但是对于共用库之中直接内部就要声明 @ApplicationScoped 就显得没这么好用,
所以可以设计成通用接口让引入通用库的开发者自己实现:
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.apache.pekko.actor.Actor;
import org.apache.pekko.actor.Props;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Arrays;
/**
* Actor Props 配置器接口
* <p>
* 核心能力:基于 CDI(Contexts and Dependency Injection)容器实现 Pekko Actor 的依赖注入与 Props 对象创建,
* 支持两种实例化模式(显式构造函数依赖注入、无参构造+字段自动注入),并提供字段级递归注入能力,解决 Actor 与 CDI 生态的集成问题。
* <p>
* 适用场景:Pekko Actor 开发中需通过 CDI 管理依赖(如服务类、配置类),且需要统一管控 Actor 实例化流程的场景。
*/
public interface ActorPropsConfigurator {
/**
* 创建 Actor 的 Props 对象(显式指定构造函数依赖)
* <p>
* 核心逻辑:先从 CDI 容器中获取与 {@code dependencyTypes} 匹配的依赖实例,再通过反射调用对应构造函数创建 Actor,
* 最后对 Actor 中带 {@link Inject} 注解的字段进行补充注入,确保依赖完整性。
* <p>
* 适用场景:Actor 构造函数存在明确依赖(如多个服务类),需精确控制依赖类型的场景(避免 CDI 类型匹配歧义)。
* <p>
* 风险提示:1. 若 {@code dependencyTypes} 与 Actor 构造函数参数不匹配,会抛出 NoSuchMethodException;
* 2. 若 CDI 容器中无对应类型的依赖,会抛出 UnsatisfiedResolutionException。
*
* @param instance CDI 实例容器,用于获取依赖的 Bean 实例(由 CDI 容器自动注入)
* @param actor 目标 Actor 的 Class 对象(需继承自 {@link Actor})
* @param dependencyTypes Actor 构造函数所需的依赖类型数组(顺序需与构造函数参数顺序一致)
* @param <T> 目标 Actor 的具体类型(泛型约束确保类型安全)
* @return 用于创建 Actor 的 Pekko Props 对象,包含 Actor 实例化逻辑与依赖注入逻辑
* @throws RuntimeException 封装反射异常(构造函数获取失败、实例化失败)或 CDI 依赖获取异常,便于上层统一捕获
*/
default <T extends Actor> Props props(Instance<Object> instance, Class<T> actor, Class<?>... dependencyTypes) {
// 从 CDI 容器批量获取依赖实例:按传入的类型数组顺序匹配,确保与构造函数参数顺序一致
Object[] dependencies = Arrays.stream(dependencyTypes)
.map(type -> instance.select(type).get())
.toArray();
// 通过 Props.create + Supplier 实现延迟实例化(符合 Pekko Actor 异步创建模型)
return Props.create(actor, () -> {
try {
// 1. 获取与依赖类型匹配的构造函数
Constructor<T> constructor = actor.getConstructor(dependencyTypes);
// 2. 调用构造函数创建 Actor 实例
T actorInstance = constructor.newInstance(dependencies);
// 3. 为 Actor 中带 @Inject 注解的字段补充注入(支持父类字段)
injectFields(instance, actorInstance);
return actorInstance;
} catch (Exception e) {
// 封装异常信息:明确目标 Actor 类名,便于问题定位
throw new RuntimeException("Failed to create actor instance [actorClass: " + actor.getName() + "]", e);
}
});
}
/**
* 创建 Actor 的 Props 对象(无参构造+字段自动注入)
* <p>
* 核心逻辑:通过反射调用 Actor 的无参构造函数(支持非 public 构造)创建实例,
* 再递归扫描 Actor 及其父类中带 {@link Inject} 注解的字段,从 CDI 容器获取依赖并注入。
* <p>
* 适用场景:Actor 无构造函数依赖,或依赖仅通过字段注入(如简单服务依赖)的场景,简化调用流程。
* <p>
* 风险提示:1. 若 Actor 无无参构造函数,会抛出 NoSuchMethodException;
* 2. 若字段依赖在 CDI 容器中不存在,会抛出 UnsatisfiedResolutionException。
*
* @param instance CDI 实例容器,用于获取字段依赖的 Bean 实例(由 CDI 容器自动注入)
* @param actor 目标 Actor 的 Class 对象(需继承自 {@link Actor})
* @param <T> 目标 Actor 的具体类型(泛型约束确保类型安全)
* @return 用于创建 Actor 的 Pekko Props 对象,包含无参构造实例化逻辑与字段自动注入逻辑
* @throws RuntimeException 封装反射异常(无参构造获取失败、字段访问失败)或 CDI 依赖获取异常
*/
default <T extends Actor> Props props(Instance<Object> instance, Class<T> actor) {
return Props.create(actor, () -> {
try {
// 1. 反射获取无参构造函数:突破访问权限(支持 private 构造),确保非 public 构造也能被调用
Constructor<T> constructor = actor.getDeclaredConstructor();
constructor.setAccessible(true); // 解除访问权限限制(关键:避免非 public 构造无法调用的问题)
// 2. 调用无参构造创建 Actor 实例
T actorInstance = constructor.newInstance();
// 3. 自动注入所有带 @Inject 注解的字段(递归处理父类字段)
injectFields(instance, actorInstance);
return actorInstance;
} catch (Exception e) {
// 封装异常信息:明确异常场景(创建+注入),便于问题定位
throw new RuntimeException("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
}
});
}
/**
* 递归扫描 Actor 及其父类,注入带 {@link Inject} 注解的字段
* <p>
* 核心逻辑:从 Actor 实际类型开始,向上遍历至 {@link Object} 类(覆盖所有父类),
* 对每个带 {@link Inject} 注解的字段,突破访问权限限制后,从 CDI 容器获取匹配类型的依赖并赋值。
* <p>
* 设计亮点:支持父类字段注入(解决普通反射仅能访问当前类字段的问题),且兼容 private 字段注入。
*
* @param instance CDI 实例容器,用于获取字段依赖的 Bean 实例
* @param actor 待注入字段的 Actor 实例(非 Class 对象,需已实例化)
* @throws IllegalAccessException 反射访问字段失败时抛出(如字段不可见且 setAccessible 失效),
* 需上层捕获并处理(通常封装为 RuntimeException)
*/
default void injectFields(Instance<Object> instance, Actor actor) throws IllegalAccessException {
// 从 Actor 实例的实际类型开始(避免接口/父类类型导致的字段扫描不全)
Class<?> currentClass = actor.getClass();
// 递归遍历类继承链:直到 Object 类(确保所有层级的字段都被扫描)
while (currentClass != Object.class) {
// 获取当前类的所有声明字段(包括 private,但不包括父类字段)
Field[] fields = currentClass.getDeclaredFields();
for (Field field : fields) {
// 仅处理带 @Inject 注解的字段(过滤无注入需求的字段)
if (field.isAnnotationPresent(Inject.class)) {
// 突破访问权限限制:确保 private/protected 字段也能被赋值
field.setAccessible(true);
// 从 CDI 容器获取与字段类型匹配的依赖实例(若有多个实现,需通过 Qualifier 筛选)
Object dependency = instance.select(field.getType()).get();
// 为 Actor 实例的字段赋值(完成注入)
field.set(actor, dependency);
}
}
// 切换到父类,继续扫描父类字段
currentClass = currentClass.getSuperclass();
}
}
}
这样封装成接口对象, 后续如果引入共用库的开发者想使用这个功能自己去做扩展即可, 不需要做太复杂的引入实现:
/**
* ActorPropsConfigurator接口的CDI适配实现
* <p>
* 测试作用:
* 1. 封装Instance<Object>的注入细节,提供简化的props()方法
* 2. 作为CDI托管Bean,验证接口实现类的依赖注入能力
* 3. 桥接CDI容器与Pekko Actor的创建过程
*/
@ApplicationScoped
public static class ActorPropsConfiguratorExt implements ActorPropsConfigurator {
/**
* CDI实例容器,用于获取所有类型的Bean实例
* <p>
* 由Quarkus容器自动注入,提供依赖查找能力
*/
@Inject
Instance<Object> instance;
/**
* 简化版构造函数依赖注入方法
* <p>
* 测试适配:封装Instance参数,便于测试代码直接调用
*/
public <T extends Actor> Props props(Class<T> actor, Class<?>... dependencyTypes) {
return props(instance, actor, dependencyTypes);
}
/**
* 简化版无参构造+字段注入方法
* <p>
* 测试适配:封装Instance参数,便于测试代码直接调用
*/
public <T extends Actor> Props props(Class<T> actor) {
return props(instance, actor);
}
}
// 剩下就是自己去引入
// @Inject ActorPropsConfiguratorExt actorPropsConfigurator;
// 自己去做调用实例化
这种 Bean 注入方式后续方便把全局数据库等对象 Bean 直接注入进来, 还不需要改动实例化参数对象等.
另外还有 ActorSystem 设计个容器管理, 用来管理 ActorSystem 的节点内容, 这里直接编写容器对象, 直接用就行了:
package io.fortress.actor.ext;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.apache.pekko.actor.*;
import org.apache.pekko.event.LoggingAdapter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
* ActorSystemContainer 是一个用于集中管理 Pekko Actor 引用的容器类。
* 提供了 Actor 的创建、查找、删除、消息发送等一站式管理功能,
* 并通过 ConcurrentHashMap 实现了线程安全的并发操作支持。
* 实现 AutoCloseable 接口,支持资源自动释放,适合在需要批量管理 Actor 的场景中使用。
*/
public final class ActorSystemContainer implements AutoCloseable {
/**
* Pekko Actor 系统实例,所有托管的 Actor 都属于该系统
* 生命周期由外部管理,容器仅持有引用并用于创建/管理 Actor
*/
private final ActorSystem system;
/**
* 日志适配器,用于记录容器操作相关日志(如Actor创建/销毁、异常信息)
* 绑定到当前ActorSystem的日志系统,确保日志上下文一致
*/
private final LoggingAdapter loggingAdapter;
/**
* 存储 Actor 名称到 Actor 引用的映射,使用 ConcurrentHashMap 保证线程安全
* 键:Actor的唯一名称(在ActorSystem中需唯一)
* 值:对应的ActorRef实例(用于发送消息和监控生命周期)
*/
private final Map<String, ActorRef> actors;
/**
* CDI实例容器,用于获取需要注入的依赖对象实例
* 采用Optional包装,支持无CDI环境下的降级使用
*/
private final Instance<Object> cdiInstance;
/**
* 简化构造方法,指定 ActorSystem 和初始容量创建容器。
* 日志使用系统默认日志(system.log()),存储使用 ConcurrentHashMap。
*
* @param system Pekko Actor 系统,所有创建的 Actor 将属于该系统
* @param capacity 内部存储 Map 的初始容量,用于优化性能,避免频繁扩容
*/
public ActorSystemContainer(final ActorSystem system, final int capacity) {
this.system = system;
this.loggingAdapter = system.log();
this.actors = new ConcurrentHashMap<>(capacity);
this.cdiInstance = null;
}
/**
* 带CDI实例容器的构造方法(子类专用)
* 允许子类传入Instance<Object>以支持依赖注入功能
* 私有构造,确保只能通过子类调用
*
* @param system Pekko Actor系统
* @param instance CDI实例容器
* @param capacity 内部存储 Map 的初始容量,用于优化性能,避免频繁扩容
*/
public ActorSystemContainer(final ActorSystem system, final int capacity, Instance<Object> instance) {
this.system = system;
this.loggingAdapter = system.log();
this.actors = new ConcurrentHashMap<>(capacity);
this.cdiInstance = instance;
}
/**
* 最简构造方法,仅需传入 ActorSystem。
* 日志使用系统默认日志(system.log()),存储使用默认容量的 ConcurrentHashMap。
*
* @param system Pekko Actor 系统,所有创建的 Actor 将属于该系统
*/
public ActorSystemContainer(final ActorSystem system) {
this.system = system;
this.loggingAdapter = system.log();
this.actors = new ConcurrentHashMap<>();
this.cdiInstance = null;
}
/**
* 带CDI实例容器的构造方法(子类专用)
* 允许子类传入Instance<Object>以支持依赖注入功能
* 私有构造,确保只能通过子类调用
*
* @param system Pekko Actor系统
* @param instance CDI实例容器
*/
public ActorSystemContainer(final ActorSystem system, Instance<Object> instance) {
this.system = system;
this.loggingAdapter = system.log();
this.actors = new ConcurrentHashMap<>();
this.cdiInstance = instance;
}
/**
* 判断容器中是否包含指定名称的 Actor。
*
* @param name Actor 名称(区分大小写)
* @return 存在返回 true,否则返回 false
*/
public boolean containsKey(String name) {
return actors.containsKey(name);
}
/**
* 获取不可修改的 Actor 名称-引用映射表。
* 防止外部直接修改内部存储,保证容器状态一致性。
*
* @return 不可修改的 Map 视图,修改操作会抛出 UnsupportedOperationException
*/
public Map<String, ActorRef> actors() {
return Collections.unmodifiableMap(actors);
}
/**
* 获取容器关联的 ActorSystem 实例。
*
* @return 容器使用的 Pekko Actor 系统
*/
public ActorSystem system() {
return system;
}
/**
* 根据名称和 Props 创建 Actor,并注册到容器中。
* 若同名 Actor 已存在,会先停止旧 Actor 再创建新 Actor,确保名称唯一性。
*
* @param name Actor 唯一名称(需保证在 ActorSystem 中唯一,否则会抛出异常)
* @param props Actor 配置对象,包含 Actor 类型及创建参数
* @return 新创建的 Actor 引用,可用于直接发送消息
*/
public ActorRef actorOf(String name, Props props) {
return actors.compute(name, (key, oldActorRef) -> {
if (oldActorRef != null) {
// 停止旧实例,确保资源释放
system.stop(oldActorRef);
loggingAdapter.info("Old instance of Actor [{}] has been stopped, preparing to create a new instance", name);
}
// 创建新实例并注册
ActorRef newActorRef = system.actorOf(props, name);
loggingAdapter.info("Actor [{}] created successfully, path: {}", name, newActorRef.path());
return newActorRef;
});
}
/**
* 根据名称、Actor 类和构造参数创建 Actor,并注册到容器中。
* 内部自动通过 Props.create() 生成配置对象,简化 Actor 创建流程。
*
* @param name Actor 唯一名称(需在 ActorSystem 中唯一)
* @param actor Actor 类型的 Class 对象,指定要创建的 Actor 类型
* @param <T> Actor 的具体类型,必须继承自 pekko.actor.Actor
* @return 新创建的 Actor 引用
*/
public <T extends Actor> ActorRef actorOf(String name, Class<T> actor) {
return actorOf(name, Props.create(actor));
}
/**
* 根据名称、Actor 类和工厂方法创建 Actor,并注册到容器中。
* 适用于需要复杂初始化逻辑的 Actor 创建场景,提供更高的灵活性。
*
* @param name Actor 唯一名称
* @param actor Actor 类型的 Class 对象
* @param creator 生成 Actor 实例的工厂方法,负责处理复杂初始化逻辑
* @param <T> Actor 的具体类型
* @return 新创建的 Actor 引用
*/
public <T extends Actor> ActorRef actorOf(String name, Class<T> actor, Supplier<T> creator) {
return actorOf(name, Props.create(actor, creator));
}
/**
* 根据名称查找 Actor 引用。
*
* @param name Actor 名称
* @return 包含 ActorRef 的 Optional,若不存在则返回空 Optional
*/
public Optional<ActorRef> get(String name) {
return Optional.ofNullable(actors.get(name));
}
/**
* 移除指定名称的 Actor,并停止该 Actor 以释放资源。
* 操作是原子的,确保不会出现内存泄漏。
*
* @param name 需要移除的 Actor 名称
*/
public void remove(String name) {
ActorRef ref = actors.remove(name);
if (ref != null) {
// 停止 Actor 实例
system.stop(ref);
loggingAdapter.info("Actor [{}] has been removed from the container and stopped, path: {}", name, ref.path());
} else {
loggingAdapter.warning("Attempted to remove non-existent Actor [{}]", name);
}
}
/**
* 停止容器中所有 Actor,并清空存储。
* 适用于批量销毁场景(如模块重启、系统关闭),确保资源彻底释放。
*/
public void clear() {
// 停止所有 Actor
actors.values().forEach(ref -> {
system.stop(ref);
loggingAdapter.info("Actor [{}] has been stopped", ref.path().name());
});
// 清空映射表
actors.clear();
loggingAdapter.info("Container has been cleared, number of currently managed Actors: 0");
}
/**
* 获取容器中当前管理的 Actor 数量。
*
* @return Actor 数量
*/
public int size() {
return actors.size();
}
/**
* 判断容器是否为空(不包含任何 Actor)。
*
* @return 空返回 true,否则返回 false
*/
public boolean isEmpty() {
return actors.isEmpty();
}
/**
* 判断容器中是否包含指定的 Actor 引用。
* 用于通过引用反向检查是否在容器管理范围内。
*
* @param value 需要检查的 Actor 引用
* @return 存在返回 true,否则返回 false
*/
public boolean containsValue(ActorRef value) {
return actors.containsValue(value);
}
/**
* 获取不可修改的 Actor 引用集合。
* 可用于遍历所有 Actor 实例。
*
* @return 不可修改的 Collection 视图
*/
public Collection<ActorRef> values() {
return Collections.unmodifiableCollection(actors.values());
}
/**
* 获取不可修改的 Actor 名称-引用键值对集合。
* 可用于同时需要名称和引用的遍历场景。
*
* @return 不可修改的 Set 视图
*/
public Set<Map.Entry<String, ActorRef>> entrySet() {
return Collections.unmodifiableSet(actors.entrySet());
}
/**
* 返回 Actor 键值对的 Stream,支持流式处理(过滤、映射等)。
* 适用于需要复杂查询或转换的场景。
*
* @return 包含所有 Actor 键值对的 Stream
*/
public Stream<Map.Entry<String, ActorRef>> stream() {
return Collections.unmodifiableSet(actors.entrySet()).stream();
}
/**
* 遍历容器中所有 Actor,对每个 Actor 执行自定义操作。
* 提供函数式编程风格的遍历方式。
*
* @param action 接收 Actor 名称和引用的消费函数,定义对每个 Actor 的操作
*/
public void forEach(BiConsumer<String, ActorRef> action) {
actors.forEach(action);
}
/**
* 向指定名称的 Actor 发送消息,并指定发送者。
* 发送者会收到目标 Actor 的回复消息(如果有的话)。
*
* @param name 目标 Actor 名称
* @param message 要发送的消息对象(建议使用不可变对象)
* @param target 发送者 Actor 引用(用于消息回传)
* @return 包含目标 Actor 引用的 Optional,若目标不存在则为空
*/
public Optional<ActorRef> tell(String name, Object message, ActorRef target) {
ActorRef ref = actors.get(name);
if (ref == null) {
loggingAdapter.warning("Failed to send message to non-existent Actor [{}]", name);
return Optional.empty();
}
ref.tell(message, target);
loggingAdapter.debug("Message sent to Actor [{}]: {}", name, message);
return Optional.of(ref);
}
/**
* 向指定名称的 Actor 发送消息,使用默认发送者(无发送者)。
* 目标 Actor 无法回复此消息。
*
* @param name 目标 Actor 名称
* @param message 要发送的消息对象
* @return 包含目标 Actor 引用的 Optional,若目标不存在则为空
*/
public Optional<ActorRef> tell(String name, Object message) {
return tell(name, message, ActorRef.noSender());
}
/**
* 转发消息到指定名称的 Actor,保留原始发送者信息。
* 适用于消息代理、中间件场景,目标 Actor 会认为消息来自原始发送者。
*
* @param name 目标 Actor 名称
* @param message 要转发的消息对象
* @param ctx 当前 Actor 的上下文(用于保留消息上下文)
* @return 包含目标 Actor 引用的 Optional,若目标不存在则为空
*/
public Optional<ActorRef> forward(String name, Object message, ActorContext ctx) {
ActorRef ref = actors.get(name);
if (ref == null) {
loggingAdapter.warning("Failed to forward message to non-existent Actor [{}]", name);
return Optional.empty();
}
ref.forward(message, ctx);
loggingAdapter.debug("Message forwarded to Actor [{}]: {}", name, message);
return Optional.of(ref);
}
/**
* 监控指定名称的 Actor,当 Actor 终止时会收到 Terminated 消息。
* 用于跟踪 Actor 生命周期,及时处理异常终止情况。
*
* @param name 要监控的 Actor 名称
* @param ctx 当前 Actor 的上下文(用于注册监控)
* @return 包含被监控 Actor 引用的 Optional,若目标不存在则为空
*/
public Optional<ActorRef> watch(String name, ActorContext ctx) {
ActorRef ref = actors.get(name);
if (ref == null) {
loggingAdapter.warning("Failed to watch non-existent Actor [{}]", name);
return Optional.empty();
}
ctx.watch(ref);
loggingAdapter.info("Started watching Actor [{}]", name);
return Optional.of(ref);
}
/**
* 解绑指定名称的 Actor 生命周期跟踪
*
* @param name 要监控的 Actor 名称
* @param ctx 当前 Actor 的上下文(用于注册监控)
* @return 包含被监控 Actor 引用的 Optional,若目标不存在则为空
*/
public Optional<ActorRef> unwatch(String name, ActorContext ctx) {
ActorRef ref = actors.get(name);
if (ref == null) {
loggingAdapter.warning("Failed to unwatch non-existent Actor [{}]", name);
return Optional.empty();
}
ctx.unwatch(ref);
loggingAdapter.info("Stopped watching Actor [{}]", name);
return Optional.of(ref);
}
/**
* 向容器中所有 Actor 广播消息,并指定发送者。
* 适用于全局通知、状态同步等需要所有 Actor 接收的场景。
*
* @param message 要广播的消息对象
* @param target 发送者 Actor 引用
*/
public void broadcast(Object message, ActorRef target) {
int count = actors.size();
loggingAdapter.info("Started broadcasting message to {} Actors: {}", count, message);
actors.forEach((name, ref) -> ref.tell(message, target));
}
/**
* 向容器中所有 Actor 广播消息,使用默认发送者(无发送者)。
*
* @param message 要广播的消息对象
*/
public void broadcast(Object message) {
broadcast(message, ActorRef.noSender());
}
/**
* 实现 AutoCloseable 接口,释放容器资源。
* 内部调用 clear() 方法,停止所有 Actor 并清空存储。
* 建议通过 try-with-resources 语法自动调用,确保资源释放。
*/
@Override
public void close() {
loggingAdapter.info("Container starting to shutdown, cleaning up all Actors...");
clear();
loggingAdapter.info("Container has been shut down");
}
/**
* 获取CDI实例容器
*
* @return 存储依赖实例的容器对象(Optional包装,可能为空)
*/
public Optional<Instance<Object>> instance() {
return Optional.ofNullable(cdiInstance);
}
/**
* 为Actor实例注入依赖字段
* 内部通过反射机制扫描Actor类及其父类中所有带有@Inject注解的字段
* 并从CDI容器中获取对应类型的实例进行注入
*
* @param actor 需要注入依赖的Actor实例
* @param <A> Actor的具体类型
* @throws IllegalAccessException 当反射访问字段时发生权限异常,或CDI容器未初始化
*/
private <A extends Actor> void injectFields(A actor) throws IllegalAccessException {
// 检查CDI容器是否初始化,未初始化则抛出异常
if (cdiInstance == null) {
throw new IllegalAccessException("CDI Instance container is not initialized, unable to inject dependencies for the Actor");
}
Class<?> clazz = actor.getClass();
// 遍历当前类及其所有父类(直到Object类)
while (clazz != Object.class) {
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
// 检查字段是否带有@Inject注解
if (field.isAnnotationPresent(Inject.class)) {
field.setAccessible(true); // 允许访问私有字段
// 从CDI容器中获取匹配字段类型的实例
Object dependency = cdiInstance.select(field.getType()).get();
// 为字段设置值(注入依赖)
field.set(actor, dependency);
loggingAdapter.debug("Successfully injected dependency into field [{}] of Actor [{}]",
actor.self().path().name(), field.getName());
}
}
clazz = clazz.getSuperclass();
}
}
/**
* 创建并返回指定Actor的引用(ActorRef)
* 自动处理Actor的实例化和依赖注入过程
* 与actorOf方法的区别:该方法强制进行依赖注入
*
* @param name Actor的名称
* @param actor Actor的类对象
* @param <T> Actor的具体类型
* @return 新创建的Actor的引用
*/
public <T extends Actor> ActorRef instance(String name, Class<T> actor) {
// 调用父类的actorOf方法,使用自定义的Props创建Actor
return actorOf(name, Props.create(actor, () -> {
try {
// 获取Actor类的无参构造方法
Constructor<T> constructor = actor.getDeclaredConstructor();
constructor.setAccessible(true); // 允许访问非public构造方法
// 创建Actor实例
T actorInstance = constructor.newInstance();
// 为Actor实例注入依赖
injectFields(actorInstance);
return actorInstance;
} catch (Exception e) {
// 记录异常日志并包装为运行时异常抛出
loggingAdapter.warning("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
throw new RuntimeException("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
}
}));
}
/**
* 创建并返回指定Actor的引用(ActorRef),使用自定义的创建器
* 自动为创建器生成的Actor实例注入依赖
* 适用于需要自定义Actor实例化逻辑但仍需依赖注入的场景
*
* @param name Actor的名称
* @param actor Actor的类对象
* @param creator 用于创建Actor实例的供应者(Supplier)
* @param <T> Actor的具体类型
* @return 新创建的Actor的引用
*/
public <T extends Actor> ActorRef instance(String name, Class<T> actor, Supplier<T> creator) {
return actorOf(name, Props.create(actor, () -> {
try {
// 使用提供的创建器获取Actor实例
T actorInstance = creator.get();
// 为Actor实例注入依赖
injectFields(actorInstance);
return actorInstance;
} catch (Exception e) {
// 记录异常日志并包装为运行时异常抛出
loggingAdapter.warning("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
throw new RuntimeException("Failed to create and inject actor [actorClass: " + actor.getName() + "]", e);
}
}));
}
}
之后生成个全局配置类, 等待引入该共用库的开发者自己去实现接口功能:
package io.fortress.actor.configurator;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.fortress.actor.ext.ActorSystemContainer;
import io.smallrye.config.WithDefault;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Terminated;
import scala.util.Try;
import java.util.Map;
import java.util.function.Function;
/**
* Pekko Actor 系统的配置器接口,用于标准化 Actor 系统的初始化流程(配置加载、日志适配、自定义参数注入)。
* <p>核心作用:
* 1. 封装 Pekko 配置的加载逻辑(支持多类型默认配置切换);
* 2. 自动适配 Quarkus 日志系统(复用 Slf4j 日志配置,避免多日志系统冲突);
* 3. 支持外部自定义配置注入(通过 settings 传递 Pekko 配置项);
* 4. 提供 Actor 系统创建的默认实现和终止回调扩展。
* <p>使用场景:Quarkus 环境下 Pekko Actor 系统的初始化,需配合 SmallRye Config 实现配置注入(如 @WithDefault 注解)。
*/
public interface ActorSystemConfigurator {
/**
* Pekko 配置加载类型枚举,对应 {@link ConfigFactory} 提供的四种默认配置加载方式,用于切换不同场景的基础配置。
* <p>枚举值与 ConfigFactory 方法的映射关系:
* - Reference: 加载 Pekko 框架默认参考配置({@link ConfigFactory#defaultReference()});
* - Application: 加载应用自定义配置({@link ConfigFactory#defaultApplication()});
* - Overrides: 加载覆盖配置(优先级最高,用于临时覆盖默认配置,{@link ConfigFactory#defaultOverrides()});
* - ReferenceUnresolved: 加载未解析的默认参考配置(需手动调用 resolve(),{@link ConfigFactory#defaultReferenceUnresolved()})。
*/
enum ActorReferences {
/**
* 对应 {@link ConfigFactory#defaultReference()},加载 Pekko 框架内置的默认参考配置
*/
Reference,
/**
* 对应 {@link ConfigFactory#defaultApplication()},加载应用级自定义配置(如 application.conf)
*/
Application,
/**
* 对应 {@link ConfigFactory#defaultOverrides()},加载覆盖配置(优先级高于 Reference 和 Application)
*/
Overrides,
/**
* 对应 {@link ConfigFactory#defaultReferenceUnresolved()},加载未解析的默认参考配置(需后续手动解析)
*/
ReferenceUnresolved
}
/**
* 获取 Pekko 配置加载类型(通过 SmallRye Config 注入,支持配置文件指定)。
* <p>默认值为 {@link ActorReferences#Reference},即默认加载 Pekko 框架的默认参考配置;
* 可通过配置文件(如 application.properties)指定:{@code io.fortress.actor.configurator.reference=application} 切换为应用配置。
*
* @return 配置加载类型,非 null
*/
@WithDefault("reference")
ActorReferences reference();
/**
* 获取 Actor 系统的名称(用于创建 Actor 系统实例的唯一标识)。
* <p>最终会传递给 {@link ActorSystem#create(String, Config)} 的第一个参数,作为 Actor 系统的全局名称;
* 建议通过配置文件注入,确保不同环境(开发/测试/生产)可灵活配置。
*
* @return Actor 系统名称,非空字符串
*/
String name();
/**
* 获取 Pekko 自定义配置集合(用于注入额外的 Pekko 配置项,覆盖默认配置)。
* <p>配置项格式要求:
* - Key: 需符合 Pekko 配置路径(如 "pekko.actor.serializers.proto");
* - Value: 配置值(如 "org.apache.pekko.remote.serialization.ProtobufSerializer");
* <p>使用场景:外部通过配置文件或代码注入 Pekko 特定配置(如序列化器、线程池参数、远程通信配置等)。
*
* @return 自定义配置键值对,可为空(无额外配置时返回空 Map)
*/
Map<String, String> settings();
/**
* 默认实现:创建 Pekko Actor 系统实例(核心初始化方法,封装完整配置流程)。
* <p>初始化流程:
* 1. 加载基础配置:通过 {@link #reference()} 指定的类型加载默认配置;
* 2. 适配 Quarkus 日志:自动注入 Slf4j 日志相关配置,复用 Quarkus 日志系统(避免 Pekko 自带日志与 Quarkus 冲突);
* 3. 合并自定义配置:将 {@link #settings()} 中的配置项合并到基础配置,覆盖默认值;
* 4. 创建 Actor 系统:使用合并后的配置和 {@link #name()} 生成的系统名,创建 ActorSystem 实例。
*
* @return 初始化完成的 Pekko Actor 系统实例,非 null
*/
default ActorSystem createActorSystem() {
// 1. 加载应用默认基础配置(如 application.conf 中的配置)
Config originalConfig = ConfigFactory.load();
// 2. 根据 reference() 选择的类型,合并对应的 Pekko 默认配置(优先级:Overrides > Application > Reference)
originalConfig = switch (reference()) {
case Application -> originalConfig.withFallback(ConfigFactory.defaultApplication());
case Overrides -> originalConfig.withFallback(ConfigFactory.defaultOverrides());
case ReferenceUnresolved -> originalConfig.withFallback(ConfigFactory.defaultReferenceUnresolved());
// 默认使用 Pekko 框架的默认参考配置
default -> originalConfig.withFallback(ConfigFactory.defaultReference());
};
// 3. 获取外部注入的自定义配置(settings()),适配 Quarkus 日志系统
Map<String, String> settings = settings();
// 若未配置 Pekko 日志过滤器,自动注入 Slf4j 日志相关配置(复用 Quarkus 日志,避免多日志实现冲突)
if (!settings.containsKey("pekko.logging-filter")) {
settings.put("pekko.logging-filter", "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"); // 日志过滤器:使用 Slf4j 实现
settings.put("pekko.loggers.0", "org.apache.pekko.event.slf4j.Slf4jLogger"); // 日志实现:Slf4j logger
settings.put("pekko.log-dead-letters", "off"); // 关闭死信日志(避免开发环境日志冗余)
settings.put("pekko.log-dead-letters-during-shutdown", "off"); // 关闭阶段的死信日志
}
// 4. 合并自定义配置到基础配置(自定义配置优先级最高,覆盖默认配置)
Config mergedConfig = ConfigFactory.parseMap(settings).withFallback(originalConfig);
// 5. 创建并返回 Actor 系统实例(使用配置的系统名和合并后的配置)
return ActorSystem.create(name(), mergedConfig);
}
/**
* 扩展实现:创建 Actor 系统并绑定终止回调(支持在 Actor 系统终止时执行自定义逻辑)。
* <p>适用场景:需要监听 Actor 系统生命周期的场景(如资源释放、日志记录、告警通知等);
* 回调函数会接收 Actor 系统终止的结果({@link Try<Terminated>}),成功终止时 Try 为 Success,异常时为 Failure。
*
* @param closeable Actor 系统终止时的回调函数,入参为终止结果(Try<Terminated>),返回值无特定含义(可忽略)
* @param <U> 回调函数返回值类型(泛型,无实际约束)
* @return 初始化完成的 Actor 系统实例(已绑定终止回调),非 null
*/
default <U> ActorSystem createActorSystem(Function<Try<Terminated>, U> closeable) {
// 1. 复用默认方法创建 Actor 系统
ActorSystem actorSystem = createActorSystem();
// 2. 绑定终止回调:当 Actor 系统终止时,执行 closeable 函数(使用 Actor 系统的调度器线程池)
actorSystem.whenTerminated().onComplete(closeable::apply, actorSystem.dispatcher());
return actorSystem;
}
/**
* 使用指定的ActorSystem和容量创建ActorSystemContainer实例。
* 适用于已有ActorSystem实例且需要指定容器容量的场景。
*
* @param actorSystem 已初始化的ActorSystem实例,作为容器的核心组件
* @param capacity 容器的容量限制,用于控制可容纳的元素数量
* @return 新创建的ActorSystemContainer实例,包含指定的ActorSystem和容量配置
*/
default ActorSystemContainer createActorSystemContainer(ActorSystem actorSystem, int capacity) {
return new ActorSystemContainer(actorSystem, capacity);
}
/**
* 使用指定容量创建ActorSystemContainer实例。
* 内部会先创建默认配置的ActorSystem,再结合指定容量初始化容器。
* 适用于需要自定义容量但使用默认ActorSystem的场景。
*
* @param capacity 容器的容量限制
* @return 新创建的ActorSystemContainer实例,包含默认ActorSystem和指定容量
*/
default ActorSystemContainer createActorSystemContainer(int capacity) {
ActorSystem actorSystem = createActorSystem();
return createActorSystemContainer(actorSystem, capacity);
}
/**
* 使用指定容量和关闭回调函数创建ActorSystemContainer实例。
* 先通过回调函数创建定制化的ActorSystem,再结合容量参数完成容器初始化。
* 适用于需要自定义容量且需处理ActorSystem关闭逻辑的场景。
*
* @param capacity 容器的容量限制
* @param closeable 关闭时的回调函数,用于处理终止(Terminated)状态的尝试结果
* @return 新创建的ActorSystemContainer实例,包含定制化ActorSystem和指定容量
*/
default ActorSystemContainer createActorSystemContainer(int capacity, Function<Try<Terminated>, ?> closeable) {
ActorSystem actorSystem = createActorSystem(closeable);
return createActorSystemContainer(actorSystem, capacity);
}
/**
* 使用指定的ActorSystem创建默认容量的ActorSystemContainer实例。
* 适用于已有ActorSystem实例且使用默认容量的场景。
*
* @param actorSystem 已初始化的ActorSystem实例
* @return 新创建的ActorSystemContainer实例,包含指定的ActorSystem和默认容量
*/
default ActorSystemContainer createActorSystemContainer(ActorSystem actorSystem) {
return new ActorSystemContainer(actorSystem);
}
/**
* 创建默认配置的ActorSystemContainer实例。
* 内部会创建默认的ActorSystem并使用默认容量初始化容器。
* 适用于快速创建基础配置容器的场景。
*
* @return 新创建的ActorSystemContainer实例,包含默认ActorSystem和默认容量
*/
default ActorSystemContainer createActorSystemContainer() {
ActorSystem actorSystem = createActorSystem();
return createActorSystemContainer(actorSystem);
}
/**
* 使用关闭回调函数创建默认容量的ActorSystemContainer实例。
* 先通过回调函数创建定制化的ActorSystem,再使用默认容量初始化容器。
* 适用于需要处理关闭逻辑但使用默认容量的场景。
*
* @param closeable 关闭时的回调函数,用于处理终止(Terminated)状态的尝试结果
* @return 新创建的ActorSystemContainer实例,包含定制化ActorSystem和默认容量
*/
default ActorSystemContainer createActorSystemContainer(Function<Try<Terminated>, ?> closeable) {
ActorSystem actorSystem = createActorSystem(closeable);
return createActorSystemContainer(actorSystem);
}
}
测试单元运行下具体功能, 确认执行无误:
package io.fortress.actor.ext;
import io.fortress.actor.configurator.ActorSystemConfigurator;
import io.quarkus.arc.Unremovable;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.common.constraint.Assert;
import io.smallrye.config.ConfigMapping;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.testkit.TestKit;
import org.junit.jupiter.api.Test;
import scala.concurrent.duration.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* ActorSystemContainer 功能测试类,基于 QuarkusTest 框架验证容器核心能力。
* 测试范围包括:Actor创建与管理、消息广播、依赖注入、消息发送与响应等关键功能。
* 所有测试依赖 Quarkus 容器环境,自动集成 CDI 和配置系统。
*/
@QuarkusTest
class ActorSystemContainerTest {
/**
* Actor系统配置扩展接口,继承基础配置并支持扩展新属性。
* 通过 @ConfigMapping 绑定配置前缀 "fortress.actor",自动映射配置文件中的属性。
*/
@ConfigMapping(prefix = "fortress.actor")
interface ActorSystemConfiguratorExt extends ActorSystemConfigurator {
// 扩展点:如需添加新配置项,在此定义getter方法(如 String customProp();)
// 配置值会自动从 application.properties/yml 中读取
}
/**
* 注入配置接口实例,用于获取ActorSystem的初始化参数(如系统名称、配置项等)。
*/
@Inject
ActorSystemConfiguratorExt configuration;
/**
* 生产ActorSystemContainer实例的CDI生产者方法。
* 作用:将容器实例纳入CDI管理,确保依赖注入(如Instance<Object>)和生命周期正确绑定。
*
* @param instance CDI实例容器,用于Actor的依赖注入
* @return 配置完成的ActorSystemContainer实例,生命周期为ApplicationScoped
*/
@Produces
@Unremovable // 确保Quarkus优化时不被移除(因测试需显式依赖)
@ApplicationScoped
public ActorSystemContainer createActorRefContainer(Instance<Object> instance) {
// 基于配置创建ActorSystem,再初始化容器并关联CDI实例
ActorSystem system = configuration.createActorSystem();
return new ActorSystemContainer(system, instance);
}
/**
* 测试用Actor:用于验证广播功能和基础消息处理。
* 功能:接收字符串消息并返回固定响应"Overs",便于测试验证消息传递链路。
*/
public static class RandomActor extends AbstractActor {
// 日志适配器:绑定到当前ActorSystem的日志系统,确保日志上下文一致
final LoggingAdapter log = context().system().log();
/**
* Actor启动回调:打印启动日志,便于跟踪测试执行过程。
*/
@Override
public void preStart() {
log.info("RandomActor started : {}", self().path());
}
/**
* 定义消息处理逻辑:仅处理String类型消息,接收后返回固定响应。
*
* @return 消息处理器,匹配String类型消息并执行回复逻辑
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
log.info("RandomActor received : {}", msg);
// 向发送者回复固定消息"Overs",供测试验证
sender().tell("Overs", self());
})
.build();
}
}
/**
* 注入待测试的ActorSystemContainer实例(由上面的生产者方法提供)。
*/
@Inject
ActorSystemContainer container;
/**
* 测试广播功能:验证容器能否向所有托管的Actor发送消息并接收响应。
* 测试步骤:
* 1. 创建多个Actor并注册到容器
* 2. 发送广播消息并指定测试Actor为发送者
* 3. 验证至少有一个Actor处理消息并返回响应
*/
@Test
void broadcast() {
// 1. 创建5个随机命名的Actor(避免名称冲突),注册到容器
for (int i = 0; i < 5; i++) {
container.actorOf(UUID.randomUUID().toString(), RandomActor.class);
}
// 2. 创建TestKit实例:提供测试用Actor和消息接收工具
TestKit testKit = new TestKit(container.system());
// 3. 执行广播:向所有Actor发送"Hello world",发送者设为testKit的测试Actor
container.broadcast("Hello world", testKit.testActor());
// 4. 验证响应:最多等待3秒,期望收到至少一个"Overs"响应
// (因广播是异步的,只要有一个Actor处理即可证明功能正常)
String response = testKit.expectMsg(Duration.create(3, TimeUnit.SECONDS), "Overs");
Assert.assertNotNull(response);
}
/**
* 测试用Actor:用于验证依赖注入和消息回显功能。
* 功能:
* - 接收字符串消息并原样返回(回显)
* - 验证CDI注入是否生效(注入ActorSystemContainer)
*/
public static class EchoActor extends AbstractActor {
// 日志适配器:记录启动信息和消息处理日志
final LoggingAdapter log = context().system().log();
/**
* 注入ActorSystemContainer:验证CDI依赖注入是否正常工作。
* (容器实例由测试类中的生产者方法提供)
*/
@Inject
ActorSystemContainer container;
/**
* Actor启动回调:打印容器关联的系统信息和当前管理的Actor数量,
* 验证注入的容器实例是否可用。
*/
@Override
public void preStart() {
log.info("EchoActor started : {}", self().path());
log.info("关联的ActorSystem名称 : {}", container.system().name());
log.info("容器当前管理的Actor数量 : {}", container.size());
}
/**
* 定义消息处理逻辑:接收String消息并原样回复,验证消息传递准确性。
*
* @return 消息处理器,匹配String类型消息并执行回显逻辑
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
log.info("EchoActor received : {}", msg);
// 向发送者回复原消息,验证消息传递链路
sender().tell(msg, self());
})
.build();
}
}
/**
* 测试消息发送与CDI注入:验证带依赖注入的Actor创建及消息回显功能。
* 测试步骤:
* 1. 创建带CDI注入的EchoActor
* 2. 发送消息并验证回显内容
* 3. 间接验证CDI注入是否成功(Actor启动日志会打印容器信息)
*/
@Test
void echo() {
// 1. 创建EchoActor:使用instance方法(带CDI注入),随机命名避免冲突
ActorRef actorRef = container.instance(UUID.randomUUID().toString(), EchoActor.class);
// 2. 创建TestKit实例:用于发送消息和接收响应
TestKit testKit = new TestKit(container.system());
// 3. 发送消息:向EchoActor发送"Hello world",发送者设为testKit的测试Actor
actorRef.tell("Hello world", testKit.testActor());
// 4. 验证响应:最多等待3秒,期望收到原消息"Hello world",
// 证明消息发送/接收正常,且Actor的CDI注入未影响核心功能
String response = testKit.expectMsg(Duration.create(3, TimeUnit.SECONDS), "Hello world");
Assert.assertNotNull(response);
Assert.assertTrue("Hello world".equals(response));
}
}
这里就是整体的注入 Bean 和 Actor 全局容器的设计.