RocketMQ
Apache RocketMQ 是阿里巴巴开源的分布式消息中间件,脱胎于双十一大促场景,在事务消息、顺序消息、延迟消息方面提供原生支持,是国内 Java 微服务生态中与 Kafka 并列的主流选择。
核心概念
| 概念 | 说明 |
|---|---|
| NameServer | 轻量级注册中心,存储 Topic 路由信息,无状态可多实例部署 |
| Broker | 消息存储节点,Master/Slave 架构,负责消息读写 |
| Topic | 消息逻辑分类,一个 Topic 可跨多个 Broker |
| MessageQueue | Topic 的物理分片(类似 Kafka 的 Partition) |
| Producer | 消息生产者,通过 NameServer 发现 Broker |
| Consumer Group | 消费者组,广播模式或集群模式消费 |
| Tag | Topic 下的子类型过滤标签,无需建新 Topic |
架构
Producer
│ 1. 从 NameServer 获取 Topic 路由
│ 2. 按策略选择 MessageQueue 发送
▼
Broker Master ──同步/异步复制──▶ Broker Slave
│ CommitLog(顺序追加写)
│ ConsumeQueue(每个 Topic+Queue 一个索引文件)
▼
Consumer Group(集群模式:每个 Queue 只分配一个 Consumer)
快速上手(Spring Boot)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-producer-group
send-message-timeout: 3000发送普通消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步发送
SendResult result = rocketMQTemplate.syncSend("order-topic:CREATE", orderDTO);
// 异步发送
rocketMQTemplate.asyncSend("order-topic", orderDTO, new SendCallback() {
@Override public void onSuccess(SendResult r) { log.info("sent: {}", r.getMsgId()); }
@Override public void onException(Throwable e) { log.error("send failed", e); }
});
// 单向发送(日志场景,不关心结果)
rocketMQTemplate.sendOneWay("log-topic", logDTO);消费消息:
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
selectorExpression = "CREATE || UPDATE" // Tag 过滤
)
public class OrderConsumer implements RocketMQListener<OrderDTO> {
@Override
public void onMessage(OrderDTO order) {
orderService.process(order); // 抛异常则重试
}
}延迟消息
RocketMQ 内置 18 个延迟级别(RocketMQ 5.x 支持任意精度延迟):
Level: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 发送延迟消息(Level 3 = 10s 后投递)
Message<OrderDTO> msg = MessageBuilder.withPayload(orderDTO).build();
rocketMQTemplate.syncSend("order-topic", msg, 3000, 3);RocketMQ 5.x 支持任意时间点:
rocketMQTemplate.syncSendDelayTimeMills("order-topic", orderDTO,
System.currentTimeMillis() + 30 * 60 * 1000); // 30 分钟后顺序消息
同一业务 Key 路由到同一 MessageQueue,Consumer 单线程处理:
// 生产者:按 orderId 哈希选队列
rocketMQTemplate.syncSendOrderly("order-topic", orderDTO, orderId.toString());@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费模式
)
public class OrderConsumer implements RocketMQListener<OrderDTO> { ... }事务消息
解决本地事务与消息发送的原子性问题(先发消息,后执行本地事务):
Producer
│ 1. 发送半消息(Broker 暂不投递)
▼
Broker ── 半消息存储
│ 2. 本地事务执行
├── COMMIT → Broker 投递消息给消费者
└── ROLLBACK → Broker 删除半消息
│ 3. 超时未决 → Broker 回查本地事务状态
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder((OrderDTO) arg);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getHeaders().get("orderId", String.class);
return orderService.exists(orderId)
? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}
}
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction("order-topic",
MessageBuilder.withPayload(orderDTO)
.setHeader("orderId", orderDTO.getId())
.build(), orderDTO);消费重试与死信队列
- 集群模式下消费失败(抛异常)会自动重试,默认最多 16 次,间隔按延迟级别递增
- 超过最大重试次数后消息进入死信 Topic:
%DLQ%<ConsumerGroup>
// 监听死信队列,人工处理或告警
@RocketMQMessageListener(
topic = "%DLQ%order-consumer-group",
consumerGroup = "dlq-handler-group"
)
public class DLQHandler implements RocketMQListener<MessageExt> { ... }RocketMQ vs Kafka
| 特性 | RocketMQ | Kafka |
|---|---|---|
| 事务消息 | 原生支持 | 支持(Producer 事务,不含本地事务) |
| 延迟消息 | 原生支持 | 需插件(kafka-delay-queue) |
| 顺序消息 | 原生支持 | 分区内有序 |
| Tag 过滤 | 支持 | 不支持(需按 Topic 拆分) |
| 吞吐量 | 高(十万级/s) | 极高(百万级/s) |
| 消息回溯 | 按时间/Offset | 按 Offset |
| 适用场景 | 业务消息、电商、金融 | 日志流、大数据管道 |