MeteorCat / 学习Flink开源库源码(二)

Created Wed, 02 Jul 2025 02:37:19 +0800 Modified Wed, 29 Oct 2025 23:25:00 +0800
8184 Words

接下来我们想看下 POMmodules 节点上面的子模块, 首先 flink-annotations 可以先排除, 里面基本都是一些实验性注解和版本信息, 不过如果参与开源开发的话这个功能就比较有用.

可以学习下 FlinkVersion 版本文件设计, 这个文件在其他面向对象编程语言都能改动下就能使用(建议学习)

那么可以参照官方文档的例子来一步步解构内部源码:

// pojo class WordWithCount
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}
    
    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

// main method
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream(host, port);
DataStream<WordWithCount> windowCounts = text
    .flatMap(
        (FlatMapFunction<String, String>) (line, collector) 
            -> Arrays.stream(line.split("\\s")).forEach(collector::collect)
    ).returns(String.class)
    .map(word -> new WordWithCount(word, 1)).returns(TypeInformation.of(WordWithCount.class))
    .keyBy(wordWithCnt -> wordWithCnt.word)
    .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
    .sum("count").returns(TypeInformation.of(WordWithCount.class));

windowCounts.print();
env.execute();
}

这里 main method 里面提出首先获取全局的执行环境(StreamExecutionEnvironment), 需要首先展开解析下 flink 怎么去设计全局配置.

StreamExecutionEnvironment

这里官方文档打开之后主要定位到关键静态方法:

import org.apache.flink.configuration.Configuration; // 配置结构体

public class StreamExecutionEnvironment implements AutoCloseable {
    // 其他略

    /** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
    private static final ThreadLocal<StreamExecutionEnvironmentFactory>
            threadLocalContextEnvironmentFactory = new ThreadLocal<>();

    // --------------------------------------------------------------------------------------------
    //  Factory methods for ExecutionEnvironments
    // --------------------------------------------------------------------------------------------

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment()}.
     *
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
     *
     * <p>When executed from the command line the given configuration is stacked on top of the
     * global configuration which comes from the {@code config.yaml}, potentially overriding
     * duplicated options.
     *
     * @param configuration The configuration to instantiate the environment with.
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. The
     * default parallelism of the local environment is the number of hardware contexts (CPU cores /
     * threads), unless it was specified differently by {@link #setParallelism(int)}.
     *
     * @return A local execution environment.
     */
    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. It
     * will use the parallelism specified in the parameter.
     *
     * @param parallelism The parallelism for the local environment.
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, parallelism));
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in. It
     * will use the parallelism specified in the parameter.
     *
     * @param parallelism The parallelism for the local environment.
     * @param configuration Pass a custom configuration into the cluster
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(
            int parallelism, Configuration configuration) {
        Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return createLocalEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a {@link LocalStreamEnvironment}. The local execution environment will run the
     * program in a multi-threaded fashion in the same JVM as the environment was created in.
     *
     * @param configuration Pass a custom configuration into the cluster
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
        if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
            return new LocalStreamEnvironment(configuration);
        } else {
            Configuration copyOfConfiguration = new Configuration();
            copyOfConfiguration.addAll(configuration);
            copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
            return new LocalStreamEnvironment(copyOfConfiguration);
        }
    }
}

这里内部已经看到全局配置是怎么保存的, 那就是利用 ThreadLocal 当作本地线程上下文做全局保存; 这里的 Utils.resolveFactory 加载工厂方法也是建议学习, 其实就是利用泛型来验证当前 ThreadLocal 时候有该对象 如果存在该对象就代表目前上下文已经可用, 否则就覆盖替换成当 ThreadLocal 上下文:

public final class Utils {
    /**
     * Resolves the given factories. The thread local factory has preference over the static
     * factory. If none is set, the method returns {@link Optional#empty()}.
     *
     * @param threadLocalFactory containing the thread local factory
     * @param staticFactory containing the global factory
     * @param <T> type of factory
     * @return Optional containing the resolved factory if it exists, otherwise it's empty
     */
    public static <T> Optional<T> resolveFactory(
            ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
        final T localFactory = threadLocalFactory.get();
        final T factory = localFactory == null ? staticFactory : localFactory;

        return Optional.ofNullable(factory);
    }
}

建议学习一下这里的代码逻辑, 可以借鉴设计出日常功能核心: 全局配置

这里面的环境涉及之后能够看到主要初始化的对象为 Configuration 类结构, 我们直接检索下全项目确认该类属于 flink-core, 这个包正好我们 modules 加载的前排核心包之一, 也就是我们需要学习主要底层源码代码.

Configuration

这里面的配置结构相关的类内部自定义的 flink 结构扩展:

  • ExecutionConfig.GlobalJobParameters
  • ReadableConfig
  • WritableConfig
import java.io.Serial;

/**
 * 这里面的  ReadableConfig 和 WritableConfig 就是很值得学习的抽象通用读写对象
 */
public class Configuration extends ExecutionConfig.GlobalJobParameters
        implements IOReadableWritable,
        java.io.Serializable,
        Cloneable,
        ReadableConfig,
        WritableConfig {


    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * Stores the concrete key/value pairs of this configuration object.
     *
     * <p>NOTE: This map stores the values that are actually used, and does not include any escaping
     * that is required by the standard YAML syntax.
     */
    protected final HashMap<String, Object> confData;


    /** Creates a new empty configuration. */
    public Configuration() {
        this.confData = new HashMap<>();
    }

    /**
     * Creates a new configuration with the copy of the given configuration.
     *
     * @param other The configuration to copy the entries from.
     */
    public Configuration(Configuration other) {
        this.confData = new HashMap<>(other.confData);
    }


    @Override
    public Configuration clone() {
        Configuration config = new Configuration();
        config.addAll(this);
        return config;
    }
}

获取全局环境 StreamExecutionEnvironment.getExecutionEnvironment() 就是判断当前本地 ThreadLocal 是否存在 Configuration 对象, 如果不存在就调用初始化挂载在 ThreadLocal 之中, 存在就是保持该对象返回.

那么现在就考虑我们怎么去设计配置类对象, 学习下怎么构建个全局配置类对象; 这里最好对照开源代码的逻辑自己去实现, 边编写边思考为什么要这样处理才能有利于吸收这些知识.

这里是我参照 flink 自己升级的 POM 根文件配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<!-- 因为引用 apache-2.0, 所以在创建项目之中必须在显眼地方附带上开源声明 -->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

    <!-- 常规的包信息 -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.meteorcat.fusion</groupId>
    <artifactId>fusion-parent</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 详情信息 -->
    <name>Fusion :</name>
    <packaging>pom</packaging>
    <url>https://www.meteorcat.net/tags/flink/</url>
    <inceptionYear>2025</inceptionYear>


    <!-- 开源许可 -->
    <licenses>
        <license>
            <name>The Apache Software License, Version 2.0</name>
            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <!-- 源代码信息 -->
    <scm>
        <url>https://github.com/MeteorGX/fusion-parent</url> <!-- 源代码管理地址 -->
        <connection>[email protected]:MeteorGX/fusion-parent.git</connection> <!-- 源代码公共管理仓库 -->
        <developerConnection>scm:git:https://github.com/MeteorGX/fusion-parent.git</developerConnection>
    </scm>


    <!-- 远程项目依赖, 也就是第三方包会优先从这些第三方获取依赖项 -->
    <!-- 注: 因为中国特殊的网络原因, 所以有时候第三方拉取会报错-->
    <repositories>
        <repository>
            <id>redhat</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>

        <repository>
            <id>repository.jboss.org</id>
            <url>https://repository.jboss.org/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>


    <!-- POM子项目依赖 -->
    <modules>
        <!-- 这里的子库省略等后续展开说明 -->
        <module>fusion-core</module>
    </modules>


    <!-- 全局的 Maven 属性定义 -->
    <properties>
        <!-- 比较常规的字符集声明 -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Java平台版本声明, 这里我直接采用 Java17LTS 构建 -->
        <source.java.version>17</source.java.version><!-- 编译选择的版本 -->
        <target.java.version>17</target.java.version><!-- 运行生成的版本 -->
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <!-- Apache工具集版本 -->
        <commons-lang.version>3.17.0</commons-lang.version>
        <commons-text.version>1.13.1</commons-text.version>
        <commons-math.version>3.6.1</commons-math.version>
        <commons-compress.version>1.27.1</commons-compress.version>
        <commons-collections.version>4.4</commons-collections.version>

        <!-- 日志库版本锁定 -->
        <!-- Java日志 Facade 和实现 -->
        <slf4j.version>2.0.17</slf4j.version>
        <log4j.version>2.25.0</log4j.version>

        <!-- 引入 scala 方便支持 actor 组件 -->
        <!-- 这里我主要参照 pekko 官方处理: https://pekko.apache.org/docs/pekko/current/typed/guide/tutorial_1.html -->
        <!-- 还有结合 scala 官方文档的配置: https://docs.scala-lang.org/tutorials/scala-with-maven.html -->
        <scala.version>2.13</scala.version>
        <scala.binary.version>2.13</scala.binary.version>
        <scala.test.version>3.2.19</scala.test.version>

        <!-- 引入测试单元框架 -->
        <!-- 这里锁定官方比较多人用的 JUint5, 抛弃 flink 内部支持的 JUint4: https://github.com/junit-team/junit-framework/ -->
        <!-- 这里可以查询锁定比较多人用且没有漏洞的版本: https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
        <junit5.version>5.13.2</junit5.version>


        <!-- 这里我是采用本地个人开发, 所以不配置 spotless 的代码风格统一检查 -->
    </properties>

    <!-- 通用的第三方依赖库 -->
    <dependencies>

        <!-- Logging 基础 API -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <!-- JUint的核心模块 -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- JUint的测试调度模块 -->
        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 把 slf4j 日志连接到 log4j 实现, scope 则是声明限定为 test 测试框架之中 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- log4j 核心框架接口实现, 提供基础的抽象通用功能接口实现, scope 则是声明限定为 test 测试框架之中 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- log4j 核心的基础模块, scope 则是声明限定为 test 测试框架之中 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>


    <!-- 第三方的包版本控制, 用来定义锁定包版本 -->
    <dependencyManagement>
        <dependencies>

            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${log4j.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-layout-template-json</artifactId>
                <version>${log4j.version}</version>
            </dependency>

            <!-- For dependency convergence -->
            <dependency>
                <groupId>org.junit</groupId>
                <artifactId>junit-bom</artifactId>
                <version>${junit5.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-reflect</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-compiler</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <dependency>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest_${scala.binary.version}</artifactId>
                <version>${scala.test.version}</version>
                <scope>test</scope>
            </dependency>


            <!-- apache 工具集 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons-lang.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-text</artifactId>
                <version>${commons-text.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
                <version>${commons-math.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
                <version>${commons-compress.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-collections4</artifactId>
                <version>${commons-collections.version}</version>
            </dependency>

        </dependencies>
    </dependencyManagement>


    <!-- 打包设置 -->
    <build>

        <!-- 插件引入 -->
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>

        </plugins>


        <!-- 插件库版本锁定 -->
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>${source.java.version}</source>
                        <target>${target.java.version}</target>
                        <!-- 是否启动增量编译(false或者不配置默认就是禁用), 虽然能够减少编译时间, 但是有时候莫名其妙会导致编译少了东西 -->
                        <useIncrementalCompilation>false</useIncrementalCompilation>
                        <compilerArgs>
                            <arg>-Xpkginfo:always</arg>
                            <!-- 用来避免Java9以上的反射模块在调用内部包是否的权限问题 -->
                            <!-- JDK9(java.management|java.rmi 等)默认对内部包(以 sun. 或 com.sun. 开头)进行封装, 外部代码无法通过反射、类加载等方式访问, 否则会抛出权限异常 -->
                            <!-- 注: 许多第三方库|框架|旧代码等 hook 监控工具依赖于 JDK 内部 API, 如果没有加入这种配置可能会导致权限不足报错 -->
                            <arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
                            <arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
                            <arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>

        </pluginManagement>
    </build>

</project>

之后我们按照 flink 的规范, 去设计我们自己的 core 底层依赖; 这里我取名设定为 fusion-core, 用来编写自己的底层工具库, 内部的 POM 文件我参照 flink-core 如下:

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- 基础信息 -->
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.meteorcat.fusion</groupId>
        <artifactId>fusion-parent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- 包详情信息 -->
    <artifactId>fusion-core</artifactId>
    <name>Fusion : Core</name>
    <packaging>jar</packaging>


    <!-- 全局属性 -->
    <properties>
        <!-- 这里不需要处理多余属性 -->
    </properties>


    <!-- 第三方包 -->
    <dependencies>

        <!-- standard utilities -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <!-- managed version -->
        </dependency>

        <!-- standard utilities -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <!-- managed version -->
        </dependency>

        <!-- The common collections are needed for some hash tables used in the collection execution -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <!-- managed version -->
        </dependency>

        <!-- Commons compression, for additional decompressors -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <!-- managed version -->
        </dependency>

    </dependencies>


    <!-- 打包设置 -->
    <build>
        <!-- 刚开始没什么需要处理 -->
    </build>

</project>

flink 配置 Configuration 类设计很巧妙, 但是理解起来有点绕; 需要注意 flink 的的配置文件是采用 HOCON(Human-Optimized Config Object Notation) 方式处理, 格式如下:

# 配置文件格式如下
akka {
  actor {
    loglevel = "INFO"  # 日志级别
  }
}

# 而类功能加载读写如下
akka.actor.loglevel=INFO

所以要求内部配置必须采用 xxx.yyy.zzz=value 这种方式来构建, 那就显而易见这个类其实就是用来做语法解析(抽象语法树AST).

其实就是手写以点分割的配置解析器, 这种配置也是方便与 pekko 的配置同步

这里面配置分为以下部分, 可以通过 flink-core 内部类文件学习:

  • org.apache.flink.configuration.Configuration: 主要配置类
  • org.apache.flink.configuration.WritableConfig: 抽象写入接口
  • org.apache.flink.configuration.ReadableConfig: 抽象读取接口
  • org.apache.flink.configuration.ConfigOption: 配置容器对象

以上就是构建配置语法树的基本, 特别是 ConfigOption 就是主要解析处理工具类, 建议深入理解下这个类思路和功能, 我这里参照这个功能类并按照自己思考来设计属于自己的配置类:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.meteorcat.fusion.configuration;

import org.meteorcat.fusion.configuration.description.Description;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

/**
 * 全局配置项, 用于保存配置数据对象
 * 内部 fallback keys 主要用于向后兼容处理
 * 假如老配置是 aaa.bbb.ccc 切换为 ddd.bbb.ccc 的时候, 可以通过配置其别名达到兼容处理
 * <pre>
 * // 这里就是 fallback keys 用来做兼容处理的配置
 * ConfigOption<String> NEW_CONFIG = ConfigOptions.key("new.config.name")
 *     .stringType()
 *     .fallbackKeys("old.config.name")  // 旧键名作为备选
 *     .defaultValue("default");
 * </pre>
 *
 * @param <T>
 */
public class ConfigOption<T> {

    /**
     * 空的备用兼容性配置
     */
    private static final FallbackKey[] EMPTY = new FallbackKey[0];

    /**
     * 配置详情默认内容为空
     */
    static final Description EMPTY_DESCRIPTION = Description.builder().text("").build();


    // ------------------------------------------------------------------------

    /**
     * 当前配置的KEY对象
     */
    private final String key;


    /**
     * 当前配置的默认值对象
     */
    private final T defaultValue;


    /**
     * 当前配置的详细信息
     */
    private final Description description;


    /**
     * 保持兼容性的配置列表
     */
    private final FallbackKey[] fallbackKeys;


    /**
     * 具体的配置匹配类, 这里通过泛型匹配获取到配置类型
     *
     * <ul>
     *   <li>typeClass == atomic class (e.g. {@code Integer.class}) -> {@code ConfigOption<Integer>}
     *   <li>typeClass == {@code Map.class} -> {@code ConfigOption<Map<String, String>>}
     *   <li>typeClass == atomic class and isList == true for {@code ConfigOption<List<Integer>>}
     * </ul>
     */
    private final Class<?> clazz;


    /**
     * 判断配置的内部数据结构是否为列表对象
     */
    private final boolean isList;

    // ------------------------------------------------------------------------

    /**
     * 获取配置当中记录的值类型
     */
    Class<?> getClazz() {
        return clazz;
    }

    /**
     * 判断配置当中的值对象是否为列表对象
     */
    boolean isList() {
        return isList;
    }

    /**
     * Creates a new config option with fallback keys.
     * 创建一个新的配置对象, 如果配置变动过需要追加 fallback key 兼容配置列表
     *
     * @param key          The current key for that config option
     * @param clazz        describes type of the ConfigOption, see description of the clazz field
     * @param description  Description for that option
     * @param defaultValue The default value for this option
     * @param isList       tells if the ConfigOption describes a list option, see description of the clazz
     *                     field
     * @param fallbackKeys The list of fallback keys, in the order to be checked
     */
    ConfigOption(
            String key,
            Class<?> clazz,
            Description description,
            T defaultValue,
            boolean isList,
            FallbackKey... fallbackKeys
    ) {

        // 这里我和 flink 不一样, 官方专门设计 checkNotNull 全局函数用于判断是否空指针, 我这里直接编写
        if (key == null) throw new NullPointerException();
        if (clazz == null) throw new NullPointerException();
        this.key = key;
        this.description = description;
        this.defaultValue = defaultValue;
        this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
        this.clazz = clazz;
        this.isList = isList;
    }

    // ------------------------------------------------------------------------

    /**
     * Creates a new config option, using this option's key and default value, and adding the given
     * fallback keys.
     * 创建写入 fallback key 标识的配置项
     *
     * <p>When obtaining a value from the configuration via {@link
     * Configuration#getValue(ConfigOption)}, the fallback keys will be checked in the order
     * provided to this method. The first key for which a value is found will be used - that value
     * will be returned.
     *
     * @param fallbackKeys The fallback keys, in the order in which they should be checked.
     * @return A new config options, with the given fallback keys.
     */
    public ConfigOption<T> withFallbackKeys(String... fallbackKeys) {
        final Stream<FallbackKey> newFallbackKeys =
                Arrays.stream(fallbackKeys).map(FallbackKey::createFallbackKey);
        final Stream<FallbackKey> currentAlternativeKeys = Arrays.stream(this.fallbackKeys);

        // put fallback keys first so that they are prioritized
        final FallbackKey[] mergedAlternativeKeys =
                Stream.concat(newFallbackKeys, currentAlternativeKeys).toArray(FallbackKey[]::new);
        return new ConfigOption<>(
                key, clazz, description, defaultValue, isList, mergedAlternativeKeys);
    }

    /**
     * Creates a new config option, using this option's key and default value, and adding the given
     * deprecated keys.
     * 用于追加过时的配置, 用于保持兼容性配置
     *
     * <p>When obtaining a value from the configuration via {@link
     * Configuration#getValue(ConfigOption)}, the deprecated keys will be checked in the order
     * provided to this method. The first key for which a value is found will be used - that value
     * will be returned.
     *
     * @param deprecatedKeys The deprecated keys, in the order in which they should be checked.
     * @return A new config options, with the given deprecated keys.
     */
    public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
        final Stream<FallbackKey> currentAlternativeKeys = Arrays.stream(this.fallbackKeys);
        final Stream<FallbackKey> newDeprecatedKeys =
                Arrays.stream(deprecatedKeys).map(FallbackKey::createDeprecatedKey);

        // put deprecated keys last so that they are de-prioritized
        final FallbackKey[] mergedAlternativeKeys =
                Stream.concat(currentAlternativeKeys, newDeprecatedKeys)
                        .toArray(FallbackKey[]::new);
        return new ConfigOption<>(
                key, clazz, description, defaultValue, isList, mergedAlternativeKeys);
    }

    /**
     * Creates a new config option, using this option's key and default value, and adding the given
     * description. The given description is used when generation the configuration documentation.
     * 对配置项追加附带上详情内容
     *
     * @param description The description for this option.
     * @return A new config option, with given description.
     */
    public ConfigOption<T> withDescription(final String description) {
        return withDescription(Description.builder().text(description).build());
    }

    /**
     * Creates a new config option, using this option's key and default value, and adding the given
     * description. The given description is used when generation the configuration documentation.
     *
     * @param description The description for this option.
     * @return A new config option, with given description.
     */
    public ConfigOption<T> withDescription(final Description description) {
        return new ConfigOption<>(key, clazz, description, defaultValue, isList, fallbackKeys);
    }

    // ------------------------------------------------------------------------

    /**
     * 获取配置的Key对象
     *
     * @return The configuration key
     */
    public String key() {
        return key;
    }

    /**
     * 判断配置时候有默认值
     *
     * @return True if it has a default value, false if not.
     */
    public boolean hasDefaultValue() {
        return defaultValue != null;
    }

    /**
     * 获取默认值对象
     *
     * @return The default value, or null.
     */
    public T defaultValue() {
        return defaultValue;
    }

    /**
     * 判断是否带有向后兼容配置
     *
     * @return True if the option has fallback keys, false if not.
     */
    public boolean hasFallbackKeys() {
        return fallbackKeys != EMPTY;
    }

    /**
     * 获取当前配置之中所有兼容配置项
     *
     * @return The option's fallback keys.
     */
    public Iterable<FallbackKey> fallbackKeys() {
        return (fallbackKeys == EMPTY) ? Collections.emptyList() : Arrays.asList(fallbackKeys);
    }

    /**
     * 获取目前配置当中的详情说明
     *
     * @return The option's description.
     */
    public Description description() {
        return description;
    }

    // ------------------------------------------------------------------------

    /**
     * 重载对象比较功能
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o != null && o.getClass() == ConfigOption.class) {
            ConfigOption<?> that = (ConfigOption<?>) o;
            return this.key.equals(that.key)
                    && Arrays.equals(this.fallbackKeys, that.fallbackKeys)
                    && (this.defaultValue == null
                    ? that.defaultValue == null
                    : (that.defaultValue != null
                    && this.defaultValue.equals(that.defaultValue)));
        } else {
            return false;
        }
    }

    /**
     * 重载HashCode功能
     */
    @Override
    public int hashCode() {
        return 31 * key.hashCode()
                + 17 * Arrays.hashCode(fallbackKeys)
                + (defaultValue != null ? defaultValue.hashCode() : 0);
    }

    /**
     * 重载ToString
     */
    @Override
    public String toString() {
        return String.format(
                "Key: '%s' , default: %s (fallback keys: %s)",
                key, defaultValue, Arrays.toString(fallbackKeys));
    }
}

这里的关键类对象, 对比 flink 的模块功能是以下文件:

  • org.apache.flink.configuration.FallbackKey: 先后兼容的KEY配置
  • org.apache.flink.configuration.description.Description: 配置的详情内容, 这个类编写也比较复杂, 需要去理解处理

FallbackKey 配置类功能如下, 比较简单的代码:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.meteorcat.fusion.configuration;

/**
 * 兼容性的配置Key配置, 用来标识配置是变动修改还是已过时的配置
 * 配置类当中基本会遇到的情况: 1.配置KEY名称变动 2.配置已经过时不再支持
 * <pre>{@code
 *     // 使用方法
 *     FallbackKey fallbackKey = FallbackKey.createFallbackKey("server.hostname");
 *     System.out.println(fallbackKey);
 * }</pre>
 */
public class FallbackKey {

    // -------------------------
    //  工厂方法
    // -------------------------

    /**
     * 静态构建变动的配置KEY
     */
    static FallbackKey createFallbackKey(String key) {
        return new FallbackKey(key, false);
    }

    /**
     * 静态构建过时的配置KEY
     */
    static FallbackKey createDeprecatedKey(String key) {
        return new FallbackKey(key, true);
    }


    // ------------------------------------------------------------------------


    /**
     * 修改重载的配置KEY
     */
    private final String key;

    /**
     * 配置KEY是否已经过时淘汰
     */
    private final boolean isDeprecated;

    /**
     * 获取重载配置KEY
     */
    public String getKey() {
        return key;
    }

    /**
     * 获取是否配置已经过时
     */
    public boolean isDeprecated() {
        return isDeprecated;
    }

    /**
     * 私有化配置构造
     */
    private FallbackKey(String key, boolean isDeprecated) {
        this.key = key;
        this.isDeprecated = isDeprecated;
    }

    /**
     * 重载比较方法
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o != null && o.getClass() == FallbackKey.class) {
            FallbackKey that = (FallbackKey) o;
            return this.key.equals(that.key) && (this.isDeprecated == that.isDeprecated);
        } else {
            return false;
        }
    }

    /**
     * 重载HashCode方法
     */
    @Override
    public int hashCode() {
        return 31 * key.hashCode() + (isDeprecated ? 1 : 0);
    }

    /**
     * 重载ToString方法
     */
    @Override
    public String toString() {
        return String.format("{key=%s, isDeprecated=%s}", key, isDeprecated);
    }
}

Description 其实就是给配置项加上注释说明, 因为配置类基本集中于各自包体之中, 所以有时候需要告诉调用者配置具体的作用信息.

也就是避免配置是黑箱, 防止只知道 key 名称(甚至有时候连 key 都不清楚)

其实这个可以省略掉处理, 如果是私人的内部配置基本都是固定的配置, 而且都有单独内部项目文档; 所以如果要节省功夫的话可以直接不管 Description 相关功能, 不过为了学习就需要了解下.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.meteorcat.fusion.configuration.description;

import java.util.ArrayList;
import java.util.List;

/**
 * 配置详情信息, 内部关键信息需要元素类记录:
 * - BlockElement: 抽象的通用详情内容块, 提供详情内容文本块
 * - InlineElement: 抽象的通用详情内容, 提供以下元素(Element)实现
 * - TextElement: 实现的单行文本内容信息, 单段详情内容
 * - LineBreakElement:实现的内容换行信息, 就是多段 TextElement 追加换行符号
 * - ListElement: 实现的多行列表内容内容, 单纯的多段 TextElement 列表对象
 * - LinkElement: 实现的附带外部URL的内容信息, 也就是有外部访问连接地址
 * 这其实就代表文档的元素: 1.内容(text), 2.换行(linebreak), 3.段落(block)
 * 以上元素之中, 扩展追加的外部链接资料(link)的功能
 * -------------------------------------------
 * 后续可以思考下, 如果要设计 LicenseElement 该怎么处理? 建议试试自己思考编写
 */
public class Description {

    /**
     * 获取详情的所有段落, 包含有文本|换行等特殊Element节点
     */
    private final List<BlockElement> blocks;

    /**
     * 返回详情内容构建器
     */
    public static DescriptionBuilder builder() {
        return new DescriptionBuilder();
    }

    /**
     * 返回详情的所有段落
     */
    public List<BlockElement> getBlocks() {
        return blocks;
    }


    /**
     * 详情内容构建器
     */
    public static class DescriptionBuilder {

        /**
         * 文本段落容器
         */
        private final List<BlockElement> blocks = new ArrayList<>();

        /**
         * Adds a block of text with placeholders ("%s") that will be replaced with proper string
         * representation of given {@link InlineElement}. For example:
         * 格式化文本节点
         * <p>{@code text("This is a text with a link %s", link("https://somepage", "to here"))}
         *
         * @param format   text with placeholders for elements
         * @param elements elements to be put in the text
         * @return description with added block of text
         */
        public DescriptionBuilder text(String format, InlineElement... elements) {
            blocks.add(TextElement.text(format, elements));
            return this;
        }

        /**
         * Creates a simple block of text.
         * 追加一段文本
         *
         * @param text a simple block of text
         * @return block of text
         */
        public DescriptionBuilder text(String text) {
            blocks.add(TextElement.text(text));
            return this;
        }

        /**
         * Block of description add.
         * 追加文本段落
         *
         * @param block block of description to add
         * @return block of description
         */
        public DescriptionBuilder add(BlockElement block) {
            blocks.add(block);
            return this;
        }

        /**
         * Creates a line break in the description.
         * 追加文本换行
         */
        public DescriptionBuilder linebreak() {
            blocks.add(LineBreakElement.linebreak());
            return this;
        }

        /**
         * Adds a bulleted list to the description.
         * 把文本内容列表追加进来
         */
        public DescriptionBuilder list(InlineElement... elements) {
            blocks.add(ListElement.list(elements));
            return this;
        }

        /**
         * Creates description representation.
         * 构建最终的文本内容详情对象
         */
        public Description build() {
            return new Description(blocks);
        }
    }

    /**
     * 私有化构建方法, 不允许外部实例化
     */
    private Description(List<BlockElement> blocks) {
        this.blocks = blocks;
    }
}

这里面注释说明内部采用 XXXElement 对详情内容做扩展衍生, 这里主要先总结抽象节点接口, 他们就是底层基类:

  • DescriptionElement: 所有对象的基类
  • BlockElement: 段落文本详情的基类
  • InlineElement: 单行文本详情的基类
  • Formatter: 具体的格式化器虚类

这里的抽象类其实没什么需要说需要说的, 都是直接定义空接口等待实现就行:

/**
 * 格式化详情的抽象, 用于传递格式化类进行详情格式化
 */
public interface DescriptionElement {
    /**
     * 核心的格式化处理
     */
    void format(Formatter formatter);
}

/**
 * 详情的段落元素节点, 通用的抽象接口用于被继承衍生扩展
 */
public interface BlockElement extends DescriptionElement {
}

/**
 * 详情的单行(行内)元素节点, 通用的抽象接口用于被继承衍生扩展
 */
public interface InlineElement extends DescriptionElement {
}

关键的文本内容格式化 Formatter 类, 这个类就是核心功能类, 用于把所有 Element 做拼接构建成最后的文本内容然后输出:

import java.util.EnumSet;

/**
 * 详情格式化工具
 */
public abstract class Formatter {

    /**
     * 字符串构建器
     */
    private final StringBuilder state = new StringBuilder();


    /**
     * Formats the description into a String using format specific tags.
     * 主要的核心函数入口, 详情类内部的数据最后打包输出成文本内容
     *
     * @param description description to be formatted
     * @return string representation of the description
     */
    public String format(Description description) {
        // 提取所有的段落内容移交给内部衍生做文本格式化
        for (BlockElement blockElement : description.getBlocks()) {
            blockElement.format(this);
        }
        // 完成格式化之后重置字符串构建器
        return finalizeFormatting();
    }

    /**
     * 转发 LinkElement 处理
     */
    public void format(LinkElement element) {
        formatLink(state, element.getLink(), element.getText());
    }

    /**
     * 转发 TextElement 处理
     */
    public void format(TextElement element) {
        String[] inlineElements =
                element.getElements().stream()
                        .map(
                                // 这里就是直接递归每行文本内容构建新的字符串之后合并成数组
                                el -> {
                                    Formatter formatter = newInstance();
                                    el.format(formatter);
                                    return formatter.finalizeFormatting();
                                })
                        .toArray(String[]::new);

        // 最后合并成对应的文本格式
        formatText(
                state,
                escapeFormatPlaceholder(element.getFormat()),
                inlineElements,
                element.getStyles());
    }

    /**
     * 转发 LineBreakElement 处理
     */
    public void format(LineBreakElement ignore) {
        formatLineBreak(state);
    }

    /**
     * 转发 ListElement 处理
     */
    public void format(ListElement element) {
        String[] inlineElements =
                element.getEntries().stream()
                        .map(
                                el -> {
                                    Formatter formatter = newInstance();
                                    el.format(formatter);
                                    return formatter.finalizeFormatting();
                                })
                        .toArray(String[]::new);
        formatList(state, inlineElements);
    }

    /**
     * 输出文本内容构建器并重置, 最后返回格式化内容
     */
    private String finalizeFormatting() {
        String result = state.toString();
        state.setLength(0);
        return result.replaceAll("%%", "%");
    }

    /**
     * 要求继承的抽象格式化必须实现 LinkElement 的节点处理(外部连接)
     */
    protected abstract void formatLink(StringBuilder state, String link, String description);

    /**
     * 要求继承的抽象格式化必须实现 LineBreakElement 的节点处理(换行)
     */
    protected abstract void formatLineBreak(StringBuilder state);

    /**
     * 要求继承的抽象格式化必须实现 TextElement 的节点处理(文本)
     */
    protected abstract void formatText(
            StringBuilder state,
            String format,
            String[] elements,
            EnumSet<TextElement.TextStyle> styles);

    /**
     * 要求继承的抽象格式化必须实现 ListElement 的节点处理(多行文本)
     */
    protected abstract void formatList(StringBuilder state, String[] entries);

    /**
     * 获取构建全新实例化对象, 用于迭代遍历所有 Element 的是否生成新段落
     */
    protected abstract Formatter newInstance();


    /**
     * 内部随机占位符内容, 用于一些声明 %s 占位替代
     */
    private static final String TEMPORARY_PLACEHOLDER = "randomPlaceholderForStringFormat";

    /**
     * 对 %s 内容占位符进行检查并且替换
     */
    private static String escapeFormatPlaceholder(String value) {
        return value.replaceAll("%s", TEMPORARY_PLACEHOLDER)
                .replaceAll("%", "%%")
                .replaceAll(TEMPORARY_PLACEHOLDER, "%s");
    }
}

这里其实就是对 BlockElement|InlineElement 的衍生类做格式化转发, 具体的内容在 TextElementListElement 实现类; 最绕的逻辑都在 Formatter 之中, 所以主要理解成本都在其中, 这里篇幅有限所以只讲解 TextElement 的类功能:

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

/**
 * 文本节点实现
 * 注意: 涉及到单行文本+多行文本的实现必须要继承 BlockElement+InlineElement, 如果是相对简单的内容可以直接声明 InlineElement
 */
public class TextElement implements BlockElement, InlineElement {

    private final String format;
    private final List<InlineElement> elements;
    private final EnumSet<TextStyle> textStyles = EnumSet.noneOf(TextStyle.class);

    /**
     * Creates a block of text with placeholders ("%s") that will be replaced with proper string
     * representation of given {@link InlineElement}. For example:
     * 创建支持 %s 格式化的文本
     *
     * <p>{@code text("This is a text with a link %s", link("https://somepage", "to here"))}
     *
     * @param format   text with placeholders for elements
     * @param elements elements to be put in the text
     * @return block of text
     */
    public static TextElement text(String format, InlineElement... elements) {
        return new TextElement(format, Arrays.asList(elements));
    }

    /**
     * Creates a simple block of text.
     * 创建简单的单行文本
     *
     * @param text a simple block of text
     * @return block of text
     */
    public static TextElement text(String text) {
        return new TextElement(text, Collections.emptyList());
    }

    /**
     * Wraps a list of {@link InlineElement}s into a single {@link TextElement}.
     * 对文本的 %s 占位符进行批量替换
     */
    public static InlineElement wrap(InlineElement... elements) {
        return text(StringUtils.repeat("%s", elements.length), elements);
    }

    /**
     * Creates a block of text formatted as code.
     * 这里其实就是声明文本追加 <code></code> 自定义标签
     *
     * @param text a block of text that will be formatted as code
     * @return block of text formatted as code
     */
    public static TextElement code(String text) {
        TextElement element = text(text);
        element.textStyles.add(TextStyle.CODE);
        return element;
    }

    /**
     * 获取格式化文本
     */
    public String getFormat() {
        return format;
    }

    /**
     * 获取文本段落列表内容
     */
    public List<InlineElement> getElements() {
        return elements;
    }

    /**
     * 获取文本的风格
     */
    public EnumSet<TextStyle> getStyles() {
        return textStyles;
    }

    /**
     * 私有构建方法
     */
    private TextElement(String format, List<InlineElement> elements) {
        this.format = format;
        this.elements = elements;
    }

    /**
     * 重载格式化处理, 其实就是转发到 Formatter 让其帮忙格式化输出文本
     */
    @Override
    public void format(Formatter formatter) {
        formatter.format(this);
    }

    /**
     * Styles that can be applied to {@link TextElement} e.g. code, bold etc.
     * 声明文本详情的风格, 这里其实就是参考 HTML 风格的 <code></code> 节点, 后续如果想要追加更多外部节点可以手动扩展
     */
    public enum TextStyle {
        CODE
    }
}

这里编写个测试单元测试下 TextElement|Description|Formatter 是怎么结合生成最后的详情内容:

import org.junit.Test;
import org.meteorcat.fusion.configuration.description.Description;
import org.meteorcat.fusion.configuration.description.Formatter;
import org.meteorcat.fusion.configuration.description.LinkElement;
import org.meteorcat.fusion.configuration.description.TextElement;

import java.util.EnumSet;

/**
 * 详情内容构建器测试
 */
public class DescriptionTests {

    /**
     * 生成文本节点
     */
    @Test
    public void textElement() {
        // 生成构建器
        final String text = "Hello World!";
        final Description.DescriptionBuilder builder = Description.builder();

        // 创建文本节点
        builder.text(text);// 其实内部是做 blocks.add(TextElement.text(text)) 添加节点

        // 上面的其实等效于下面语法, 其实就是追加 list 内容
        builder.add(TextElement.text("oh!!!!!"));

        // 最后生成详情对象
        Description description = builder.build();
        assert !description.getBlocks().isEmpty();
    }

    /**
     * 衍生的文档格式化工具类
     */
    public static class HtmlFormatter extends Formatter {

        @Override
        protected void formatLink(StringBuilder state, String link, String description) {
            state.append(String.format("<a href=\"%s\">%s</a>", link, description));
        }

        @Override
        protected void formatLineBreak(StringBuilder state) {
            state.append("<br />");
        }

        @Override
        protected void formatText(
                StringBuilder state,
                String format,
                String[] elements,
                EnumSet<TextElement.TextStyle> styles) {
            String escapedFormat = escapeCharacters(format);

            String prefix = "";
            String suffix = "";
            if (styles.contains(TextElement.TextStyle.CODE)) {
                prefix = "<code class=\"highlighter-rouge\">";
                suffix = "</code>";
            }
            state.append(prefix);
            state.append(String.format(escapedFormat, (Object) elements));
            state.append(suffix);
        }

        @Override
        protected void formatList(StringBuilder state, String[] entries) {
            state.append("<ul>");
            for (String entry : entries) {
                state.append(String.format("<li>%s</li>", entry));
            }
            state.append("</ul>");
        }

        @Override
        protected Formatter newInstance() {
            return new HtmlFormatter();
        }

        private static String escapeCharacters(String value) {
            return value.replaceAll("&", "&amp;").replaceAll("<", "&lt;").replaceAll(">", "&gt;");
        }
    }

    /**
     * 节点格式化输出文档
     */
    @Test
    public void textFormat() {
        // 生成节点对象
        final Description.DescriptionBuilder builder = Description.builder();
        builder.add(TextElement.text("Hello"));
        builder.add(TextElement.text("World"));
        builder.add(TextElement.text("!"));
        builder.linebreak(); // 换行
        builder.list(LinkElement.link("https://www.meterorcat.net", "meteorcat"));
        final Description description = builder.build();

        // 输出所有详情文档
        final HtmlFormatter formatter = new HtmlFormatter();
        final String htmlContent = formatter.format(description);

        // 这里主要是打印看下内容效果而不采用断言
        System.out.println(htmlContent);
    }
}

最后 HTML 测试输出内容如下, 这里是手动实现个 HtmlFormatter 来对内容输出成 HTML:

HelloWorld!<br /><ul><li><a href="https://www.meterorcat.net">meteorcat</a></li></ul>

他里面的封装实现虽然比较绕, 但是却带来很不错的可扩展性, 但是如果日常工作的情况大部分不需要这种详情内容输出, 所以一般如果项目时间要求紧张是可以不管这方面的内容.