Kafka 集成
→ 返回 Spring Cloud
微服务里使用 Kafka 常见两条路径:Spring Cloud Stream(统一 Binding,推荐多 MQ 场景)与 Spring for Apache Kafka(KafkaTemplate / @KafkaListener,控制更细)。原理与 Broker 概念见 Kafka、消息队列架构。
选型
| 方式 | 依赖 | 适用 |
|---|
| Spring Cloud Stream | spring-cloud-starter-stream-kafka | 与 Rabbit 可切换、函数式模型 |
| Spring Kafka | spring-kafka | 精确控制 Producer/Consumer、事务消息 |
通用编程模型见 消息驱动。
Spring Cloud Stream(Kafka Binder)
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
配置
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka1:9092,kafka2:9092
auto-create-topics: false
bindings:
order-out-0:
destination: order.events
content-type: application/json
order-in-0:
destination: order.events
group: inventory-service
consumer:
max-attempts: 3
生产与消费
@Bean
public Consumer<OrderEvent> orderEvents() {
return event -> inventoryService.deduct(event);
}
// 发送
streamBridge.send("order-out-0", new OrderEvent(orderId, "CREATED"));
关键生产参数(对照中间件文档)
| 参数 | 含义 |
|---|
acks=all | 全 ISR 确认,可靠性最高 |
enable.idempotence=true | 幂等生产者 |
retries | 发送失败重试 |
Spring Kafka(原生)
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
生产者
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
消费者(手动 ACK)
@KafkaListener(topics = "order.events", groupId = "inventory-service")
public void onMessage(OrderEvent event, Acknowledgment ack) {
try {
inventoryService.deduct(event);
ack.acknowledge();
} catch (Exception e) {
// 不 ack → 重试;或投递 DLQ
throw e;
}
}
spring:
kafka:
consumer:
enable-auto-commit: false
listener:
ack-mode: manual
微服务实践要点
| 主题 | 做法 |
|---|
| 分区键 | 同一 orderId 进同一分区,保证顺序 |
| 幂等消费 | 业务唯一键 + DB 唯一约束 / Redis SETNX |
| 死信 | 重试耗尽后写 *.DLT 或独立 Topic |
| 与 Feign | 优先事件驱动替代长 Feign 链 |
| 削峰 | 见 削峰填谷 |
相关链接
Spring Cloud
中间件与架构