Kafka 集成

返回 Spring Cloud

微服务里使用 Kafka 常见两条路径:Spring Cloud Stream(统一 Binding,推荐多 MQ 场景)与 Spring for Apache KafkaKafkaTemplate / @KafkaListener,控制更细)。原理与 Broker 概念见 Kafka消息队列架构


选型

方式依赖适用
Spring Cloud Streamspring-cloud-starter-stream-kafka与 Rabbit 可切换、函数式模型
Spring Kafkaspring-kafka精确控制 Producer/Consumer、事务消息

通用编程模型见 消息驱动


Spring Cloud Stream(Kafka Binder)

依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

配置

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka1:9092,kafka2:9092
          auto-create-topics: false
      bindings:
        order-out-0:
          destination: order.events
          content-type: application/json
        order-in-0:
          destination: order.events
          group: inventory-service
          consumer:
            max-attempts: 3

生产与消费

@Bean
public Consumer<OrderEvent> orderEvents() {
    return event -> inventoryService.deduct(event);
}
 
// 发送
streamBridge.send("order-out-0", new OrderEvent(orderId, "CREATED"));

关键生产参数(对照中间件文档)

参数含义
acks=all全 ISR 确认,可靠性最高
enable.idempotence=true幂等生产者
retries发送失败重试

Spring Kafka(原生)

依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

生产者

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消费者(手动 ACK)

@KafkaListener(topics = "order.events", groupId = "inventory-service")
public void onMessage(OrderEvent event, Acknowledgment ack) {
    try {
        inventoryService.deduct(event);
        ack.acknowledge();
    } catch (Exception e) {
        // 不 ack → 重试;或投递 DLQ
        throw e;
    }
}
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual

微服务实践要点

主题做法
分区键同一 orderId 进同一分区,保证顺序
幂等消费业务唯一键 + DB 唯一约束 / Redis SETNX
死信重试耗尽后写 *.DLT 或独立 Topic
与 Feign优先事件驱动替代长 Feign 链
削峰削峰填谷

相关链接

Spring Cloud

中间件与架构