MeteorCat / VertX使用

Created Sat, 03 May 2025 22:41:05 +0800 Modified Wed, 29 Oct 2025 23:24:53 +0800
3509 Words

对于日常 Web 应用来说 Spring 全家桶已经足够用, 但是涉及到某些底层操作(TCP|UDP|WebSocket)的时候就感觉冗余且捉襟见肘.

Spring 封装过头导致及其冗余, 很多组件其实没有那么必要引入, 所以需要重新选择技术栈; Spring 作为框架太复杂在网上检索到 VertX 来做工具集:

  • Spring:作为 框架(Framework) 集成大量日常通用工具并且屏蔽底层封装高级接口
  • VertX: 内部采用 netty 做底层将对应方法简略封装, 所以更像是 工具集(Utilities)

如果需要编写网关层, 推荐采用该方案, 原生 netty 虽然也可以但是缺失很多关键部件

对于底层网络基础来说 VertX 就更加符合需求, 所以在考虑之后决定引入:


<dependencies>
    <!-- 最基础的核心底层, 如果功能单一基本上只需要该第三方库就行了 -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>4.5.14</version>
    </dependency>


    <!-- Web功能服务 -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>4.5.14</version>
    </dependency>

    <!-- MYSQL数据库客户端,支持池处理 -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-mysql-client</artifactId>
        <version>4.5.14</version>
    </dependency>

    <!-- JUnit 测试框架参照: https://vertx.io/docs/vertx-junit5/java/ -->
</dependencies>

建议配合 JCommander 编写成命令行工具, 抛弃原来集成 Spring 臃肿的组件

VertX 内置大量异步处理, 基本上整体流程就是异步操作, 对开发者有更加高的要求.

因为采用异步设计, 所以需要有 运行时(Runtime) 概念, 创建运行时也非常简单:

// 创建异步运行时
Vertx vertx = Vertx.vertx();

// 也可以按照自定义参数, setWorkerPoolSize 代表初始化异步线程池大小为40
// new VertxOptions() 就是创建运行时具体配置
// https://vertx.io/docs/apidocs/io/vertx/core/VertxOptions.html
// 除非你知道自己在干什么, 否则最好采用默认参数即可
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

千万注意: 异步线程池自定义配置必须完全了解你正在干什么, 否则乱写配置会导致性能急剧下降

运行时有 函数面向对象 不同的构造方法, 分别可以按照个人风格来选择配置:

// 创建运行时
Vertx vertx = Vertx.vertx();

// 创建函数化服务
HttpServer server = vertx.createHttpServer();
server.requestHandler(request ->{
    // 响应获取
    HttpServerResponse response = request.response();
    
    // 设置响应头信息
    response.putHeader("content-type", "text/plain");
    
    // 写入响应数据结构
    response.write("some text");
    
    // 结束响应
    response.end();
});
server.listen(8080); // 设定监听端口

之后就是 面向对象 的定义和使用方法:

import io.vertx.core.AbstractVerticle;

/**
 * 面向对象初始化
 */
public class MainVerticle extends AbstractVerticle {

    /**
     * 入口方法
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {

        // 创建HTTP服务
        HttpServer server = vertx.createHttpServer();
        server.requestHandler(request -> {
            // 响应获取
            HttpServerResponse response = request.response();

            // 设置响应头信息
            response.putHeader("content-type", "text/plain");

            // 写入响应数据结构
            response.write("some text");

            // 结束响应
            response.end();
        });

        // 设定监听端口
        Future<HttpServer> future = server.listen(8080);

        // 设置启动成功回调
        future.onSuccess(s -> {
            System.out.println("HTTP server started on port " + s.actualPort());
            startPromise.complete();
        });

        // 设置异常回调
        future.onFailure(throwable -> {
            throwable.printStackTrace();
            startPromise.fail(throwable);
        });
    }

//    // 有的初始化需要追加以下静态入口方法注入启动
//    // https://vertx.io/docs/
//    public static void main(String[] args) {
//        Vertx vertx = Vertx.vertx();
//        // 自定义启动注入
//        vertx.deployVerticle(MainVerticle.class.getName());
//    }
}

这两种方法按照个人风格喜好可以自己去选择各自风格就行了.

Web

VertX 带有网络组件可以直接调用:

<!-- Web功能服务 -->
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>4.5.14</version>
</dependency>

之后定义一下入口和成功|失败回调功能就行, 其实功能相对来说比较简单:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;

/**
 * 面向对象初始化
 */
public class MainVerticle extends AbstractVerticle {

    private final Logger logger = LoggerFactory.getLogger(MainVerticle.class);

    /**
     * 启动入口
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // 自定义启动注入
        vertx.deployVerticle(MainVerticle.class.getName());
    }


    /**
     * 回调处理
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {

        // 创建HTTP服务
        HttpServer server = vertx.createHttpServer();
        server.requestHandler(request -> {
            // 响应获取
            HttpServerResponse response = request.response();

            // 设置响应头信息
            response.putHeader("content-type", "text/plain");

            // 写入响应数据结构
            response.write("Hello.World");

            // 结束响应
            response.end();
        });

        // 设定监听端口
        Future<HttpServer> future = server.listen(8080);

        // 设置启动成功回调
        future.onSuccess(s -> {
            logger.info("HTTP server started on port " + s.actualPort());
            startPromise.complete();
        });

        // 设置异常回调
        future.onFailure(throwable -> {
            logger.error(throwable);
            startPromise.fail(throwable);
        });
    }
}

实际上配置运行起来没什么难度, 最后打包运行起来即可.

WebSocket

注意: 对于 VertX 来说 WebSocket 只是作为 Web 模块的扩展延伸而已, 所以基本上和 HTTP 没什么差别:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;

/**
 * 面向对象初始化
 */
public class MainVerticle extends AbstractVerticle {

    private final Logger logger = LoggerFactory.getLogger(MainVerticle.class);

    /**
     * 启动入口
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // 自定义启动注入
        vertx.deployVerticle(MainVerticle.class.getName());
    }

    /**
     * 回调处理
     * 和 Web 功能不一样的就是这里初始化 WebSocket
     * <a href="https://vertx.io/docs/vertx-core/java/#_websockets">WebSocket服务</a>
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {

        // 创建HTTP服务, 注意 WebSocket 是架构在 Http上
        HttpServer server = vertx.createHttpServer();

        // 设置WebSocket句柄
        server.webSocketHandshakeHandler(handshake -> {
            // 注意这里需要确认是否追加 path
            if (handshake.path().equals("/owner")) {
                // 允许连接
                handshake.accept();
            } else {
                // 拒绝连接
                handshake.reject();
            }
        });

        // 对WebSocket句柄回调处理
        server.webSocketHandler(webSocket -> {
            // 这里就是具体的 WebSocket回调
            // 返回数据分为两种: 字符串 和 字节流
            // Buffer buffer = Buffer.buffer().appendInt(123).appendFloat(1.23f);
            // webSocket.writeBinaryMessage(buffer);
            //
            // String message = "hello";
            // webSocket.writeTextMessage(message);
            //
            // 另外还有帧数据发送方法, 用于自己对数据做分帧处理
            webSocket.writeTextMessage("hello.world");


            // 设置断开回调
            webSocket.closeHandler(ignored -> {
                logger.info("websocket closed");
            });


            // 后面需要就直接手动断开连接
            // webSocket.close();
        });


        // 设定监听端口
        Future<HttpServer> future = server.listen(8080);

        // 设置启动成功回调
        future.onSuccess(s -> {
            logger.info("WebSocket server started on port " + s.actualPort());
            startPromise.complete();
        });

        // 设置异常回调
        future.onFailure(throwable -> {
            logger.error(throwable);
            startPromise.fail(throwable);
        });
    }
}

十分简单的工具库集成实例化, 这样就能挂载起一个简单 WebSocket 网络服务

WebSocketFrame

这里需要说明, 无论 WebSocket|TCP|UDP 需要注意自己把数据流分帧.

只有 UDP 报文有粘包概念, 其他协议则是基于 流(Stream) 概念和 UDP 有本质差别

这里默认以 流(Stream) 需要把数据流切片分帧, 就像数据以下方式处理:

[             发送多次消息包                ]
[   1   ][   2   ][  3   ][  4 ][    5    ]   
[ hello ][ world ][ this ][ is ][ messgae ]

# 上面其实发了5条信息, 可能因为网络卡顿或者切换导致消息一次性积压推送到服务器
# 这时候就需要应用层面把消息切片分帧

WebSocketTCP 再度高级封装, 所以采用 message-based 而不是 streaming-based

VertX 当中也提供这种数据帧处理方式: WebSocketFrame

// 对WebSocket句柄回调处理
server.webSocketHandler(webSocket -> {

    // 设置断开回调
    webSocket.closeHandler(ignored -> {
        logger.info("websocket closed");
    });

    // 数据帧处理
    // WebSocket其实内部协议携带有 `Payload Length` 数据长度字段(长度: 65535), 所以消息结构接受的时候就已经知道消息长度
    // 如果超出的话消息会对此自动底层切分, 所以单条消息最好控制在 65535 之中
    // 对于大于 65535 的消息最好追加 bit(bool) 位标识确定消息时候完整, 1|0 = 确定是否需要把后面消息合并
    // 另外还有说法就是压缩位, 也就是需要额外再加个 uint8 数值位确定数据时候采用压缩算法, 0 默认不采用压缩
    // 不过私有协议一般没必要太过较真可以直接简单点采用紧凑的消息结构
    // 如果可以尽量保持 int32 + int32 这种方式, 如果客户端挂载脚本语言可能会不支持 Unsigned, 所以为了避免不必要麻烦推荐数值采用非 Unsigned
    webSocket.frameHandler(frame -> {

        // 只接受二进制数据方式传入
        if (frame.isBinary()) {

            // 获取数据缓冲区
            Buffer buffer = frame.binaryData();

            // 首先要知道一个消息包长度 [uint32(后续内容长度)] [uint32(消息id)][byte...]
            // 那么可以确定消息最小也是 > sizeof(uint32) = Integer.BYTES * 2
            // 为了效率可以外部定义常量加载, 这里演示就直接在内部定义
            int sizeof = Integer.BYTES * 2; // 4 * 2 = 8

            // 数据格式错误就跳过该消息包
            if (buffer.length() < sizeof) return;

            // 首先必须要获取消息体长度
            long uint32Length = buffer.getUnsignedInt(0);

            // 次位必须要消息id
            long uint32Id = buffer.getUnsignedInt(Integer.BYTES);

            // 后续就是消息体内容
            int dataLength = Math.toIntExact(uint32Length - Integer.BYTES);
            byte[] data = new byte[dataLength];

            // 这里第二位参数长度 = len - 第一位长度
            // 第一第二位数据参数关系为从源消息二进制之中截取 [Integer.BYTES * 2, len - Integer.BYTES * 2]
            // 这里看个人习惯来做解析, 主要最后还是要对流切片分帧
            buffer.getBytes(
                    Integer.BYTES * 2, // 消息内容起始 idx
                    Math.toIntExact(uint32Length + Integer.BYTES), // 消息包总长
                    data, // 消息实例化缓冲区
                    0 // 从缓冲区 idx 位置开始写入
            );

            logger.info("Message Length: %d, Id: %d, Data: %s".formatted(uint32Length, uint32Id, Arrays.toString(data)));
        }
    });
});

TCP

TCP|UDP 之类是属于基础核心功能, 所以只需要 core 库就行

后续不满足 WebSocket 网络性能的时候, 就要开始从底层实现所以也就开始涉及到 TCP 的服务端设计:

/**
 * 面向对象初始化
 * 其他不变, VertX万变不离其宗就是要生成运行时对象
 */
public class MainVerticle extends AbstractVerticle {

    private final Logger logger = LoggerFactory.getLogger(MainVerticle.class);

    /**
     * 启动入口
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // 自定义启动注入
        vertx.deployVerticle(MainVerticle.class.getName());
    }

    /**
     * 回调处理
     * 和 Web 功能不一样的就是这里初始化 Net 服务
     * <a href="https://vertx.io/docs/vertx-micrometer-metrics/java/#_tcp_server">TCP服务</a>
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        NetServer server = vertx.createNetServer();

        // 消息回调处理
        server.connectHandler(socket -> {
            socket.handler(buffer -> {
                logger.info("I received some bytes: " + buffer.length());

                // 数据写入也是分为字符串和字节流
                // Buffer buffer = Buffer.buffer().appendFloat(12.34f).appendInt(123);
                // socket.write(buffer);
                //
                // socket.write("some data");// 默认UTF-8
                //socket.write("some data", "UTF-16"); // 采用其他编码字符串

            });

            // 关闭句柄
            socket.closeHandler(v -> {
                logger.error("The socket has been closed");
            });
        });


        // 监听端口和地址, 端口设为0则默认随机端口
        Future<NetServer> future = server.listen(8080, "localhost");


        // 监听连接回调
        future.onComplete(res -> {
            if (res.succeeded()) {
                logger.info("Server is now listening!");
            } else {
                logger.error("Failed to bind!");
            }
        });

        // 设置启动成功回调
        future.onSuccess(s -> {
            logger.info("Net server started on port " + s.actualPort());
            startPromise.complete();
        });

        // 设置异常回调
        future.onFailure(throwable -> {
            logger.error(throwable);
            startPromise.fail(throwable);
        });
    }
}

采用 Linux 的命令行就能测试下, 没问题就能代表 TCP 网络服务挂载完成.

# 以TCP连接请求, 输入内容回车自动发送文本
telnet 127.0.0.1 8080

UDP

UDP 比较常用于高效游戏交换和局域网广播等服务

常规来说 TCP 已经能满足日常的使用, 但是有些要求更底层实现报文就需要采用 UDP 协议处理; 其实配置起来也很简单, 流程也是一样:

/**
 * 面向对象初始化
 * 其他不变, VertX万变不离其宗就是要生成运行时对象
 */
public class MainVerticle extends AbstractVerticle {
    private final Logger logger = LoggerFactory.getLogger(MainVerticle.class);

    /**
     * 启动入口
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // 自定义启动注入
        vertx.deployVerticle(MainVerticle.class.getName());
    }

    /**
     * 回调处理
     * 和 Web 功能不一样的就是这里初始化 Net 服务
     * <a href="https://vertx.io/docs/vertx-core/java/#_datagram_sockets_udp">UDP服务</a>
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());

        // 消息回调
        socket.handler(packet -> {
            SocketAddress client = packet.sender();

            // 报文直接发送数据回客户端地址
            socket.send("hello.world", client.port(), client.host());

        });


        // 监听端口和地址, 端口设为0则默认随机端口
        Future<DatagramSocket> future = socket.listen(8080, "localhost");


        // 监听连接回调
        future.onComplete(res -> {
            if (res.succeeded()) {
                logger.info("Server is now listening!");
            } else {
                logger.error("Failed to bind!");
            }
        });


        // 设置启动成功回调
        future.onSuccess(s -> {
            startPromise.complete();
        });

        // 设置异常回调
        future.onFailure(throwable -> {
            logger.error(throwable);
            startPromise.fail(throwable);
        });
    }
}

注意 UDP 作为报文协议和 TCP 差别很大, 这里需要 Linux 测试连接:

# 以 UDP 方式连接服务
nc -u 127.0.0.1 8080

之后回车输入内容都会返回 Hello.World 数据.