Vert.x

Eclipse Vert.x 是基于 事件驱动 + 非阻塞 I/O响应式应用框架,运行在 JVM 上,多语言支持(Java/Kotlin/Groovy/JS/Ruby)。以 Netty 为底层传输层,通过 Event Loop + Verticle 模型实现高吞吐、低延迟。

适合:API 网关、微服务网关、实时消息推送、IoT 设备通信等高并发场景。


核心概念

概念说明
Vertx全局入口,管理 Event Loop、Worker 线程池、Event Bus
Verticle部署单元(类似 Actor),代码在绑定的 Event Loop 线程中运行
Event Loop每个核心一个,轮询 I/O 事件,禁止阻塞
Worker Verticle运行在独立线程池,专门执行阻塞操作
Event Bus轻量级消息总线,Verticle 间通信,支持点对点 / 发布订阅 / 请求-响应
Future / PromiseVert.x 异步结果的核心抽象,可组合链式调用

架构

Netty EventLoop(1 个/核)
    │  处理 I/O 事件(accept / read / write)
    ▼
Verticle(Standard,绑定单个 Event Loop)
    │  异步调用其他服务 / DB / Event Bus
    ▼
Worker Verticle(独立线程池,执行阻塞逻辑)

黄金法则:永远不要在 Event Loop 中执行阻塞操作(Thread.sleep、JDBC 同步调用等),否则整个 Event Loop 停顿。


快速上手(HTTP Server)

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>4.5.8</version>
</dependency>
public class MainVerticle extends AbstractVerticle {
 
    @Override
    public void start(Promise<Void> startPromise) {
        Router router = Router.router(vertx);
 
        // Body 解析(必须在路由前添加)
        router.route().handler(BodyHandler.create());
 
        router.get("/api/hello").handler(ctx ->
            ctx.json(new JsonObject().put("msg", "Hello Vert.x")));
 
        router.post("/api/orders").handler(ctx -> {
            JsonObject body = ctx.body().asJsonObject();
            // 异步处理
            orderService.create(body)
                .onSuccess(id -> ctx.response()
                    .setStatusCode(201)
                    .end(new JsonObject().put("id", id).encode()))
                .onFailure(ctx::fail);
        });
 
        vertx.createHttpServer()
            .requestHandler(router)
            .listen(8080)
            .<Void>mapEmpty()
            .onComplete(startPromise);
    }
}
 
// 启动
public static void main(String[] args) {
    Vertx.vertx().deployVerticle(new MainVerticle());
}

异步编程模型

Future 链式组合

// 串行异步调用
userService.findById(userId)
    .compose(user -> orderService.findByUser(user.getId()))
    .compose(orders -> notificationService.sendSummary(orders))
    .onSuccess(v -> log.info("完成"))
    .onFailure(err -> log.error("失败", err));

并行组合

// 并行发起多个请求,等待全部完成
Future<User>    userFuture    = userService.findById(userId);
Future<List<Order>> orderFuture = orderService.findByUser(userId);
 
Future.all(userFuture, orderFuture)
    .onSuccess(cf -> {
        User user        = cf.resultAt(0);
        List<Order> orders = cf.resultAt(1);
        ctx.json(buildResponse(user, orders));
    });

Kotlin Coroutines(推荐)

// vertx-lang-kotlin-coroutines
vertx.deployVerticle(object : CoroutineVerticle() {
    override suspend fun start() {
        val router = Router.router(vertx)
        router.get("/api/user/:id").coHandler { ctx ->
            val user = userService.findById(ctx.pathParam("id").toLong())
            ctx.json(user)
        }
        vertx.createHttpServer().requestHandler(router).listen(8080).await()
    }
})

Event Bus 通信

// 发布者:广播
vertx.eventBus().publish("news.updates", new JsonObject().put("title", "Breaking"));
 
// 消费者:订阅
vertx.eventBus().<JsonObject>consumer("news.updates", msg -> {
    System.out.println("收到: " + msg.body());
});
 
// 请求-响应
vertx.eventBus().<JsonObject>request("order.create", orderJson)
    .onSuccess(reply -> System.out.println("创建成功: " + reply.body()))
    .onFailure(err -> System.out.println("创建失败: " + err.getMessage()));
 
// 处理方(必须 reply)
vertx.eventBus().<JsonObject>consumer("order.create", msg -> {
    orderService.create(msg.body())
        .onSuccess(id -> msg.reply(new JsonObject().put("id", id)))
        .onFailure(err -> msg.fail(500, err.getMessage()));
});

Worker Verticle(阻塞操作)

public class JdbcWorker extends AbstractVerticle {
 
    @Override
    public void start() {
        // 运行在 Worker 线程,可以阻塞
        vertx.eventBus().<Long>consumer("user.findById", msg -> {
            User user = jdbcTemplate.queryForObject(  // 同步 JDBC,此处允许阻塞
                "SELECT * FROM users WHERE id=?", userRowMapper, msg.body());
            msg.reply(JsonObject.mapFrom(user));
        });
    }
}
 
// 以 Worker 模式部署
vertx.deployVerticle(new JdbcWorker(),
    new DeploymentOptions().setWorker(true).setInstances(4));

响应式 PostgreSQL 客户端

PgPool pool = PgPool.pool(vertx, "postgresql://user:pass@localhost/mydb",
    new PoolOptions().setMaxSize(10));
 
pool.withConnection(conn ->
    conn.preparedQuery("SELECT * FROM orders WHERE user_id=$1")
        .execute(Tuple.of(userId))
).onSuccess(rows -> {
    for (Row row : rows) {
        System.out.println(row.getLong("id") + " - " + row.getString("status"));
    }
});

Vert.x vs Spring WebFlux

特性Vert.xSpring WebFlux
底层Netty(直接控制)Netty / Undertow / Jetty
编程模型Future / Callback / Kotlin CoroutineReactor(Mono/Flux)
学习曲线较陡(需理解 Event Loop 约束)中(Reactor 操作符)
生态Vert.x 独立生态Spring 全家桶
性能极高(更接近底层)
适用场景独立高性能服务 / 网关已有 Spring 项目迁移响应式

相关链接