消息队列

消息队列(MQ)用于服务间异步解耦流量削峰可靠投递。Spring Boot 通过 Spring AMQP(RabbitMQ)和 Spring Kafka 提供开箱即用的集成。


方案对比

特性RabbitMQKafkaRocketMQ
协议AMQP自定义(TCP)自定义
消息模型Exchange → QueueTopic → PartitionTopic → Queue
吞吐量万级百万级十万级
消息顺序Queue 内有序Partition 内有序Queue 内有序
延迟微秒级毫秒级毫秒级
消息堆积差(内存为主)强(磁盘存储)
延迟消息插件支持不原生支持原生支持
事务消息支持支持支持
适用场景业务解耦、RPC日志、流处理、大数据电商、金融

RabbitMQ(Spring AMQP)

依赖与配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 发布确认(生产者可靠性)
    publisher-confirm-type: correlated
    publisher-returns: true
    # 消费者确认
    listener:
      simple:
        acknowledge-mode: manual    # 手动确认
        prefetch: 10               # 每次预取 10 条
        concurrency: 3             # 并发消费者数
        max-concurrency: 10
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms

Exchange、Queue、Binding 声明

RabbitMQ 的路由模型:Producer → Exchange → (Binding + RoutingKey) → Queue → Consumer

@Configuration
public class RabbitConfig {
 
    // ---- 交换机类型 ----
    // Direct:精确匹配 routingKey
    @Bean
    public DirectExchange orderExchange() {
        return ExchangeBuilder.directExchange("order.exchange")
            .durable(true).build();
    }
 
    // Topic:通配符匹配(* 匹配一个词,# 匹配多个词)
    @Bean
    public TopicExchange notifyExchange() {
        return ExchangeBuilder.topicExchange("notify.exchange")
            .durable(true).build();
    }
 
    // Fanout:广播,忽略 routingKey
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange("broadcast.exchange", true, false);
    }
 
    // ---- 队列 ----
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            // 绑定死信交换机
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "order.dead")
            // 消息 TTL(毫秒)
            .withArgument("x-message-ttl", 30000)
            // 最大长度
            .withArgument("x-max-length", 10000)
            .build();
    }
 
    // ---- 绑定 ----
    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue)
            .to(orderExchange)
            .with("order.created");  // routingKey
    }
 
    // Topic 绑定示例:匹配 notify.email.* 和 notify.sms.*
    @Bean
    public Queue emailQueue() {
        return QueueBuilder.durable("email.queue").build();
    }
 
    @Bean
    public Binding emailBinding(Queue emailQueue, TopicExchange notifyExchange) {
        return BindingBuilder.bind(emailQueue)
            .to(notifyExchange)
            .with("notify.email.*");
    }
}

生产者:发送消息

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {
 
    private final RabbitTemplate rabbitTemplate;
 
    // 基本发送
    public void sendOrderCreated(Order order) {
        rabbitTemplate.convertAndSend(
            "order.exchange",    // exchange
            "order.created",     // routingKey
            order                // payload(自动序列化)
        );
    }
 
    // 带消息属性(优先级、TTL、Header 等)
    public void sendWithProperties(Order order) {
        rabbitTemplate.convertAndSend("order.exchange", "order.created", order, message -> {
            MessageProperties props = message.getMessageProperties();
            props.setExpiration("60000");      // 单条消息 TTL 60 秒
            props.setPriority(5);
            props.setHeader("source", "api");
            props.setMessageId(UUID.randomUUID().toString());
            return message;
        });
    }
 
    // 发布确认(Publisher Confirm):确保消息到达 Broker
    public void sendReliable(Order order) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(
            result -> {
                if (result != null && result.isAck()) {
                    log.info("消息已到达 Broker: {}", correlationData.getId());
                } else {
                    log.error("消息未到达 Broker: {}", result != null ? result.getReason() : "unknown");
                    // 重新发送或告警
                }
            },
            ex -> log.error("发送异常", ex)
        );
        rabbitTemplate.convertAndSend("order.exchange", "order.created", order, correlationData);
    }
 
    // RPC 模式(同步等待响应)
    public String rpcCall(String request) {
        return (String) rabbitTemplate.convertSendAndReceive(
            "rpc.exchange", "rpc.queue", request);
    }
}
 
// 配置 JSON 序列化(默认 JDK 序列化,可读性差)
@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}
 
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory,
                                     MessageConverter messageConverter) {
    RabbitTemplate template = new RabbitTemplate(factory);
    template.setMessageConverter(messageConverter);
    // 消息路由失败时回调(mandatory = true)
    template.setReturnsCallback(returned ->
        log.error("消息路由失败: exchange={}, routingKey={}, replyCode={}",
            returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode()));
    return template;
}

消费者:接收消息

@Component
@Slf4j
@RequiredArgsConstructor
public class OrderConsumer {
 
    private final OrderService orderService;
 
    // 基本消费(自动确认模式)
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order) {
        orderService.process(order);
    }
 
    // 手动确认(acknowledge-mode: manual)
    @RabbitListener(queues = "order.queue")
    public void handleOrderManual(Order order,
                                  Channel channel,
                                  @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            orderService.process(order);
            channel.basicAck(tag, false);   // 确认消费
        } catch (BusinessException e) {
            // 业务异常:拒绝并不重新入队(转入死信队列)
            channel.basicReject(tag, false);
        } catch (Exception e) {
            // 临时异常:重新入队重试
            log.error("消费异常,重新入队", e);
            channel.basicNack(tag, false, true);
        }
    }
 
    // 批量消费
    @RabbitListener(queues = "order.queue", containerFactory = "batchContainerFactory")
    public void handleBatch(List<Order> orders) {
        log.info("批量消费 {} 条", orders.size());
        orderService.batchProcess(orders);
    }
 
    // 消息头信息
    @RabbitListener(queues = "email.queue")
    public void handleEmail(@Payload EmailMessage message,
                            @Headers Map<String, Object> headers,
                            @Header("source") String source) {
        log.info("收到邮件消息,来源: {}", source);
        emailService.send(message);
    }
}

死信队列(DLQ)

消息在以下情况进入死信队列:消息被拒绝(basicReject/basicNack 且不重新入队)、消息 TTL 过期、队列达到最大长度。

@Configuration
public class DeadLetterConfig {
 
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }
 
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dead.queue").build();
    }
 
    @Bean
    public Binding dlxBinding(Queue deadLetterQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("order.dead");
    }
}
 
// 死信消费者(告警 / 人工处理 / 重试)
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetter(Order order,
                             @Header(RabbitHeaders.X_DEATH) List<Map<String, Object>> xDeath) {
    log.error("死信消息: order={}, death={}", order, xDeath);
    alertService.notifyOps(order);
}

Kafka(Spring Kafka)

依赖与配置

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all            # 等待所有副本确认(最强可靠性)
      retries: 3
      batch-size: 16384    # 批次大小(字节)
      linger-ms: 5         # 等待攒批时间
      compression-type: snappy
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest   # 首次消费从最早开始
      enable-auto-commit: false     # 关闭自动提交,手动控制
      max-poll-records: 100
      properties:
        spring.json.trusted.packages: "com.example.*"
    listener:
      ack-mode: manual_immediate    # 手动提交 offset
      concurrency: 3                # 并发消费者数(不超过分区数)

Topic 创建

@Configuration
public class KafkaTopicConfig {
 
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-events")
            .partitions(6)      // 分区数(影响并发消费能力)
            .replicas(3)        // 副本数(影响可用性)
            .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 3600 * 1000L))
            .build();
    }
}

生产者

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderKafkaProducer {
 
    private final KafkaTemplate<String, Object> kafkaTemplate;
 
    // 异步发送(默认)
    public void sendOrderEvent(Order order) {
        kafkaTemplate.send("order-events", order.getId().toString(), order)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("发送失败: order={}", order.getId(), ex);
                } else {
                    RecordMetadata meta = result.getRecordMetadata();
                    log.info("发送成功: topic={}, partition={}, offset={}",
                        meta.topic(), meta.partition(), meta.offset());
                }
            });
    }
 
    // 同步发送(等待确认)
    public void sendSync(Order order) throws Exception {
        SendResult<String, Object> result = kafkaTemplate
            .send("order-events", order.getId().toString(), order)
            .get(3, TimeUnit.SECONDS);
        log.info("同步发送成功: offset={}", result.getRecordMetadata().offset());
    }
 
    // 指定分区发送(保证同一用户的消息有序)
    public void sendToPartition(Order order) {
        int partition = (int) (order.getUserId() % 6);  // 按用户 ID 路由到固定分区
        kafkaTemplate.send("order-events", partition, order.getId().toString(), order);
    }
}

消费者

@Component
@Slf4j
public class OrderKafkaConsumer {
 
    // 基本消费
    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void handleOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {
        try {
            Order order = record.value();
            log.info("消费消息: offset={}, key={}", record.offset(), record.key());
            orderService.process(order);
            ack.acknowledge();   // 手动提交 offset
        } catch (Exception e) {
            log.error("消费失败", e);
            // 不 ack,消息会重新投递
        }
    }
 
    // 批量消费
    @KafkaListener(topics = "order-events", groupId = "batch-group",
                   containerFactory = "batchContainerFactory")
    public void handleBatch(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
        log.info("批量消费 {} 条", records.size());
        List<Order> orders = records.stream().map(ConsumerRecord::value).toList();
        orderService.batchProcess(orders);
        ack.acknowledge();
    }
 
    // 多 Topic / 分区监听
    @KafkaListener(
        topicPartitions = @TopicPartition(
            topic = "order-events",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
        )
    )
    public void handlePartition0(Order order) {
        // 只消费分区 0 的消息
    }
 
    // 消费者组再平衡监听
    @KafkaListener(topics = "order-events", groupId = "order-service")
    @SendTo("order-result-topic")  // 处理结果发送到另一个 Topic
    public OrderResult handleAndForward(Order order) {
        return orderService.processAndReturn(order);
    }
}

错误处理与重试

@Configuration
public class KafkaConfig {
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
 
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
 
        // 固定间隔重试
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(1000L, 3L)  // 间隔 1s,最多重试 3 次
        ));
 
        // 指数退避重试 + 死信 Topic
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            recoverer,
            new ExponentialBackOffWithMaxRetries(3)
        );
        // 非重试异常(业务异常直接进死信)
        errorHandler.addNotRetryableExceptions(BusinessException.class);
        factory.setCommonErrorHandler(errorHandler);
 
        return factory;
    }
}

事务消息

RabbitMQ 事务

// RabbitMQ 原生事务(性能差,不推荐。推荐用 Publisher Confirm 替代)
@Transactional
public void sendInTransaction(Order order) {
    rabbitTemplate.execute(channel -> {
        channel.txSelect();
        try {
            channel.basicPublish("order.exchange", "order.created", null,
                objectMapper.writeValueAsBytes(order));
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            throw new RuntimeException(e);
        }
        return null;
    });
}

Kafka 事务

// 开启 Kafka 事务(producer 配置需设置 transactional-id-prefix)
@Transactional("kafkaTransactionManager")
public void sendTransactional(Order order) {
    kafkaTemplate.send("order-events", order.getId().toString(), order);
    kafkaTemplate.send("inventory-events", order.getId().toString(), order);
    // 两条消息原子发送,要么全成功要么全回滚
}

本地消息表(推荐可靠方案)

将消息写入数据库与业务操作在同一事务中,再由定时任务轮询发送:

@Transactional
public void createOrder(Order order) {
    // 1. 保存业务数据
    orderRepo.save(order);
 
    // 2. 在同一事务中写入待发送消息
    OutboxMessage msg = OutboxMessage.builder()
        .topic("order-events")
        .key(order.getId().toString())
        .payload(toJson(order))
        .status(MessageStatus.PENDING)
        .build();
    outboxRepo.save(msg);
}
 
// 定时扫描 + 发送(与 [[定时任务]] 配合)
@Scheduled(fixedDelay = 1000)
public void processOutbox() {
    List<OutboxMessage> pending = outboxRepo.findByStatus(MessageStatus.PENDING, 100);
    pending.forEach(msg -> {
        try {
            kafkaTemplate.send(msg.getTopic(), msg.getKey(), msg.getPayload()).get();
            outboxRepo.updateStatus(msg.getId(), MessageStatus.SENT);
        } catch (Exception e) {
            outboxRepo.incrementRetryCount(msg.getId());
        }
    });
}

消息顺序性保证

场景方案
同一队列(RabbitMQ)单个消费者,禁用并发
Kafka 同一分区相同 key 路由到同一分区,单线程消费该分区
全局顺序(Kafka)单分区,性能受限(不推荐)
业务顺序消息携带序号,消费端幂等处理乱序

消息可靠性总结

阶段RabbitMQKafka
生产者 → BrokerPublisher Confirmacks=all + 重试
Broker 持久化队列/消息 durable磁盘存储 + 副本
Broker → 消费者手动 ACK + 死信队列手动提交 offset + 死信 Topic
业务幂等消息 ID 去重幂等消费 + offset 去重

消息幂等处理参见 接口幂等性,分布式事务参见 事务管理


相关链接