Vert.x
Eclipse Vert.x 是基于 事件驱动 + 非阻塞 I/O 的响应式应用框架,运行在 JVM 上,多语言支持(Java/Kotlin/Groovy/JS/Ruby)。以 Netty 为底层传输层,通过 Event Loop + Verticle 模型实现高吞吐、低延迟。
适合:API 网关、微服务网关、实时消息推送、IoT 设备通信等高并发场景。
核心概念
| 概念 | 说明 |
|---|---|
| Vertx | 全局入口,管理 Event Loop、Worker 线程池、Event Bus |
| Verticle | 部署单元(类似 Actor),代码在绑定的 Event Loop 线程中运行 |
| Event Loop | 每个核心一个,轮询 I/O 事件,禁止阻塞 |
| Worker Verticle | 运行在独立线程池,专门执行阻塞操作 |
| Event Bus | 轻量级消息总线,Verticle 间通信,支持点对点 / 发布订阅 / 请求-响应 |
| Future / Promise | Vert.x 异步结果的核心抽象,可组合链式调用 |
架构
Netty EventLoop(1 个/核)
│ 处理 I/O 事件(accept / read / write)
▼
Verticle(Standard,绑定单个 Event Loop)
│ 异步调用其他服务 / DB / Event Bus
▼
Worker Verticle(独立线程池,执行阻塞逻辑)
黄金法则:永远不要在 Event Loop 中执行阻塞操作(Thread.sleep、JDBC 同步调用等),否则整个 Event Loop 停顿。
快速上手(HTTP Server)
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>4.5.8</version>
</dependency>public class MainVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
Router router = Router.router(vertx);
// Body 解析(必须在路由前添加)
router.route().handler(BodyHandler.create());
router.get("/api/hello").handler(ctx ->
ctx.json(new JsonObject().put("msg", "Hello Vert.x")));
router.post("/api/orders").handler(ctx -> {
JsonObject body = ctx.body().asJsonObject();
// 异步处理
orderService.create(body)
.onSuccess(id -> ctx.response()
.setStatusCode(201)
.end(new JsonObject().put("id", id).encode()))
.onFailure(ctx::fail);
});
vertx.createHttpServer()
.requestHandler(router)
.listen(8080)
.<Void>mapEmpty()
.onComplete(startPromise);
}
}
// 启动
public static void main(String[] args) {
Vertx.vertx().deployVerticle(new MainVerticle());
}异步编程模型
Future 链式组合
// 串行异步调用
userService.findById(userId)
.compose(user -> orderService.findByUser(user.getId()))
.compose(orders -> notificationService.sendSummary(orders))
.onSuccess(v -> log.info("完成"))
.onFailure(err -> log.error("失败", err));并行组合
// 并行发起多个请求,等待全部完成
Future<User> userFuture = userService.findById(userId);
Future<List<Order>> orderFuture = orderService.findByUser(userId);
Future.all(userFuture, orderFuture)
.onSuccess(cf -> {
User user = cf.resultAt(0);
List<Order> orders = cf.resultAt(1);
ctx.json(buildResponse(user, orders));
});Kotlin Coroutines(推荐)
// vertx-lang-kotlin-coroutines
vertx.deployVerticle(object : CoroutineVerticle() {
override suspend fun start() {
val router = Router.router(vertx)
router.get("/api/user/:id").coHandler { ctx ->
val user = userService.findById(ctx.pathParam("id").toLong())
ctx.json(user)
}
vertx.createHttpServer().requestHandler(router).listen(8080).await()
}
})Event Bus 通信
// 发布者:广播
vertx.eventBus().publish("news.updates", new JsonObject().put("title", "Breaking"));
// 消费者:订阅
vertx.eventBus().<JsonObject>consumer("news.updates", msg -> {
System.out.println("收到: " + msg.body());
});
// 请求-响应
vertx.eventBus().<JsonObject>request("order.create", orderJson)
.onSuccess(reply -> System.out.println("创建成功: " + reply.body()))
.onFailure(err -> System.out.println("创建失败: " + err.getMessage()));
// 处理方(必须 reply)
vertx.eventBus().<JsonObject>consumer("order.create", msg -> {
orderService.create(msg.body())
.onSuccess(id -> msg.reply(new JsonObject().put("id", id)))
.onFailure(err -> msg.fail(500, err.getMessage()));
});Worker Verticle(阻塞操作)
public class JdbcWorker extends AbstractVerticle {
@Override
public void start() {
// 运行在 Worker 线程,可以阻塞
vertx.eventBus().<Long>consumer("user.findById", msg -> {
User user = jdbcTemplate.queryForObject( // 同步 JDBC,此处允许阻塞
"SELECT * FROM users WHERE id=?", userRowMapper, msg.body());
msg.reply(JsonObject.mapFrom(user));
});
}
}
// 以 Worker 模式部署
vertx.deployVerticle(new JdbcWorker(),
new DeploymentOptions().setWorker(true).setInstances(4));响应式 PostgreSQL 客户端
PgPool pool = PgPool.pool(vertx, "postgresql://user:pass@localhost/mydb",
new PoolOptions().setMaxSize(10));
pool.withConnection(conn ->
conn.preparedQuery("SELECT * FROM orders WHERE user_id=$1")
.execute(Tuple.of(userId))
).onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.getLong("id") + " - " + row.getString("status"));
}
});Vert.x vs Spring WebFlux
| 特性 | Vert.x | Spring WebFlux |
|---|---|---|
| 底层 | Netty(直接控制) | Netty / Undertow / Jetty |
| 编程模型 | Future / Callback / Kotlin Coroutine | Reactor(Mono/Flux) |
| 学习曲线 | 较陡(需理解 Event Loop 约束) | 中(Reactor 操作符) |
| 生态 | Vert.x 独立生态 | Spring 全家桶 |
| 性能 | 极高(更接近底层) | 高 |
| 适用场景 | 独立高性能服务 / 网关 | 已有 Spring 项目迁移响应式 |