Kafka

消息队列 · 消息队列架构

Apache Kafka 是分布式流式平台(不仅是 MQ):高吞吐、持久化日志、按 offset 可回溯、水平扩展。典型场景:日志/指标采集事件驱动微服务CDC 管道Flink/Spark 数据源流式 ETL

RabbitMQ(灵活路由、低延迟任务队列)、RocketMQ(事务/定时消息)对比选型见下文;Java 接入见 Kafka 集成Spring Boot 消息队列


核心概念

概念说明
Topic消息逻辑分类;生产者写入、消费者订阅
PartitionTopic 的物理分片;并行度与顺序性的基本单位
Offset分区内单调递增序号;消费者用 offset 记录进度
Record一条消息:keyvaluetimestampheaders
Broker存储与服务的节点;集群由多个 Broker 组成
Replica分区副本;Leader 处理读写,Follower 拉取同步
ISRIn-Sync Replicas;与 Leader 落后在阈值内的副本集合
Producer发布者;按 key 哈希或自定义分区器选分区
Consumer Group组内消费者分摊分区;不同组各自独立消费同一 Topic
Controller集群元数据与协调(Leader 选举、分区分配);KRaft 模式下由 Quorum 承担
                    Topic: order-events
          ┌─────────────┬─────────────┬─────────────┐
          │ Partition 0 │ Partition 1 │ Partition 2 │
          │ Leader: B1  │ Leader: B2  │ Leader: B0  │
          │ ISR: B1,B2  │ ISR: B2,B0  │ ISR: B0,B1  │
          └─────────────┴─────────────┴─────────────┘

存储模型(为何能回溯)

每个 Partition 在磁盘上是一个 追加写日志(Log),按段文件 Log Segment 滚动(如 1GB 或 7 天)。

机制说明
顺序写追加写充分利用磁盘顺序 I/O
Page Cache依赖 OS 页缓存,热数据在内存
Zero Copysendfile 减少用户态拷贝(消费侧)
保留策略retention.ms / retention.bytes 过期删除;或 compact 按 key 保留最新
消费进度存在内部 Topic __consumer_offsets(或事务 offset)

Compaction Topic:相同 key 只保留最新值,适合 配置变更、CDC 最新状态


集群元数据:ZooKeeper 与 KRaft

演进背景

早期 Kafka 依赖 Apache ZooKeeper 存储:

  • Broker / Topic / Partition 注册
  • Controller 选举
  • ACL(旧版)
  • 分区 ISR 变更通知

运维需 维护第二套 ZK 集群,且元数据路径与 Kafka 版本升级、大规模分区变更耦合,成为瓶颈。

KRaftKafka Raft Metadata mode,KIP-500)用 内置 Raft 共识日志 管理元数据,Kafka 4.0 起已移除 ZooKeeper,新集群应直接 KRaft-only

架构对比

【ZooKeeper 模式(已淘汰,仅老集群)】
  Brokers ◄──watch/注册──► ZooKeeper Ensemble
       └── Controller(从 Broker 中选举)
 
【KRaft 模式(当前标准)】
  Brokers ◄──元数据快照+日志──► Controller Quorum(Raft)
       └── 其中一名为 Active Controller
对比项ZooKeeper 模式KRaft
外部依赖需独立 ZK 集群无 ZK,仅需 Kafka 进程
元数据存储ZK znode内部 Topic __cluster_metadata(Raft log)
Controller 故障转移依赖 ZK 会话与选举Raft 自动选主,通常更快
分区规模数万分区时 ZK 压力大设计上支持 百万级分区 目标
运维复杂度ZK 调优、版本、配额统一在 Kafka 配置与工具链
新版本4.x 不再支持默认且唯一(4.0+)

KRaft 核心组件

组件作用
Controller Quorum若干节点组成 Raft 组,复制 元数据日志
Active ControllerQuorum 的 Leader,处理元数据变更(建 Topic、分区重分配、Broker 注册)
Broker仍负责消息读写;向 Controller 上报;可 合并部署独立部署
Metadata Log有序记录集群状态变更;定期 快照(Snapshot) 压缩

进程角色process.roles):

配置含义
broker仅消息节点
controller仅元数据仲裁节点(生产建议 3 或 5 台 奇数)
broker,controller开发/小规模合并部署

KRaft 生产配置要点

server.properties(3.7+ / 4.x 示例,具体键名以所用版本文档为准):

# 节点唯一 ID(集群内不可重复)
node.id=1
 
# 角色:生产常拆分 controller 与 broker
process.roles=broker,controller
 
# 本机对外监听
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092
controller.listener.names=CONTROLLER
 
# Raft 投票成员:node.id@host:controller端口(全体 controller 节点)
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
 
# 数据目录(日志与元数据)
log.dirs=/var/kafka/data
 
# 格式化存储(仅新集群首次启动前执行一次)
# kafka-storage.sh format -t <cluster-uuid> -c server.properties

集群 UUIDkafka-storage.sh random-uuid 生成;所有节点 format 时必须使用同一 UUID

格式化与启动(KRaft 新集群)

# 1. 生成集群 ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
 
# 2. 每个节点格式化(仅一次)
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
 
# 3. 启动(各节点)
bin/kafka-server-start.sh config/kraft/server.properties

从 ZooKeeper 迁移到 KRaft

老集群升级需按官方 迁移工具链(版本相关,如 kafka-metadata.sh / Migration 流程):

  1. 准备 KRaft Controller Quorum(可与 Broker 滚动升级配合)
  2. 双写或迁移元数据阶段(按目标版本文档操作
  3. 验证 Topic、ACL、Consumer Group 元数据
  4. 下线 ZK

注意:跨大版本跳跃需查 Kafka Upgrade 文档;生产务必 先在预发全量演练

KRaft 运维关注点

说明
Controller 奇数节点容忍 f 台故障需 2f+1 台(3/5)
Controller 与 Broker 资源Controller 节点避免与超高负载 Broker 争磁盘 I/O(大规模建议分离)
元数据延迟大量并发建删 Topic、分区重分配会打 Controller;批量操作需限流
备份备份 log.dirs元数据快照;恢复依赖 cluster id 与 quorum 配置一致
监控Active Controller 身份、MetadataLogEndOffset、Raft 选主次数

为什么快(设计摘要)

设计说明
分区并行多 Partition 多磁盘/多 Broker 并行读写
批量Producer batch.sizelinger.ms;Consumer fetch.min.bytes
压缩`compression.type=lz4
Pull 消费消费者按能力拉取,避免 Broker 推爆慢消费者
页缓存 + 顺序写架构 · Kafka 为什么快

生产者

关键配置

配置说明生产建议
acks0 不等确认;1 Leader 确认;all/-1 ISR 全确认不丢用 all
enable.idempotence幂等(PID + 序列号)true,防重试重复
retries发送失败重试配合幂等
max.in.flight.requests.per.connection未确认在途请求数幂等时 ≤5;严格顺序时设为 1
compression.typelz4 / zstd带宽敏感用 zstd
linger.ms / batch.size凑批延迟与大小吞吐优先可适当增大

分区路由

partition = hash(key) % numPartitions   (默认 Murmur2)
无 key → 轮询(粘性批次内)

同一业务 id 用同一 key → 分区内有序(见下文顺序性)。

Java 示例(原生客户端)

完整 Spring Boot 配置、KafkaTemplate@KafkaListener、重试/DLT、事务与幂等见下文 Java 实战

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("compression.type", "lz4");
 
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("order-events", orderId, json);
    // 同步确认关键路径
    RecordMetadata meta = producer.send(record).get();
    // 异步:producer.send(record, (m, ex) -> { ... });
}

事务 Producer(跨分区原子写)

props.put("transactional.id", "order-tx-1");
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("order-events", ...));
    producer.send(new ProducerRecord<>("outbox", ...));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

transaction.state.log.replication.factor 等集群侧支持;与 消费端 read_committed事务性 offset 配合实现 EOS。


消费者

消费语义

语义行为实现要点
At most once可能丢先 commit offset 再处理
At least once可能重复先处理再 commit(常见)
Exactly once不丢不重(端到端)幂等消费 + 事务 Producer + read_committed

关键配置

配置说明
group.id同组分摊分区;不同组互不影响
enable.auto.commit自动提交间隔提交 offset
auto.offset.reset无位移时:earliest / latest
max.poll.interval.ms两次 poll 最大间隔,超则踢出组触发 rebalance
session.timeout.ms心跳超时判死
heartbeat.interval.ms通常约为 session 的 1/3
isolation.levelread_committed 过滤未提交事务消息

Java 示例(手动提交)

props.put("group.id", "order-service");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", "500");
 
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("order-events"));
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> r : records) {
            process(r); // 业务需幂等
        }
        consumer.commitSync(); // 或 commitAsync + 回调处理失败
    }
}

反模式:在 poll 循环内做长时间阻塞(DB、HTTP),应 异步消费 + 限流 或调大 max.poll.interval.ms

静态成员(减少 Rebalance)

group.instance.id 为每个 consumer 实例固定 ID,短暂重启不立即触发大规模分区迁移(KIP-345)。


副本与 ISR

Producer ──acks=all──► Leader (Partition P)

                    replicate

                      Follower ∈ ISR
事件后果
Follower 落后过多踢出 ISR;acks=all 只需 当前 ISR 确认
Leader 宕机Controller 从 ISR 内 选新 Leader;若 ISR 为空可能 丢已确认数据(unclean 选举,unclean.leader.election.enable 生产通常 false
min.insync.replicasacks=all 配合;ISR 数不足时 Producer 报错而非静默丢

消息顺序

范围做法
分区内有序默认保证;单分区内单消费者线程处理
业务键有序相同 key → 同一分区
全局有序Topic 仅 1 个分区(吞吐极差,慎用)
跨分区有序应用层版本号 / 单线程汇总,Kafka 不保证

幂等 + 顺序:同一分区需 max.in.flight.requests.per.connection=1 且启用幂等,或接受重试乱序风险。


Rebalance(再均衡)

触发:消费者入退组、心跳超时、poll 超时、订阅 Topic 变更、分区数变化。

过程:组内消费者 暂停消费 → 重新分配分区 → 恢复。

优化说明
合理 session.timeout.ms / heartbeat.interval.ms避免网络抖动误踢
CooperativeStickyAssignor增量再均衡,减少「停全部分区」
不阻塞 poll耗时逻辑异步化
group.instance.id静态成员,缩短重启抖动
消费者数 ≤ 分区数多出的消费者空闲

Topic 与分区规划

原则说明
分区数决定 最大并行消费者数(同组);过多增加文件句柄与元数据开销
副本因子生产通常 3replication.factor ≤ Broker 数
键设计热点 key 导致 分区倾斜;可加盐再下游汇总
保留时间日志型 7d+;事件溯源可能更长或 compact
跨机房replica.selector.class、机架感知 broker.rack
# 创建 Topic(示例)
kafka-topics.sh --bootstrap-server kafka-1:9092 \
  --create --topic order-events \
  --partitions 12 --replication-factor 3 \
  --config min.insync.replicas=2

运维命令速查

# 集群描述(KRaft 下可见 ClusterId、Controller)
kafka-broker-api-versions.sh --bootstrap-server kafka-1:9092
 
# Topic 列表与详情
kafka-topics.sh --bootstrap-server kafka-1:9092 --list
kafka-topics.sh --bootstrap-server kafka-1:9092 --describe --topic order-events
 
# 消费者组与 Lag(堆积排查核心)
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
  --describe --group order-service
 
# 重置 offset(危险,需停消费或换 group)
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
  --group order-service --topic order-events \
  --reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute
 
# 生产/消费测试
kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic test
kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning

消息堆积排查

Lag = LOG-END-OFFSET − CURRENT-OFFSET(按分区)
  1. 消费者是否存活kafka-consumer-groups.sh --describe,看 STATE、心跳。
  2. 处理是否过慢:CPU、DB、下游 API;看 max.poll.interval.ms 是否触发 rebalance。
  3. 分区倾斜:某分区 Lag 极高 → 检查 key 热点。
  4. 扩容:同组 增加消费者实例(≤ 分区数);或 增加分区(需评估 key 与下游)。
  5. 临时止血:限流生产、降级非核心消费者、延长保留时间防磁盘满。

Java 实战(Spring Boot / 原生客户端)

Java 生态有两条主路径:原生 kafka-clients(细粒度控制)与 spring-kafka(Spring Boot 自动配置 + KafkaTemplate / @KafkaListener)。微服务若需与 Rabbit 切换 Binding,见 Kafka 集成消息驱动;更完整的 Topic/批量/Outbox 示例见 Spring Boot 消息队列 · Kafka

依赖

<!-- Spring Boot 3.x:由 BOM 管理版本 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
 
<!-- 仅原生客户端(无 Spring) -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
组件用途
KafkaProducer / KafkaConsumerorg.apache.kafka.clients.*底层 API
KafkaTemplateorg.springframework.kafka.core发送封装
@KafkaListenerorg.springframework.kafka.annotation声明式消费
KafkaAdminorg.springframework.kafka.core创建 Topic、集群 Admin

Spring Boot 配置(application.yml

spring:
  kafka:
    bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
    client-id: order-service
 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
      compression-type: lz4
      batch-size: 16384
      linger-ms: 5
 
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 200
      properties:
        isolation.level: read_committed   # 配合事务 Producer
        spring.json.trusted.packages: "com.example.order.*"
        spring.json.value.default.type: com.example.order.OrderEvent
 
    listener:
      ack-mode: manual_immediate
      concurrency: 3                    # ≤ Topic 分区数
      missing-topics-fatal: false
 
    admin:
      auto-create: false                  # 生产建议运维/代码显式建 Topic
listener.ack-mode行为
record每条处理完提交
batch批量监听整批提交
manual / manual_immediate调用 Acknowledgment.acknowledge() 才提交
count / time累积 N 条或 T 秒后提交

编程式创建 Topic

@Configuration
public class KafkaTopicConfig {
 
    @Bean
    public KafkaAdmin kafkaAdmin(KafkaProperties props) {
        return new KafkaAdmin(props.buildAdminProperties(null));
    }
 
    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name("order-events")
            .partitions(12)
            .replicas(3)
            .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
            .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 3600 * 1000L))
            .build();
    }
}

生产者:KafkaTemplate

@Service
@RequiredArgsConstructor
public class OrderEventPublisher {
 
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
 
    /** 异步发送(默认);key = orderId 保证分区内有序 */
    public void publish(OrderEvent event) {
        String key = event.orderId().toString();
        kafkaTemplate.send("order-events", key, event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    // 记录失败、告警或写入 Outbox 重试
                    throw new RuntimeException(ex);
                }
                var m = result.getRecordMetadata();
                log.info("sent partition={} offset={}", m.partition(), m.offset());
            });
    }
 
    /** 同步发送:下单等关键路径需确认 Broker 已收 */
    public void publishSync(OrderEvent event) throws Exception {
        kafkaTemplate.send("order-events", event.orderId().toString(), event)
            .get(5, TimeUnit.SECONDS);
    }
 
    /** 带 Header(链路追踪、事件类型) */
    public void publishWithHeaders(OrderEvent event) {
        ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
            "order-events",
            event.orderId().toString(),
            event
        );
        record.headers()
            .add("event-type", "ORDER_CREATED".getBytes(StandardCharsets.UTF_8))
            .add("trace-id", MDC.get("traceId").getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(record);
    }
}
注意点说明
序列化生产 JSON 时 Consumer 端 JsonDeserializer 需配置 trusted.packages
key 为 null轮询分区,无法保证业务顺序
回调线程whenComplete 在客户端线程执行,避免阻塞过重逻辑

消费者:@KafkaListener

@Component
@Slf4j
@RequiredArgsConstructor
public class OrderEventListener {
 
    private final OrderService orderService;
 
    @KafkaListener(
        topics = "order-events",
        groupId = "order-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void onOrder(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        try {
            orderService.handle(record.value());
            ack.acknowledge();
        } catch (RetryableException e) {
            // 不 ack → 下次 poll 重投(配合 ErrorHandler 更佳)
            throw e;
        }
    }
 
    /** 从 Header 读取元数据 */
    @KafkaListener(topics = "order-events", groupId = "audit-service")
    public void audit(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
        Header h = record.headers().lastHeader("event-type");
        String type = h != null ? new String(h.value(), StandardCharsets.UTF_8) : "UNKNOWN";
        auditService.log(type, record.value());
        ack.acknowledge();
    }
}

批量消费(提高吞吐,单批内处理需幂等):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
 
@KafkaListener(topics = "order-events", groupId = "batch-service",
               containerFactory = "batchFactory")
public void onBatch(List<ConsumerRecord<String, OrderEvent>> records, Acknowledgment ack) {
    orderService.batchHandle(records.stream().map(ConsumerRecord::value).toList());
    ack.acknowledge();
}

重试与死信(Spring Kafka 2.8+)

@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> template) {
    // 失败记录发到 order-events.DLT(默认命名:原 Topic + .DLT)
    var recoverer = new DeadLetterPublishingRecoverer(template);
    var handler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    handler.setRetryListeners((record, ex, deliveryAttempt) ->
        log.warn("retry attempt={} topic={}", deliveryAttempt, record.topic()));
    return handler;
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory,
        DefaultErrorHandler kafkaErrorHandler) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(kafkaErrorHandler);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
消费失败 → 按 BackOff 重试 → 仍失败 → 写入 *.DLT Topic → 人工/巡检消费修复

幂等消费(业务层,Java 必做)

Kafka 默认 at least once,同一条消息可能因 rebalance、重试重复投递。

@Transactional
public void handle(OrderEvent event) {
    // 方案 1:DB 唯一键(推荐)
    if (processedRepo.existsByEventId(event.eventId())) {
        return;
    }
    orderService.apply(event);
    processedRepo.save(new ProcessedEvent(event.eventId()));
 
    // 方案 2:Redis SETNX,带 TTL
    // Boolean ok = redis.setIfAbsent("evt:" + event.eventId(), "1", Duration.ofDays(7));
}
方案优点注意
DB 唯一约束与业务同事务,最稳表需 event_id 唯一索引
Redis SETNX需考虑 Redis 可用性与 key 过期
业务状态机只允许合法状态迁移适合订单等有状态实体

Kafka 事务(多 Topic 原子写)

Producer 开启事务后,多条 send 同一事务提交;Consumer 需 isolation.level=read_committed

@Configuration
@EnableKafka
public class KafkaTxConfig {
 
    @Bean
    public ProducerFactory<String, Object> txProducerFactory(KafkaProperties props) {
        Map<String, Object> map = props.buildProducerProperties(null);
        map.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        map.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-"); // 前缀,每实例唯一
        return new DefaultKafkaProducerFactory<>(map);
    }
 
    @Bean
    public KafkaTransactionManager kafkaTransactionManager(
            ProducerFactory<String, Object> txProducerFactory) {
        return new KafkaTransactionManager(txProducerFactory);
    }
}
 
@Service
@RequiredArgsConstructor
public class OrderTxPublisher {
    private final KafkaTemplate<String, Object> kafkaTemplate;
 
    @Transactional("kafkaTransactionManager")
    public void publishAtomically(OrderEvent order, InventoryEvent inv) {
        kafkaTemplate.send("order-events", order.orderId().toString(), order);
        kafkaTemplate.send("inventory-events", order.orderId().toString(), inv);
    }
}

端到端 EOS(消费 + 生产 + DB)需 事务性消费 + 外部存储协调,复杂度高;业务侧更常用 本地消息表 / Outbox(见 Outbox)。


自定义分区器(Java)

public class UserIdPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int num = cluster.partitionCountForTopic(topic);
        if (key == null) return ThreadLocalRandom.current().nextInt(num);
        long userId = Long.parseLong(key.toString());
        return (int) (userId % num);
    }
    // partitionFor / close 省略
}
 
// 注册
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UserIdPartitioner.class.getName());

Spring 中可对 DefaultKafkaProducerFactoryProducerConfig 设置 PARTITIONER_CLASS_CONFIG


原生客户端(非 Spring)要点

适用于 CLI 工具、Flink 以外轻量消费者、或对 poll 循环要完全掌控的场景。

// 消费 + 按分区同步提交(细粒度控制)
for (ConsumerRecord<String, String> r : records) {
    process(r);
}
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
    new TopicPartition(r.topic(), r.partition()),
    new OffsetAndMetadata(r.offset() + 1)
);
consumer.commitSync(offsets);
与 Spring 对比原生Spring Kafka
配置Properties 手写application.yml + @ConfigurationProperties
消费线程自建线程跑 pollConcurrentMessageListenerContainer
重试/DLT自行实现DefaultErrorHandler
生命周期自行 close()容器随 Spring 启停;见 优雅停机

测试

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "order-events")
class OrderKafkaIT {
    @Autowired KafkaTemplate<String, OrderEvent> template;
 
    @Test
    void shouldConsume() {
        template.send("order-events", "1", new OrderEvent(...));
        await().atMost(5, SECONDS).until(() -> orderService.processedCount() > 0);
    }
}

集成测试更贴近生产可用 Testcontainers Kafka(见 Spring Boot 测试)。


Java 常见问题

现象原因处理
SerializationExceptionJSON 包名不在 trusted.packages配置 trust 或改用 Avro + Schema Registry
消费重复先 ack 后异常 / rebalance手动 ack + 业务幂等
越消费越慢然后 rebalancepoll 内同步调 DB/HTTP 过久异步化或增大 max.poll.interval.ms
CommitFailedException分区已分配给同组其他实例正常 rebalance;保证处理幂等
发送超时acks=all 但 ISR 不足查 Broker、min.insync.replicas
事务挂死transactional.id 冲突每 JVM 实例唯一 transactionalId
DLT 无消息未配置 DefaultErrorHandler / 异常被吞检查 @KafkaListener 异常传播

CDC / 微服务监听示例

Canal / Debezium 写入 Kafka 后,Spring 消费 JSON binlog:

@KafkaListener(topics = "canal.order_db", groupId = "cache-sync")
public void onBinlog(String payload, Acknowledgment ack) {
    CanalEntry entry = objectMapper.readValue(payload, CanalEntry.class);
    cacheService.applyChange(entry);
    ack.acknowledge();
}

详见 数据同步与 CDCCanal


生态组件(简表)

组件作用
Kafka Connect连接器框架(CDC、ES、JDBC 入湖)
Kafka StreamsJVM 流处理库
ksqlDBSQL 流处理
Schema RegistryAvro/Protobuf/JSON Schema 版本管理
MirrorMaker 2跨集群复制

CDC 场景见 CDC数据同步与 CDC;流处理见 Flink


安全与多租户(概要)

机制说明
SASLPLAIN、SCRAM、GSSAPI(Kerberos)
SSL/TLS链路加密
ACL细粒度 Topic/Group 权限(KRaft 下由 Kafka 自身管理)
配额quota 限制生产/消费字节速率

与 RabbitMQ / RocketMQ 对比

特性KafkaRabbitMQRocketMQ
吞吐量极高中高
延迟ms 级可至 μs 级ms 级
消息回溯按 offset 任意回放消费即删(默认)按时间/位点
路由模型Topic + 分区Exchange 灵活Topic + Tag
事务消息事务 Producer + EOS事务通道原生事务消息
延迟队列需自实现 / 外部TTL + DLX原生定时
元数据KRaft(4.x 标准)内置NameServer
典型场景日志、流、CDC、大数据业务解耦、任务电商、金融、顺序

生产检查清单

  • 集群模式为 KRaft,Controller 至少 3 节点奇数 quorum
  • replication.factor=3min.insync.replicas=2,Producer acks=all + 幂等
  • 消费者 手动提交 或明确语义;业务 幂等
  • Topic 保留与磁盘监控;Lag 告警
  • 禁止 unclean.leader.election 除非明确接受数据丢失风险
  • 跨版本升级与迁移有 回滚方案

相关链接

Spring / Java 实战

同目录