MeteorCat / 学习Flink开源库源码(三)

Created Thu, 03 Jul 2025 19:58:07 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
4022 Words

前面比较长时间篇幅去说明 Description, 主要开始对开源项目的复杂性有基础认识, 可以看到开源项目当中做了大量妥协和扩展, 导致有时候很简单的功能都要为了扩展而写的很复杂.

这还仅仅是作为很小的组件就要这么多依赖类, 其他的网络|文件|Actor库依赖更加庞大, 所以需要总结适合自己理解方法

这里回过头来说明配置 Configuration 类的相关工具类:

  • org.apache.flink.configuration.WritableConfig
  • org.apache.flink.configuration.ReadableConfig
  • org.apache.flink.configuration.ConfigOption
  • org.apache.flink.configuration.ConfigOptions: 主要对之前的 ConfigOption 进行构建的 Builder

上个篇章已经说明 ConfigOption 是作为 Configuration 单独配置存放内部集合中, 而 ConfigOptions 就是相当于创建器:

// 这里引用官方文档的构建器例子
// simple string-valued option with a default value
ConfigOption<String> tempDirs = ConfigOptions
  .key("tmp.dir")
  .stringType()
  .defaultValue("/tmp");

// simple integer-valued option with a default value
ConfigOption<Integer> parallelism = ConfigOptions
  .key("application.parallelism")
  .intType()
  .defaultValue(100);

// option of list of integers with a default value
ConfigOption<Integer> parallelism = ConfigOptions
  .key("application.ports")
  .intType()
  .asList()
  .defaultValue(8000, 8001, 8002);

// option with no default value
ConfigOption<String> userName = ConfigOptions
  .key("user.name")
  .stringType()
  .noDefaultValue();

// option with deprecated keys to check
ConfigOption<Double> threshold = ConfigOptions
  .key("cpu.utilization.threshold")
  .doubleType()
  .defaultValue(0.9)
  .withDeprecatedKeys("cpu.threshold");

这看出很精妙类型分配 Builder 设计, 这里是很值得学习的设计, 所以我这里主要讲解这个类.

这个可以当作学习怎么设计个泛型链式构建器, 学会怎么把泛型包装 Builder 之中

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 构建 ConfigOption 配置
 * <pre>{@code
 * // simple string-valued option with a default value
 * ConfigOption<String> tempDirs = ConfigOptions
 *     .key("tmp.dir")
 *     .stringType()
 *     .defaultValue("/tmp");
 *
 * // simple integer-valued option with a default value
 * ConfigOption<Integer> parallelism = ConfigOptions
 *     .key("application.parallelism")
 *     .intType()
 *     .defaultValue(100);
 *
 * // option of list of integers with a default value
 * ConfigOption<Integer> parallelism = ConfigOptions
 *     .key("application.ports")
 *     .intType()
 *     .asList()
 *     .defaultValue(8000, 8001, 8002);
 *
 * // option with no default value
 * ConfigOption<String> userName = ConfigOptions
 *     .key("user.name")
 *     .stringType()
 *     .noDefaultValue();
 *
 * // option with deprecated keys to check
 * ConfigOption<Double> threshold = ConfigOptions
 *     .key("cpu.utilization.threshold")
 *     .doubleType()
 *     .defaultValue(0.9)
 *     .withDeprecatedKeys("cpu.threshold");
 * }</pre>
 */
public class ConfigOptions {

    /**
     * 禁止实例化
     */
    private ConfigOptions() {
    }

    /**
     * 静态生成初始化配置构建器
     */
    public static OptionBuilder key(String key) {
        if (key == null) throw new NullPointerException();
        return new OptionBuilder(key);
    }

    /**
     * 配置 Builder
     */
    public static final class OptionBuilder {

        /**
         * 配置KEY名称
         */
        @SuppressWarnings("FieldCanBeLocal")
        private final String key;

        /**
         * 构造方法, 注意: 只允许同级包被实例化
         */
        OptionBuilder(String key) {
            this.key = key;
        }


        /**
         * 默认的强类型映射
         */
        @SuppressWarnings("unchecked")
        private static final Class<Map<String, String>> PROPERTIES_MAP_CLASS =
                (Class<Map<String, String>>) (Class<?>) Map.class;


        /**
         * 包装强类型 Boolean Builder
         */
        public TypedConfigOptionBuilder<Boolean> booleanType() {
            return new TypedConfigOptionBuilder<>(key, Boolean.class);
        }

        /**
         * 包装强类型 Integer Builder
         */
        public TypedConfigOptionBuilder<Integer> intType() {
            return new TypedConfigOptionBuilder<>(key, Integer.class);
        }

        /**
         * 包装强类型 Long Builder
         */
        public TypedConfigOptionBuilder<Long> longType() {
            return new TypedConfigOptionBuilder<>(key, Long.class);
        }

        /**
         * 包装强类型 Float Builder
         */
        public TypedConfigOptionBuilder<Float> floatType() {
            return new TypedConfigOptionBuilder<>(key, Float.class);
        }

        /**
         * 包装强类型 Double Builder
         */
        public TypedConfigOptionBuilder<Double> doubleType() {
            return new TypedConfigOptionBuilder<>(key, Double.class);
        }

        /**
         * 包装强类型 String Builder
         */
        public TypedConfigOptionBuilder<String> stringType() {
            return new TypedConfigOptionBuilder<>(key, String.class);
        }

        /**
         * 包装强类型 Duration Builder
         */
        public TypedConfigOptionBuilder<Duration> durationType() {
            return new TypedConfigOptionBuilder<>(key, Duration.class);
        }

        /**
         * 包装强类型 Enum Builder
         */
        public <T extends Enum<T>> TypedConfigOptionBuilder<T> enumType(Class<T> enumType) {
            return new TypedConfigOptionBuilder<>(key, enumType);
        }

        /**
         * 包装强类型 Map Builder
         */
        public TypedConfigOptionBuilder<Map<String, String>> mapType() {
            return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
        }
    }

    /**
     * 通用强类型配置 Builder
     */
    public static class TypedConfigOptionBuilder<T> {
        /**
         * 强类型的配置KEY名称
         */
        private final String key;

        /**
         * 强类型的配置保存的原生类型
         */
        private final Class<T> clazz;

        /**
         * 只允许同级包内部初始化
         */
        TypedConfigOptionBuilder(String key, Class<T> clazz) {
            this.key = key;
            this.clazz = clazz;
        }

        /**
         * 生成带默认值的配置值
         */
        public ConfigOption<T> defaultValue(T value) {
            return new ConfigOption<>(key, clazz, ConfigOption.EMPTY_DESCRIPTION, value, false);
        }


        /**
         * 生成不带默认值的配置值
         */
        public ConfigOption<T> noDefaultValue() {
            return new ConfigOption<>(key, clazz, ConfigOption.EMPTY_DESCRIPTION, null, true);
        }


        /**
         * 生成列表的配置项
         */
        public ListConfigOptionBuilder<T> asList() {
            return new ListConfigOptionBuilder<>(key, clazz);
        }
    }

    /**
     * 列表对象配置
     */
    public static class ListConfigOptionBuilder<E> {
        private final String key;
        private final Class<E> clazz;

        /**
         * 不允许外部初始化实例化
         */
        ListConfigOptionBuilder(String key, Class<E> clazz) {
            this.key = key;
            this.clazz = clazz;
        }

        /**
         * 构建带默认值的列表配置项
         */
        @SafeVarargs
        public final ConfigOption<List<E>> defaultValues(E... values) {
            return new ConfigOption<>(
                    key,
                    clazz,
                    ConfigOption.EMPTY_DESCRIPTION,
                    Arrays.asList(values),
                    true
            );
        }


        /**
         * 构建不含默认值的列表配置项
         */
        public ConfigOption<List<E>> noDefaultValues() {
            return new ConfigOption<>(key, clazz, ConfigOption.EMPTY_DESCRIPTION, null, true);
        }
    }
}

上面的构建器(Builder)就是很有设计感的代码, 之后这里测试单元来确认是否可用:

package org.meteorcat.fusion;


import org.junit.Test;
import org.meteorcat.fusion.configuration.ConfigOption;
import org.meteorcat.fusion.configuration.ConfigOptions;

import java.util.List;

/**
 * 配置项的测试单元
 */
public class ConfigOptionTests {


    /**
     * 文本配置
     */
    @Test
    public void createStringOptions() {
        ConfigOption<String> tempDirs = ConfigOptions
                .key("tmp.dir")
                .stringType()
                .defaultValue("/tmp");
        System.out.println(tempDirs);
    }

    /**
     * 数值配置Integer
     */
    @Test
    public void createIntegerOptions() {
        ConfigOption<Integer> parallelism = ConfigOptions
                .key("application.parallelism")
                .intType()
                .defaultValue(100);
        System.out.println(parallelism);
    }

    /**
     * 数值配置列表Integer, 注: 官方文档没更新, 所以这里可能不太一样
     */
    @Test
    public void createIntegersOptions() {
        ConfigOption<List<Integer>> parallelism = ConfigOptions
                .key("application.ports")
                .intType()
                .asList()
                .defaultValues(8000, 8001, 8002);
        System.out.println(parallelism);
    }


    /**
     * 数值配置Boolean
     */
    @Test
    public void createBooleanOptions() {
        ConfigOption<Boolean> allowWebSocket = ConfigOptions
                .key("service.websocket.enabled")
                .booleanType()
                .defaultValue(true);
        System.out.println(allowWebSocket);
    }


    /**
     * 数值配置Double, 附带配置变动匹配
     */
    @Test
    public void createDoubleOptions() {
        ConfigOption<Double> threshold = ConfigOptions
                .key("cpu.utilization.threshold")
                .doubleType()
                .defaultValue(0.9)
                .withDeprecatedKeys("cpu.threshold");
        System.out.println(threshold);
    }
}

这种设计方法在很多开源项目当中都有涉及, 是很好用的泛型 Builder 设计模式, 这里需要回顾下 WritableConfig:

/**
 * 让 configuration 配置支持写入功能
 * 依赖 ConfigOption 配置类
 */
public interface WritableConfig {

    /**
     * Stores a given value using the metadata included in the {@link ConfigOption}. The value
     * should be readable back through {@link ReadableConfig}.
     *
     * @param option metadata information
     * @param value  value to be stored
     * @param <T>    type of the value to be stored
     * @return instance of this configuration for fluent API
     */
    <T> Configuration set(ConfigOption<T> option, T value);

}

这就抽象出来写入工具类, 这个用于暴露给外部实现, 其他就是核心实现就是实现类 Configuration, 还有另外的工具类 ConfigurationUtil, 这几个类构建很复杂所以必须自己去全局理解下.

ConfigurationUtil 功能我尽可能精简用法, 里面有的设计方法主要就是类型擦除并且还原类型

import org.meteorcat.fusion.configuration.ConfigOption;
import org.meteorcat.fusion.configuration.ConfigOptions;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Configuration工具集
 */
public class ConfigurationUtils {
    /**
     * 不允许实例化
     */
    private ConfigurationUtils() { /* 不允许实例化 */}


    // 构建配置项目 -------------------------------------

    /**
     * 获取初始化配置值项: Boolean
     */
    public static ConfigOption<Boolean> getBooleanConfigOption(String key) {
        return ConfigOptions.key(key).booleanType().noDefaultValue();
    }

    /**
     * 获取初始化配置值项: Double
     */
    public static ConfigOption<Double> getDoubleConfigOption(String key) {
        return ConfigOptions.key(key).doubleType().noDefaultValue();
    }

    /**
     * 获取初始化配置值项: Float
     */
    public static ConfigOption<Float> getFloatConfigOption(String key) {
        return ConfigOptions.key(key).floatType().noDefaultValue();
    }

    /**
     * 获取初始化配置值项: Integer
     */
    public static ConfigOption<Integer> getIntegerConfigOption(String key) {
        return ConfigOptions.key(key).intType().noDefaultValue();
    }

    /**
     * 获取初始化配置值项: Long
     */
    public static ConfigOption<Long> getLongConfigOption(String key) {
        return ConfigOptions.key(key).longType().noDefaultValue();
    }

    // -------------------------------------------------------


    // 配置匹配增删改查匹配 -------------------------------------

    /**
     * 用于比较判断配置是否为列表对象
     */
    public static boolean canBePrefixMap(ConfigOption<?> option) {
        return option.getClazz() == Map.class && !option.isList();
    }


    /**
     * 用来比较配置相似度
     * <pre>
     * // 比如一下配置都是 avro-confluent.properties. 开头, 那么应该都算是同种类配置:
     *  avro-confluent.properties.schema = 1
     *  avro-confluent.properties.other-prop = 2
     * // 都是归属于 avro-confluent.properties 配置, 所以能够被一下判断符合种类
     * filterPrefixMapKey("avro-confluent.properties.schema","avro-confluent.properties")
     * </pre>
     */
    public static boolean filterPrefixMapKey(String key, String candidate) {
        final String prefix = key + ".";
        return candidate.startsWith(prefix);
    }

    /**
     * 对整个Map元素做比较, 判断某个Key配置是否在其中
     */
    public static boolean containsPrefixMap(Map<String, Object> configs, String key) {
        return configs.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
    }


    // -------------------------------------------------------


    // 配置匹配类型, 将 Object 还原成原生功能 -------------------


    /**
     * 转化为枚举对象
     */
    @SuppressWarnings("unchecked")
    public static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
        if (o.getClass().equals(clazz)) {
            return (E) o;
        }
        return Arrays.stream(clazz.getEnumConstants())
                .filter(e -> e.toString()
                        .toLowerCase(Locale.ROOT)
                        .equals(o.toString().toUpperCase(Locale.ROOT))
                ).findAny()
                .orElseThrow(() -> new IllegalArgumentException(
                        String.format(
                                "Could not parse value for enum %s. Expected one of: [%s]",
                                clazz, Arrays.toString(clazz.getEnumConstants()))
                ));
    }


    /**
     * 强制转化为字符串
     * 注: 这里我和官方不一致, 官方内部内部嵌套 YAML 类格式化转化; 额外扩展出来又是很多知识点, 所以这里做精简处理
     */
    public static String convertToString(Object o) {
        if (o.getClass() == String.class) {
            return (String) o;
        } else {
            throw new ClassCastException("Convert to String failed: " + o.getClass().getName());
        }
    }


    /**
     * 强制转化为Integer
     */
    public static Integer convertToInteger(Object o) {
        if (o.getClass() == Integer.class) {
            return (Integer) o;
        } else if (o.getClass() == Long.class) {
            long v = (Long) o;
            if (v < Integer.MIN_VALUE || v > Integer.MAX_VALUE) {
                return (int) v;
            } else {
                throw new IllegalArgumentException(
                        String.format(
                                "Configuration value %s overflows/underflow the integer type.",
                                v));
            }
        }
        return Integer.parseInt(o.toString());
    }

    /**
     * 强制转化为Long
     */
    public static Long convertToLong(Object o) {
        if (o.getClass() == Long.class) {
            return (Long) o;
        } else if (o.getClass() == Integer.class) {
            return ((Integer) o).longValue();
        }
        return Long.parseLong(o.toString());
    }

    /**
     * 强制转化为Float
     */
    public static Float convertToFloat(Object o) {
        if (o.getClass() == Float.class) {
            return (Float) o;
        } else if (o.getClass() == Double.class) {
            double v = (Double) o;
            if (v == 0.0
                    || (v >= Float.MIN_VALUE && v <= Float.MAX_VALUE)
                    || (v >= -Float.MAX_VALUE && v <= -Float.MIN_VALUE)) {
                return (float) v;
            } else {
                throw new IllegalArgumentException(
                        String.format(
                                "Configuration value %s overflows/underflow the float type.",
                                v));
            }
        }
        return Float.parseFloat(o.toString());
    }

    /**
     * 强制转化为Double
     */
    public static Double convertToDouble(Object o) {
        if (o.getClass() == Double.class) {
            return (Double) o;
        } else if (o.getClass() == Float.class) {
            return ((Float) o).doubleValue();
        }
        return Double.parseDouble(o.toString());
    }


    /**
     * 强制转化为Double
     * 注: 这里采用了 Java17 的 switch 匹配方法
     */
    public static Boolean convertToBoolean(Object o) {
        if (o.getClass() == Boolean.class) {
            return (Boolean) o;
        }
        return switch (o.toString().toUpperCase()) {
            case "TRUE" -> true;
            case "FALSE" -> false;
            default -> throw new IllegalArgumentException(
                    String.format(
                            "Unrecognized option for boolean: %s. Expected either true or false(case insensitive)",
                            o));
        };
    }


    /**
     * 把指定配置KEY的值还原成原生 String 类型
     */
    public static Map<String, String> convertToPropertiesPrefixed(
            Map<String, Object> configs,
            String key
    ) {
        final String prefixKey = key + ".";
        return configs.keySet().stream()
                .filter(k -> k.startsWith(prefixKey))
                .collect(Collectors.toMap(
                        k -> k.substring(prefixKey.length()),
                        k -> convertToString(configs.get(k))
                ));
    }


    /**
     * 匹配关联的配置并且删除
     */
    public static boolean removePrefixMap(Map<String, Object> configs, String key) {
        // 提取配置当中相似KEY并构成列表
        final List<String> prefixKeys = configs
                .keySet()
                .stream()
                .filter(candidate -> filterPrefixMapKey(key, candidate))
                .toList();

        // 删除关联的KEY, 并确认匹配满足删除条件
        prefixKeys.forEach(configs::remove);
        return !prefixKeys.isEmpty();
    }

    // -------------------------------------------------------
}

这里是自己提取出来的自定义配置工具, 官方的比较复杂不好剖析内部代码学习, 所以最后处理几个必要方法, 最后粗略编写 Configuration

import org.meteorcat.fusion.util.ConfigurationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serial;
import java.io.Serializable;
import java.util.*;
import java.util.function.BiFunction;


/**
 * 通用配置功能类
 */
public class Configuration implements
        Serializable, // Java序列化实现
        Cloneable, // Java Clone 对象复制
        //ReadableConfig,  // 自定义的读取配置接口
        WritableConfig // 自定义的写入配置接口

{

    /**
     * 序列化的版本ID
     */
    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * 日志打印对象
     */
    private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);


    /**
     * 加载的配置列表
     */
    protected final HashMap<String, Object> configs;


    /**
     * 将配置序列化之后的附加上的值
     * ordinal() 从 0 - Integer.MAX_VALUE 开始, 所以可以提取内部 int 值作为类型标识
     */
    public enum Type {
        String,
        Integer,
        Long,
        Boolean,
        Float,
        Double,
        Bytes;
    }

    /**
     * 默认无参数初始化构造方法
     */
    public Configuration() {
        this.configs = new HashMap<>();
    }

    /**
     * 带初始化容量的构造方法
     */
    public Configuration(int initialCapacity) {
        this.configs = new HashMap<>(initialCapacity);
    }

    /**
     * 复制其他类初始化
     */
    public Configuration(Configuration other) {
        this.configs = new HashMap<>(other.configs);
    }


    /**
     * 静态复制集合类对象
     */
    public static Configuration fromMap(Map<String, String> map) {
        final Configuration config = new Configuration(map.size());
        map.forEach(config::setString);
        return config;
    }

    /**
     * 获取所有配置Set
     */
    public Set<String> getKeys() {
        synchronized (this.configs) {
            return new HashSet<>(this.configs.keySet());
        }
    }


    /**
     * 设置配置值
     */
    public void setString(String key, String value) {
        setValueInternal(key, value);
    }

    /**
     * 获取配置值
     */
    public String getString(String key, String defaultValue) {
        return getRawValue(key)
                .map(ConfigurationUtils::convertToString)
                .orElse(defaultValue);
    }


    /**
     * Cloneable 需要实现的的对象复制
     */
    @Override
    public Configuration clone() {
        Configuration config = new Configuration(this.configs.size());
        config.addAll(this);
        return config;
    }

    /**
     * 填充配置到内部集合
     * 注: 小心不要 addAll(Configuration other, String prefix) 和 addAll(Configuration other) 同时调用, 会导致线程死锁
     */
    public void addAll(Configuration other, String prefix) {
        final StringBuilder builder = new StringBuilder();
        builder.append(prefix);
        final int pl = builder.length();


        synchronized (this.configs) {
            synchronized (other.configs) {
                for (Map.Entry<String, Object> entry : other.configs.entrySet()) {
                    builder.setLength(pl);
                    builder.append(entry.getKey());
                    this.configs.put(builder.toString(), entry.getValue());
                }
            }
        }
    }

    /**
     * 填充配置到内部集合
     */
    public void addAll(Configuration other) {
        synchronized (this.configs) {
            synchronized (other.configs) {
                this.configs.putAll(other.configs);
            }
        }
    }


    /**
     * 配置项写入内部配置类内部
     */
    @Override
    public <T> Configuration set(ConfigOption<T> option, T value) {
        final boolean canBePrefixMap = ConfigurationUtils.canBePrefixMap(option);
        setValueInternal(option.key(), value, canBePrefixMap);
        return this;
    }

    /**
     * 匹配对象
     */
    public boolean containsKey(String key) {
        synchronized (this.configs) {
            return this.configs.containsKey(key);
        }
    }


    /**
     * 遍历所有配置回调
     */
    private <T> Optional<T> applyWithOption(
            ConfigOption<?> option,
            BiFunction<String, Boolean, Optional<T>> applier
    ) {
        final boolean canBePrefixMap = ConfigurationUtils.canBePrefixMap(option);
        final Optional<T> valueFromExactKey = applier.apply(option.key(), canBePrefixMap);
        if (valueFromExactKey.isPresent()) {
            return valueFromExactKey;
        } else if (option.hasFallbackKeys()) {
            // try the fallback keys
            for (FallbackKey fallbackKey : option.fallbackKeys()) {
                final Optional<T> valueFromFallbackKey =
                        applier.apply(fallbackKey.getKey(), canBePrefixMap);
                if (valueFromFallbackKey.isPresent()) {
                    loggingFallback(fallbackKey, option);
                    return valueFromFallbackKey;
                }
            }
        }
        return Optional.empty();
    }


    /**
     * 配置过时的异常日志记录
     */
    private void loggingFallback(FallbackKey fallbackKey, ConfigOption<?> configOption) {
        if (fallbackKey.isDeprecated()) {
            LOG.warn(
                    "Config uses deprecated configuration key '{}' instead of proper key '{}'",
                    fallbackKey.getKey(),
                    configOption.key());
        } else {
            LOG.info(
                    "Config uses fallback configuration key '{}' instead of key '{}'",
                    fallbackKey.getKey(),
                    configOption.key());
        }
    }


    // 原生类转化 ------------------------------------------------------

    /**
     * 获取配置当中的原生值
     */
    public Optional<Object> getRawValue(String key) {
        return getRawValue(key, false);
    }

    /**
     * 获取匹配KEY的原生值
     */
    public Optional<Object> getRawValue(String key, boolean canBePrefixMap) {
        if (key == null) throw new NullPointerException("Configuration Key not be null.");

        // 注意, 这里要做好线程同步操作, 配置类是会被多线程调用的
        synchronized (this.configs) {
            final Object valueFromExactKey = this.configs.get(key);
            if (!canBePrefixMap || valueFromExactKey != null) {
                return Optional.ofNullable(valueFromExactKey);
            }

            // 匹配出关联配置, 如果匹配到返回对应配置列表
            final Map<String, String> valueFromPrefixMap = ConfigurationUtils.convertToPropertiesPrefixed(configs, key);
            if (valueFromPrefixMap.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(valueFromPrefixMap);
        }
    }


    /**
     * 替换对应配置项的值
     */
    <T> void setValueInternal(String key, T value, boolean canBePrefixMap) {
        if (key == null) throw new NullPointerException("Key not be null.");
        if (value == null) throw new NullPointerException("Value cannot be null.");

        // 保持跨线程安全
        synchronized (this.configs) {
            if (canBePrefixMap) {
                ConfigurationUtils.removePrefixMap(this.configs, key);
            }
            this.configs.put(key, value);
        }
    }

    /**
     * 替换对应单独配置项的值
     */
    <T> void setValueInternal(String key, T value) {
        setValueInternal(key, value, false);
    }

    // ----------------------------------------------------------------


    // 重载系统所需 -----------------------------------------------------

    /**
     * 按照KEY字符串所有列表生成实例化唯一哈希值
     * 注: 这里可以学习下, 用于设计后续包装列表对象的的唯一哈希值生成
     */
    @Override
    public int hashCode() {
        int hash = 0;
        for (String s : this.configs.keySet()) {
            hash ^= s.hashCode();
        }
        return hash;
    }

    /**
     * 比较匹配重载功能
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        } else if (obj instanceof Configuration) {
            // 用来做 Configuration 内部配置比较, 用于处理相同配置项但是不同实例化的情况
            Map<String, Object> otherConfigs = ((Configuration) obj).configs;
            for (Map.Entry<String, Object> entry : otherConfigs.entrySet()) {
                Object thisValue = entry.getValue();
                Object otherValue = otherConfigs.get(entry.getKey());

                if (!thisValue.getClass().equals(byte[].class)) {
                    if (!thisValue.equals(otherValue)) {
                        return false;
                    }
                } else if (otherValue.getClass().equals(byte[].class)) {
                    if (!Arrays.equals((byte[]) thisValue, (byte[]) otherValue)) {
                        return false;
                    }
                } else {
                    return false;
                }
            }
            return true;
        } else {
            return false;
        }
    }

    /**
     * 转化字符串
     * 注: 这里 flink采用另外的包装模式, 如果要单独拿出来讲也很麻烦, 所以这里也精简处理
     */
    @Override
    public String toString() {
        return this.configs.toString();
    }

    // ----------------------------------------------------------------


}

这里就是最后做了许多精简化的配置功能, 实际上内部还有很多强关联依赖的功能类需要去阅读查看; 包括配置项的 addAll 在多线程情况要小心死锁的问题, 有些是必须阅读源码才能了解.

最后放上测试单元确认配置:


import org.junit.Test;
import org.meteorcat.fusion.configuration.ConfigOption;
import org.meteorcat.fusion.configuration.Configuration;
import org.meteorcat.fusion.util.ConfigurationUtils;

import java.util.HashMap;

/**
 * 配置测试单元
 */
public class ConfigurationTest {

    /**
     * 写入单项配置到内部配置集合
     */
    @Test
    public void writeConfigOption() {
        // 可以忽略 No SLF4J providers were found 错误, 因为想没有直接配置 SLF4J 日志对象绑定
        Configuration config = new Configuration();
        ConfigOption<Boolean> enabled = ConfigurationUtils.getBooleanConfigOption("fs.search.enabled");
        config.set(enabled, true);
        assert config.containsKey("fs.search.enabled");
        System.out.println(config);
    }


    /**
     * 列表对象转化为配置
     */
    @Test
    public void writeConfigOptions() {
        Configuration configuration = Configuration.fromMap(new HashMap<>() {{
            put("fs.search.enabled", "true");
            put("net.hostname", "localhost");
            put("net.port", "8080");
        }});
        assert configuration.getKeys().size() == 3;
        System.out.println(configuration);
    }
}

这个配置类虽然做了很多精简, 但是内部的编程思想很值得借鉴, 但是回过头来就可以看到有些复杂, 但这就是为了通用性所必须要做的牺牲.