RabbitMQ
RabbitMQ 是基于 AMQP 协议的开源消息代理,以灵活的路由、可靠投递和低延迟著称,适合业务解耦、任务队列、延迟消息等场景。
核心模型
Producer → Exchange → (Binding + RoutingKey) → Queue → Consumer
消息不直接发到队列,必须经过 Exchange 路由。
Exchange 类型
| 类型 | 路由规则 | 适用场景 |
|---|---|---|
| direct | RoutingKey 完全匹配 | 点对点、任务队列 |
| fanout | 广播到所有绑定队列 | 日志广播、事件通知 |
| topic | RoutingKey 通配符(* 单词,# 多词) | 多维度消息分发 |
| headers | 按消息 Header 匹配 | 复杂条件路由(少用) |
快速接入(Spring Boot)
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated # Publisher Confirm
publisher-returns: true # Publisher Return声明 Exchange / Queue / Binding
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange()).with("order.created");
}
}发送
rabbitTemplate.convertAndSend("order.exchange", "order.created", orderDTO);消费(手动 ACK)
@RabbitListener(queues = "order.queue")
public void onOrderCreated(OrderDTO order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
processOrder(order);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, false); // 拒绝 → 进死信队列
}
}消息可靠性三保障
| 阶段 | 机制 | 说明 |
|---|---|---|
| 生产者 → Broker | Publisher Confirm | Broker 收到后异步回调确认 |
| Broker 持久化 | 队列 durable + 消息 persistent | 重启不丢 |
| Broker → 消费者 | 手动 ACK | 处理成功后再确认,失败可重投 |
延迟队列(TTL + DLX)
Producer → normal.exchange → normal.queue (TTL=30s)
│ 消息超时
▼
dlx.exchange → delay.queue → Consumer
Queue normalQueue = QueueBuilder.durable("normal.queue")
.withArgument("x-message-ttl", 30_000)
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "delay.key")
.build();死信队列(DLX)
消息进入死信队列的三种情况:
- 消息被拒绝(
basicNack)且requeue=false - 消息 TTL 超时
- 队列长度超过
x-max-length
消息幂等
RabbitMQ 不保证仅投递一次,消费者需自行保证幂等:
- 数据库唯一索引(消息 ID 去重)
- Redis
SET NX防重 - 业务状态机判断(已处理则直接 ACK)
与 Kafka 对比
| 特性 | RabbitMQ | Kafka |
|---|---|---|
| 消息路由 | 灵活(Exchange 类型) | 简单(Topic) |
| 延迟消息 | 原生支持(TTL+DLX) | 需插件 |
| 消息回溯 | 不支持 | 支持(offset) |
| 吞吐量 | 万级/s | 百万级/s |
| 适用场景 | 业务解耦、延迟任务 | 日志、流处理 |