RocketMQ

Apache RocketMQ 是阿里巴巴开源的分布式消息中间件,脱胎于双十一大促场景,在事务消息、顺序消息、延迟消息方面提供原生支持,是国内 Java 微服务生态中与 Kafka 并列的主流选择。


核心概念

概念说明
NameServer轻量级注册中心,存储 Topic 路由信息,无状态可多实例部署
Broker消息存储节点,Master/Slave 架构,负责消息读写
Topic消息逻辑分类,一个 Topic 可跨多个 Broker
MessageQueueTopic 的物理分片(类似 Kafka 的 Partition)
Producer消息生产者,通过 NameServer 发现 Broker
Consumer Group消费者组,广播模式或集群模式消费
TagTopic 下的子类型过滤标签,无需建新 Topic

架构

Producer
   │  1. 从 NameServer 获取 Topic 路由
   │  2. 按策略选择 MessageQueue 发送
   ▼
Broker Master ──同步/异步复制──▶ Broker Slave
   │  CommitLog(顺序追加写)
   │  ConsumeQueue(每个 Topic+Queue 一个索引文件)
   ▼
Consumer Group(集群模式:每个 Queue 只分配一个 Consumer)

快速上手(Spring Boot)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000

发送普通消息

@Autowired
private RocketMQTemplate rocketMQTemplate;
 
// 同步发送
SendResult result = rocketMQTemplate.syncSend("order-topic:CREATE", orderDTO);
 
// 异步发送
rocketMQTemplate.asyncSend("order-topic", orderDTO, new SendCallback() {
    @Override public void onSuccess(SendResult r) { log.info("sent: {}", r.getMsgId()); }
    @Override public void onException(Throwable e) { log.error("send failed", e); }
});
 
// 单向发送(日志场景,不关心结果)
rocketMQTemplate.sendOneWay("log-topic", logDTO);

消费消息

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    selectorExpression = "CREATE || UPDATE"   // Tag 过滤
)
public class OrderConsumer implements RocketMQListener<OrderDTO> {
    @Override
    public void onMessage(OrderDTO order) {
        orderService.process(order);           // 抛异常则重试
    }
}

延迟消息

RocketMQ 内置 18 个延迟级别(RocketMQ 5.x 支持任意精度延迟):

Level: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 发送延迟消息(Level 3 = 10s 后投递)
Message<OrderDTO> msg = MessageBuilder.withPayload(orderDTO).build();
rocketMQTemplate.syncSend("order-topic", msg, 3000, 3);

RocketMQ 5.x 支持任意时间点:

rocketMQTemplate.syncSendDelayTimeMills("order-topic", orderDTO, 
    System.currentTimeMillis() + 30 * 60 * 1000);  // 30 分钟后

顺序消息

同一业务 Key 路由到同一 MessageQueue,Consumer 单线程处理:

// 生产者:按 orderId 哈希选队列
rocketMQTemplate.syncSendOrderly("order-topic", orderDTO, orderId.toString());
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    consumeMode = ConsumeMode.ORDERLY    // 顺序消费模式
)
public class OrderConsumer implements RocketMQListener<OrderDTO> { ... }

事务消息

解决本地事务与消息发送的原子性问题(先发消息,后执行本地事务):

Producer
  │  1. 发送半消息(Broker 暂不投递)
  ▼
Broker ── 半消息存储
  │  2. 本地事务执行
  ├── COMMIT  → Broker 投递消息给消费者
  └── ROLLBACK → Broker 删除半消息
  │  3. 超时未决 → Broker 回查本地事务状态
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
 
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            orderService.createOrder((OrderDTO) arg);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
 
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getHeaders().get("orderId", String.class);
        return orderService.exists(orderId)
            ? RocketMQLocalTransactionState.COMMIT
            : RocketMQLocalTransactionState.ROLLBACK;
    }
}
 
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction("order-topic",
    MessageBuilder.withPayload(orderDTO)
        .setHeader("orderId", orderDTO.getId())
        .build(), orderDTO);

消费重试与死信队列

  • 集群模式下消费失败(抛异常)会自动重试,默认最多 16 次,间隔按延迟级别递增
  • 超过最大重试次数后消息进入死信 Topic%DLQ%<ConsumerGroup>
// 监听死信队列,人工处理或告警
@RocketMQMessageListener(
    topic = "%DLQ%order-consumer-group",
    consumerGroup = "dlq-handler-group"
)
public class DLQHandler implements RocketMQListener<MessageExt> { ... }

RocketMQ vs Kafka

特性RocketMQKafka
事务消息原生支持支持(Producer 事务,不含本地事务)
延迟消息原生支持需插件(kafka-delay-queue)
顺序消息原生支持分区内有序
Tag 过滤支持不支持(需按 Topic 拆分)
吞吐量高(十万级/s)极高(百万级/s)
消息回溯按时间/Offset按 Offset
适用场景业务消息、电商、金融日志流、大数据管道

相关链接

  • Kafka — 高吞吐流式平台,适合日志与大数据
  • RabbitMQ — 灵活路由,适合复杂业务拓扑
  • 消息队列 ← 返回消息队列目录

Spring Cloud 实战