消息队列
消息队列(MQ)用于服务间异步解耦、流量削峰和可靠投递。Spring Boot 通过 Spring AMQP(RabbitMQ)和 Spring Kafka 提供开箱即用的集成。
方案对比
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 协议 | AMQP | 自定义(TCP) | 自定义 |
| 消息模型 | Exchange → Queue | Topic → Partition | Topic → Queue |
| 吞吐量 | 万级 | 百万级 | 十万级 |
| 消息顺序 | Queue 内有序 | Partition 内有序 | Queue 内有序 |
| 延迟 | 微秒级 | 毫秒级 | 毫秒级 |
| 消息堆积 | 差(内存为主) | 强(磁盘存储) | 强 |
| 延迟消息 | 插件支持 | 不原生支持 | 原生支持 |
| 事务消息 | 支持 | 支持 | 支持 |
| 适用场景 | 业务解耦、RPC | 日志、流处理、大数据 | 电商、金融 |
RabbitMQ(Spring AMQP)
依赖与配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 发布确认(生产者可靠性)
publisher-confirm-type: correlated
publisher-returns: true
# 消费者确认
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 10 # 每次预取 10 条
concurrency: 3 # 并发消费者数
max-concurrency: 10
retry:
enabled: true
max-attempts: 3
initial-interval: 1000msExchange、Queue、Binding 声明
RabbitMQ 的路由模型:Producer → Exchange → (Binding + RoutingKey) → Queue → Consumer
@Configuration
public class RabbitConfig {
// ---- 交换机类型 ----
// Direct:精确匹配 routingKey
@Bean
public DirectExchange orderExchange() {
return ExchangeBuilder.directExchange("order.exchange")
.durable(true).build();
}
// Topic:通配符匹配(* 匹配一个词,# 匹配多个词)
@Bean
public TopicExchange notifyExchange() {
return ExchangeBuilder.topicExchange("notify.exchange")
.durable(true).build();
}
// Fanout:广播,忽略 routingKey
@Bean
public FanoutExchange broadcastExchange() {
return new FanoutExchange("broadcast.exchange", true, false);
}
// ---- 队列 ----
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
// 绑定死信交换机
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "order.dead")
// 消息 TTL(毫秒)
.withArgument("x-message-ttl", 30000)
// 最大长度
.withArgument("x-max-length", 10000)
.build();
}
// ---- 绑定 ----
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue)
.to(orderExchange)
.with("order.created"); // routingKey
}
// Topic 绑定示例:匹配 notify.email.* 和 notify.sms.*
@Bean
public Queue emailQueue() {
return QueueBuilder.durable("email.queue").build();
}
@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange notifyExchange) {
return BindingBuilder.bind(emailQueue)
.to(notifyExchange)
.with("notify.email.*");
}
}生产者:发送消息
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
// 基本发送
public void sendOrderCreated(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange", // exchange
"order.created", // routingKey
order // payload(自动序列化)
);
}
// 带消息属性(优先级、TTL、Header 等)
public void sendWithProperties(Order order) {
rabbitTemplate.convertAndSend("order.exchange", "order.created", order, message -> {
MessageProperties props = message.getMessageProperties();
props.setExpiration("60000"); // 单条消息 TTL 60 秒
props.setPriority(5);
props.setHeader("source", "api");
props.setMessageId(UUID.randomUUID().toString());
return message;
});
}
// 发布确认(Publisher Confirm):确保消息到达 Broker
public void sendReliable(Order order) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result -> {
if (result != null && result.isAck()) {
log.info("消息已到达 Broker: {}", correlationData.getId());
} else {
log.error("消息未到达 Broker: {}", result != null ? result.getReason() : "unknown");
// 重新发送或告警
}
},
ex -> log.error("发送异常", ex)
);
rabbitTemplate.convertAndSend("order.exchange", "order.created", order, correlationData);
}
// RPC 模式(同步等待响应)
public String rpcCall(String request) {
return (String) rabbitTemplate.convertSendAndReceive(
"rpc.exchange", "rpc.queue", request);
}
}
// 配置 JSON 序列化(默认 JDK 序列化,可读性差)
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory,
MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(factory);
template.setMessageConverter(messageConverter);
// 消息路由失败时回调(mandatory = true)
template.setReturnsCallback(returned ->
log.error("消息路由失败: exchange={}, routingKey={}, replyCode={}",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode()));
return template;
}消费者:接收消息
@Component
@Slf4j
@RequiredArgsConstructor
public class OrderConsumer {
private final OrderService orderService;
// 基本消费(自动确认模式)
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
orderService.process(order);
}
// 手动确认(acknowledge-mode: manual)
@RabbitListener(queues = "order.queue")
public void handleOrderManual(Order order,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
orderService.process(order);
channel.basicAck(tag, false); // 确认消费
} catch (BusinessException e) {
// 业务异常:拒绝并不重新入队(转入死信队列)
channel.basicReject(tag, false);
} catch (Exception e) {
// 临时异常:重新入队重试
log.error("消费异常,重新入队", e);
channel.basicNack(tag, false, true);
}
}
// 批量消费
@RabbitListener(queues = "order.queue", containerFactory = "batchContainerFactory")
public void handleBatch(List<Order> orders) {
log.info("批量消费 {} 条", orders.size());
orderService.batchProcess(orders);
}
// 消息头信息
@RabbitListener(queues = "email.queue")
public void handleEmail(@Payload EmailMessage message,
@Headers Map<String, Object> headers,
@Header("source") String source) {
log.info("收到邮件消息,来源: {}", source);
emailService.send(message);
}
}死信队列(DLQ)
消息在以下情况进入死信队列:消息被拒绝(basicReject/basicNack 且不重新入队)、消息 TTL 过期、队列达到最大长度。
@Configuration
public class DeadLetterConfig {
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}
@Bean
public Binding dlxBinding(Queue deadLetterQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("order.dead");
}
}
// 死信消费者(告警 / 人工处理 / 重试)
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetter(Order order,
@Header(RabbitHeaders.X_DEATH) List<Map<String, Object>> xDeath) {
log.error("死信消息: order={}, death={}", order, xDeath);
alertService.notifyOps(order);
}Kafka(Spring Kafka)
依赖与配置
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 等待所有副本确认(最强可靠性)
retries: 3
batch-size: 16384 # 批次大小(字节)
linger-ms: 5 # 等待攒批时间
compression-type: snappy
consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest # 首次消费从最早开始
enable-auto-commit: false # 关闭自动提交,手动控制
max-poll-records: 100
properties:
spring.json.trusted.packages: "com.example.*"
listener:
ack-mode: manual_immediate # 手动提交 offset
concurrency: 3 # 并发消费者数(不超过分区数)Topic 创建
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic orderTopic() {
return TopicBuilder.name("order-events")
.partitions(6) // 分区数(影响并发消费能力)
.replicas(3) // 副本数(影响可用性)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 3600 * 1000L))
.build();
}
}生产者
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
// 异步发送(默认)
public void sendOrderEvent(Order order) {
kafkaTemplate.send("order-events", order.getId().toString(), order)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送失败: order={}", order.getId(), ex);
} else {
RecordMetadata meta = result.getRecordMetadata();
log.info("发送成功: topic={}, partition={}, offset={}",
meta.topic(), meta.partition(), meta.offset());
}
});
}
// 同步发送(等待确认)
public void sendSync(Order order) throws Exception {
SendResult<String, Object> result = kafkaTemplate
.send("order-events", order.getId().toString(), order)
.get(3, TimeUnit.SECONDS);
log.info("同步发送成功: offset={}", result.getRecordMetadata().offset());
}
// 指定分区发送(保证同一用户的消息有序)
public void sendToPartition(Order order) {
int partition = (int) (order.getUserId() % 6); // 按用户 ID 路由到固定分区
kafkaTemplate.send("order-events", partition, order.getId().toString(), order);
}
}消费者
@Component
@Slf4j
public class OrderKafkaConsumer {
// 基本消费
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {
try {
Order order = record.value();
log.info("消费消息: offset={}, key={}", record.offset(), record.key());
orderService.process(order);
ack.acknowledge(); // 手动提交 offset
} catch (Exception e) {
log.error("消费失败", e);
// 不 ack,消息会重新投递
}
}
// 批量消费
@KafkaListener(topics = "order-events", groupId = "batch-group",
containerFactory = "batchContainerFactory")
public void handleBatch(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
log.info("批量消费 {} 条", records.size());
List<Order> orders = records.stream().map(ConsumerRecord::value).toList();
orderService.batchProcess(orders);
ack.acknowledge();
}
// 多 Topic / 分区监听
@KafkaListener(
topicPartitions = @TopicPartition(
topic = "order-events",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
)
)
public void handlePartition0(Order order) {
// 只消费分区 0 的消息
}
// 消费者组再平衡监听
@KafkaListener(topics = "order-events", groupId = "order-service")
@SendTo("order-result-topic") // 处理结果发送到另一个 Topic
public OrderResult handleAndForward(Order order) {
return orderService.processAndReturn(order);
}
}错误处理与重试
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 固定间隔重试
factory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(1000L, 3L) // 间隔 1s,最多重试 3 次
));
// 指数退避重试 + 死信 Topic
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
recoverer,
new ExponentialBackOffWithMaxRetries(3)
);
// 非重试异常(业务异常直接进死信)
errorHandler.addNotRetryableExceptions(BusinessException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}事务消息
RabbitMQ 事务
// RabbitMQ 原生事务(性能差,不推荐。推荐用 Publisher Confirm 替代)
@Transactional
public void sendInTransaction(Order order) {
rabbitTemplate.execute(channel -> {
channel.txSelect();
try {
channel.basicPublish("order.exchange", "order.created", null,
objectMapper.writeValueAsBytes(order));
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
throw new RuntimeException(e);
}
return null;
});
}Kafka 事务
// 开启 Kafka 事务(producer 配置需设置 transactional-id-prefix)
@Transactional("kafkaTransactionManager")
public void sendTransactional(Order order) {
kafkaTemplate.send("order-events", order.getId().toString(), order);
kafkaTemplate.send("inventory-events", order.getId().toString(), order);
// 两条消息原子发送,要么全成功要么全回滚
}本地消息表(推荐可靠方案)
将消息写入数据库与业务操作在同一事务中,再由定时任务轮询发送:
@Transactional
public void createOrder(Order order) {
// 1. 保存业务数据
orderRepo.save(order);
// 2. 在同一事务中写入待发送消息
OutboxMessage msg = OutboxMessage.builder()
.topic("order-events")
.key(order.getId().toString())
.payload(toJson(order))
.status(MessageStatus.PENDING)
.build();
outboxRepo.save(msg);
}
// 定时扫描 + 发送(与 [[定时任务]] 配合)
@Scheduled(fixedDelay = 1000)
public void processOutbox() {
List<OutboxMessage> pending = outboxRepo.findByStatus(MessageStatus.PENDING, 100);
pending.forEach(msg -> {
try {
kafkaTemplate.send(msg.getTopic(), msg.getKey(), msg.getPayload()).get();
outboxRepo.updateStatus(msg.getId(), MessageStatus.SENT);
} catch (Exception e) {
outboxRepo.incrementRetryCount(msg.getId());
}
});
}消息顺序性保证
| 场景 | 方案 |
|---|---|
| 同一队列(RabbitMQ) | 单个消费者,禁用并发 |
| Kafka 同一分区 | 相同 key 路由到同一分区,单线程消费该分区 |
| 全局顺序(Kafka) | 单分区,性能受限(不推荐) |
| 业务顺序 | 消息携带序号,消费端幂等处理乱序 |
消息可靠性总结
| 阶段 | RabbitMQ | Kafka |
|---|---|---|
| 生产者 → Broker | Publisher Confirm | acks=all + 重试 |
| Broker 持久化 | 队列/消息 durable | 磁盘存储 + 副本 |
| Broker → 消费者 | 手动 ACK + 死信队列 | 手动提交 offset + 死信 Topic |
| 业务幂等 | 消息 ID 去重 | 幂等消费 + offset 去重 |