消息驱动
Spring Cloud Stream 是 Spring Cloud 的消息驱动框架,通过 Binder 屏蔽底层 MQ(Kafka、RabbitMQ)的差异,以统一的编程模型实现微服务间的异步消息通信。
核心概念
| 概念 | 说明 |
|---|---|
| Binder | 与具体 MQ 的适配层(Kafka Binder / RabbitMQ Binder) |
| Binding | 消息通道与 MQ Topic/Queue 的绑定关系 |
| Consumer Group | 消费者组,同组内只有一个实例消费同一条消息 |
架构
Producer App Consumer App
┌──────────────┐ ┌──────────────┐
│ output chan │──Topic──►│ input chan │
└──────────────┘ └──────────────┘
↕ Binder ↕ Binder
Kafka / RabbitMQ Broker
依赖
<!-- Kafka Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- RabbitMQ Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>函数式编程模型(3.x 推荐)
@SpringBootApplication
public class OrderApp {
// 消费者
@Bean
public Consumer<OrderEvent> orderSink() {
return event -> {
log.info("收到订单事件: {}", event);
processOrder(event);
};
}
// 处理器:输入 → 处理 → 输出
@Bean
public Function<OrderEvent, NotifyEvent> orderProcessor() {
return event -> new NotifyEvent(event.getOrderId(), "发货通知");
}
}手动发送消息(StreamBridge)
@Service
public class OrderService {
@Autowired
private StreamBridge streamBridge;
public void createOrder(Order order) {
orderRepository.save(order);
// 发送到 orderSource-out-0 binding
streamBridge.send("orderSource-out-0",
new OrderEvent(order.getId(), "CREATED"));
}
}配置
spring:
cloud:
stream:
# Binding 名称规则:${functionName}-in-${index} / ${functionName}-out-${index}
bindings:
orderSource-out-0:
destination: order-events # Kafka Topic / RabbitMQ Exchange
content-type: application/json
orderSink-in-0:
destination: order-events
group: notification-group # 消费者组(相同 group 只消费一次)
content-type: application/json
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true消息重试
spring:
cloud:
stream:
bindings:
orderSink-in-0:
consumer:
max-attempts: 3
back-off-initial-interval: 1000
back-off-multiplier: 2.0
back-off-max-interval: 10000死信队列(RabbitMQ)
spring:
cloud:
stream:
rabbit:
bindings:
orderSink-in-0:
consumer:
auto-bind-dlq: true
republish-to-dlq: true消息分区(保序)
同一 Key 的消息路由到同一分区,保证顺序消费:
spring:
cloud:
stream:
bindings:
orderSource-out-0:
producer:
partition-key-expression: payload.userId
partition-count: 3
orderSink-in-0:
consumer:
partitioned: true
instance-count: 3
instance-index: 0与直接使用 Kafka/RabbitMQ 对比
| 特性 | Spring Cloud Stream | 直接使用 MQ SDK |
|---|---|---|
| MQ 耦合 | 低(换 MQ 只改依赖和配置) | 高(绑定 API) |
| 功能覆盖 | 常用场景 | 完整功能 |
| 适用场景 | 微服务异步解耦 | 需要高级 MQ 特性 |