RabbitMQ

RabbitMQ 是基于 AMQP 协议的开源消息代理,以灵活的路由、可靠投递和低延迟著称,适合业务解耦、任务队列、延迟消息等场景。

核心模型

Producer → Exchange → (Binding + RoutingKey) → Queue → Consumer

消息不直接发到队列,必须经过 Exchange 路由。

Exchange 类型

类型路由规则适用场景
directRoutingKey 完全匹配点对点、任务队列
fanout广播到所有绑定队列日志广播、事件通知
topicRoutingKey 通配符(* 单词,# 多词)多维度消息分发
headers按消息 Header 匹配复杂条件路由(少用)

快速接入(Spring Boot)

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # Publisher Confirm
    publisher-returns: true             # Publisher Return

声明 Exchange / Queue / Binding

@Configuration
public class RabbitConfig {
 
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange");
    }
 
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }
 
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange()).with("order.created");
    }
}

发送

rabbitTemplate.convertAndSend("order.exchange", "order.created", orderDTO);

消费(手动 ACK)

@RabbitListener(queues = "order.queue")
public void onOrderCreated(OrderDTO order, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        processOrder(order);
        channel.basicAck(tag, false);
    } catch (Exception e) {
        channel.basicNack(tag, false, false); // 拒绝 → 进死信队列
    }
}

消息可靠性三保障

阶段机制说明
生产者 → BrokerPublisher ConfirmBroker 收到后异步回调确认
Broker 持久化队列 durable + 消息 persistent重启不丢
Broker → 消费者手动 ACK处理成功后再确认,失败可重投

延迟队列(TTL + DLX)

Producer → normal.exchange → normal.queue (TTL=30s)
                                    │ 消息超时
                                    ▼
                              dlx.exchange → delay.queue → Consumer
Queue normalQueue = QueueBuilder.durable("normal.queue")
    .withArgument("x-message-ttl", 30_000)
    .withArgument("x-dead-letter-exchange", "dlx.exchange")
    .withArgument("x-dead-letter-routing-key", "delay.key")
    .build();

死信队列(DLX)

消息进入死信队列的三种情况:

  1. 消息被拒绝(basicNack)且 requeue=false
  2. 消息 TTL 超时
  3. 队列长度超过 x-max-length

消息幂等

RabbitMQ 不保证仅投递一次,消费者需自行保证幂等:

  • 数据库唯一索引(消息 ID 去重)
  • Redis SET NX 防重
  • 业务状态机判断(已处理则直接 ACK)

与 Kafka 对比

特性RabbitMQKafka
消息路由灵活(Exchange 类型)简单(Topic)
延迟消息原生支持(TTL+DLX)需插件
消息回溯不支持支持(offset)
吞吐量万级/s百万级/s
适用场景业务解耦、延迟任务日志、流处理

相关链接

Spring Cloud 实战