消息驱动

返回 Spring Cloud

Spring Cloud Stream 是 Spring Cloud 的消息驱动框架,通过 Binder 屏蔽底层 MQ(Kafka、RabbitMQ)的差异,以统一的编程模型实现微服务间的异步消息通信。

核心概念

概念说明
Binder与具体 MQ 的适配层(Kafka Binder / RabbitMQ Binder)
Binding消息通道与 MQ Topic/Queue 的绑定关系
Consumer Group消费者组,同组内只有一个实例消费同一条消息

架构

Producer App              Consumer App
┌──────────────┐          ┌──────────────┐
│ output chan  │──Topic──►│ input chan    │
└──────────────┘          └──────────────┘
      ↕ Binder                   ↕ Binder
        Kafka / RabbitMQ Broker

依赖

<!-- Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
 
<!-- RabbitMQ Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

函数式编程模型(3.x 推荐)

@SpringBootApplication
public class OrderApp {
 
    // 消费者
    @Bean
    public Consumer<OrderEvent> orderSink() {
        return event -> {
            log.info("收到订单事件: {}", event);
            processOrder(event);
        };
    }
 
    // 处理器:输入 → 处理 → 输出
    @Bean
    public Function<OrderEvent, NotifyEvent> orderProcessor() {
        return event -> new NotifyEvent(event.getOrderId(), "发货通知");
    }
}

手动发送消息(StreamBridge)

@Service
public class OrderService {
    @Autowired
    private StreamBridge streamBridge;
 
    public void createOrder(Order order) {
        orderRepository.save(order);
        // 发送到 orderSource-out-0 binding
        streamBridge.send("orderSource-out-0",
            new OrderEvent(order.getId(), "CREATED"));
    }
}

配置

spring:
  cloud:
    stream:
      # Binding 名称规则:${functionName}-in-${index} / ${functionName}-out-${index}
      bindings:
        orderSource-out-0:
          destination: order-events      # Kafka Topic / RabbitMQ Exchange
          content-type: application/json
        orderSink-in-0:
          destination: order-events
          group: notification-group      # 消费者组(相同 group 只消费一次)
          content-type: application/json
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true

消息重试

spring:
  cloud:
    stream:
      bindings:
        orderSink-in-0:
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
            back-off-max-interval: 10000

死信队列(RabbitMQ)

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          orderSink-in-0:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true

消息分区(保序)

同一 Key 的消息路由到同一分区,保证顺序消费:

spring:
  cloud:
    stream:
      bindings:
        orderSource-out-0:
          producer:
            partition-key-expression: payload.userId
            partition-count: 3
        orderSink-in-0:
          consumer:
            partitioned: true
            instance-count: 3
            instance-index: 0

与直接使用 Kafka/RabbitMQ 对比

特性Spring Cloud Stream直接使用 MQ SDK
MQ 耦合低(换 MQ 只改依赖和配置)高(绑定 API)
功能覆盖常用场景完整功能
适用场景微服务异步解耦需要高级 MQ 特性

相关链接

Spring Cloud

架构与中间件