Kafka
Apache Kafka 是分布式流式平台(不仅是 MQ):高吞吐、持久化日志、按 offset 可回溯、水平扩展。典型场景:日志/指标采集、事件驱动微服务、CDC 管道、Flink/Spark 数据源、流式 ETL。
与 RabbitMQ(灵活路由、低延迟任务队列)、RocketMQ(事务/定时消息)对比选型见下文;Java 接入见 Kafka 集成、Spring Boot 消息队列。
核心概念
| 概念 | 说明 |
|---|---|
| Topic | 消息逻辑分类;生产者写入、消费者订阅 |
| Partition | Topic 的物理分片;并行度与顺序性的基本单位 |
| Offset | 分区内单调递增序号;消费者用 offset 记录进度 |
| Record | 一条消息:key、value、timestamp、headers |
| Broker | 存储与服务的节点;集群由多个 Broker 组成 |
| Replica | 分区副本;Leader 处理读写,Follower 拉取同步 |
| ISR | In-Sync Replicas;与 Leader 落后在阈值内的副本集合 |
| Producer | 发布者;按 key 哈希或自定义分区器选分区 |
| Consumer Group | 组内消费者分摊分区;不同组各自独立消费同一 Topic |
| Controller | 集群元数据与协调(Leader 选举、分区分配);KRaft 模式下由 Quorum 承担 |
Topic: order-events
┌─────────────┬─────────────┬─────────────┐
│ Partition 0 │ Partition 1 │ Partition 2 │
│ Leader: B1 │ Leader: B2 │ Leader: B0 │
│ ISR: B1,B2 │ ISR: B2,B0 │ ISR: B0,B1 │
└─────────────┴─────────────┴─────────────┘存储模型(为何能回溯)
每个 Partition 在磁盘上是一个 追加写日志(Log),按段文件 Log Segment 滚动(如 1GB 或 7 天)。
| 机制 | 说明 |
|---|---|
| 顺序写 | 追加写充分利用磁盘顺序 I/O |
| Page Cache | 依赖 OS 页缓存,热数据在内存 |
| Zero Copy | sendfile 减少用户态拷贝(消费侧) |
| 保留策略 | retention.ms / retention.bytes 过期删除;或 compact 按 key 保留最新 |
| 消费进度 | 存在内部 Topic __consumer_offsets(或事务 offset) |
Compaction Topic:相同 key 只保留最新值,适合 配置变更、CDC 最新状态。
集群元数据:ZooKeeper 与 KRaft
演进背景
早期 Kafka 依赖 Apache ZooKeeper 存储:
- Broker / Topic / Partition 注册
- Controller 选举
- ACL(旧版)
- 分区 ISR 变更通知
运维需 维护第二套 ZK 集群,且元数据路径与 Kafka 版本升级、大规模分区变更耦合,成为瓶颈。
KRaft(Kafka Raft Metadata mode,KIP-500)用 内置 Raft 共识日志 管理元数据,Kafka 4.0 起已移除 ZooKeeper,新集群应直接 KRaft-only。
架构对比
【ZooKeeper 模式(已淘汰,仅老集群)】
Brokers ◄──watch/注册──► ZooKeeper Ensemble
└── Controller(从 Broker 中选举)
【KRaft 模式(当前标准)】
Brokers ◄──元数据快照+日志──► Controller Quorum(Raft)
└── 其中一名为 Active Controller| 对比项 | ZooKeeper 模式 | KRaft |
|---|---|---|
| 外部依赖 | 需独立 ZK 集群 | 无 ZK,仅需 Kafka 进程 |
| 元数据存储 | ZK znode | 内部 Topic __cluster_metadata(Raft log) |
| Controller 故障转移 | 依赖 ZK 会话与选举 | Raft 自动选主,通常更快 |
| 分区规模 | 数万分区时 ZK 压力大 | 设计上支持 百万级分区 目标 |
| 运维复杂度 | ZK 调优、版本、配额 | 统一在 Kafka 配置与工具链 |
| 新版本 | 4.x 不再支持 | 默认且唯一(4.0+) |
KRaft 核心组件
| 组件 | 作用 |
|---|---|
| Controller Quorum | 若干节点组成 Raft 组,复制 元数据日志 |
| Active Controller | Quorum 的 Leader,处理元数据变更(建 Topic、分区重分配、Broker 注册) |
| Broker | 仍负责消息读写;向 Controller 上报;可 合并部署 或 独立部署 |
| Metadata Log | 有序记录集群状态变更;定期 快照(Snapshot) 压缩 |
进程角色(process.roles):
| 配置 | 含义 |
|---|---|
broker | 仅消息节点 |
controller | 仅元数据仲裁节点(生产建议 3 或 5 台 奇数) |
broker,controller | 开发/小规模合并部署 |
KRaft 生产配置要点
server.properties(3.7+ / 4.x 示例,具体键名以所用版本文档为准):
# 节点唯一 ID(集群内不可重复)
node.id=1
# 角色:生产常拆分 controller 与 broker
process.roles=broker,controller
# 本机对外监听
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092
controller.listener.names=CONTROLLER
# Raft 投票成员:node.id@host:controller端口(全体 controller 节点)
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
# 数据目录(日志与元数据)
log.dirs=/var/kafka/data
# 格式化存储(仅新集群首次启动前执行一次)
# kafka-storage.sh format -t <cluster-uuid> -c server.properties集群 UUID:kafka-storage.sh random-uuid 生成;所有节点 format 时必须使用同一 UUID。
格式化与启动(KRaft 新集群)
# 1. 生成集群 ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# 2. 每个节点格式化(仅一次)
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
# 3. 启动(各节点)
bin/kafka-server-start.sh config/kraft/server.properties从 ZooKeeper 迁移到 KRaft
老集群升级需按官方 迁移工具链(版本相关,如 kafka-metadata.sh / Migration 流程):
- 准备 KRaft Controller Quorum(可与 Broker 滚动升级配合)
- 双写或迁移元数据阶段(按目标版本文档操作)
- 验证 Topic、ACL、Consumer Group 元数据
- 下线 ZK
注意:跨大版本跳跃需查 Kafka Upgrade 文档;生产务必 先在预发全量演练。
KRaft 运维关注点
| 项 | 说明 |
|---|---|
| Controller 奇数节点 | 容忍 f 台故障需 2f+1 台(3/5) |
| Controller 与 Broker 资源 | Controller 节点避免与超高负载 Broker 争磁盘 I/O(大规模建议分离) |
| 元数据延迟 | 大量并发建删 Topic、分区重分配会打 Controller;批量操作需限流 |
| 备份 | 备份 log.dirs 与 元数据快照;恢复依赖 cluster id 与 quorum 配置一致 |
| 监控 | Active Controller 身份、MetadataLogEndOffset、Raft 选主次数 |
为什么快(设计摘要)
| 设计 | 说明 |
|---|---|
| 分区并行 | 多 Partition 多磁盘/多 Broker 并行读写 |
| 批量 | Producer batch.size、linger.ms;Consumer fetch.min.bytes |
| 压缩 | `compression.type=lz4 |
| Pull 消费 | 消费者按能力拉取,避免 Broker 推爆慢消费者 |
| 页缓存 + 顺序写 | 见 架构 · Kafka 为什么快 |
生产者
关键配置
| 配置 | 说明 | 生产建议 |
|---|---|---|
acks | 0 不等确认;1 Leader 确认;all/-1 ISR 全确认 | 不丢用 all |
enable.idempotence | 幂等(PID + 序列号) | true,防重试重复 |
retries | 发送失败重试 | 配合幂等 |
max.in.flight.requests.per.connection | 未确认在途请求数 | 幂等时 ≤5;严格顺序时设为 1 |
compression.type | lz4 / zstd | 带宽敏感用 zstd |
linger.ms / batch.size | 凑批延迟与大小 | 吞吐优先可适当增大 |
分区路由
partition = hash(key) % numPartitions (默认 Murmur2)
无 key → 轮询(粘性批次内)同一业务 id 用同一 key → 分区内有序(见下文顺序性)。
Java 示例(原生客户端)
完整 Spring Boot 配置、KafkaTemplate、@KafkaListener、重试/DLT、事务与幂等见下文 Java 实战。
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("compression.type", "lz4");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("order-events", orderId, json);
// 同步确认关键路径
RecordMetadata meta = producer.send(record).get();
// 异步:producer.send(record, (m, ex) -> { ... });
}事务 Producer(跨分区原子写)
props.put("transactional.id", "order-tx-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("order-events", ...));
producer.send(new ProducerRecord<>("outbox", ...));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}需 transaction.state.log.replication.factor 等集群侧支持;与 消费端 read_committed、事务性 offset 配合实现 EOS。
消费者
消费语义
| 语义 | 行为 | 实现要点 |
|---|---|---|
| At most once | 可能丢 | 先 commit offset 再处理 |
| At least once | 可能重复 | 先处理再 commit(常见) |
| Exactly once | 不丢不重(端到端) | 幂等消费 + 事务 Producer + read_committed |
关键配置
| 配置 | 说明 |
|---|---|
group.id | 同组分摊分区;不同组互不影响 |
enable.auto.commit | 自动提交间隔提交 offset |
auto.offset.reset | 无位移时:earliest / latest |
max.poll.interval.ms | 两次 poll 最大间隔,超则踢出组触发 rebalance |
session.timeout.ms | 心跳超时判死 |
heartbeat.interval.ms | 通常约为 session 的 1/3 |
isolation.level | read_committed 过滤未提交事务消息 |
Java 示例(手动提交)
props.put("group.id", "order-service");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", "500");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("order-events"));
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) {
process(r); // 业务需幂等
}
consumer.commitSync(); // 或 commitAsync + 回调处理失败
}
}反模式:在 poll 循环内做长时间阻塞(DB、HTTP),应 异步消费 + 限流 或调大 max.poll.interval.ms。
静态成员(减少 Rebalance)
group.instance.id 为每个 consumer 实例固定 ID,短暂重启不立即触发大规模分区迁移(KIP-345)。
副本与 ISR
Producer ──acks=all──► Leader (Partition P)
│
replicate
▼
Follower ∈ ISR| 事件 | 后果 |
|---|---|
| Follower 落后过多 | 踢出 ISR;acks=all 只需 当前 ISR 确认 |
| Leader 宕机 | Controller 从 ISR 内 选新 Leader;若 ISR 为空可能 丢已确认数据(unclean 选举,unclean.leader.election.enable 生产通常 false) |
min.insync.replicas | 与 acks=all 配合;ISR 数不足时 Producer 报错而非静默丢 |
消息顺序
| 范围 | 做法 |
|---|---|
| 分区内有序 | 默认保证;单分区内单消费者线程处理 |
| 业务键有序 | 相同 key → 同一分区 |
| 全局有序 | Topic 仅 1 个分区(吞吐极差,慎用) |
| 跨分区有序 | 应用层版本号 / 单线程汇总,Kafka 不保证 |
幂等 + 顺序:同一分区需 max.in.flight.requests.per.connection=1 且启用幂等,或接受重试乱序风险。
Rebalance(再均衡)
触发:消费者入退组、心跳超时、poll 超时、订阅 Topic 变更、分区数变化。
过程:组内消费者 暂停消费 → 重新分配分区 → 恢复。
| 优化 | 说明 |
|---|---|
合理 session.timeout.ms / heartbeat.interval.ms | 避免网络抖动误踢 |
| CooperativeStickyAssignor | 增量再均衡,减少「停全部分区」 |
不阻塞 poll | 耗时逻辑异步化 |
group.instance.id | 静态成员,缩短重启抖动 |
| 消费者数 ≤ 分区数 | 多出的消费者空闲 |
Topic 与分区规划
| 原则 | 说明 |
|---|---|
| 分区数 | 决定 最大并行消费者数(同组);过多增加文件句柄与元数据开销 |
| 副本因子 | 生产通常 3;replication.factor ≤ Broker 数 |
| 键设计 | 热点 key 导致 分区倾斜;可加盐再下游汇总 |
| 保留时间 | 日志型 7d+;事件溯源可能更长或 compact |
| 跨机房 | replica.selector.class、机架感知 broker.rack |
# 创建 Topic(示例)
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--create --topic order-events \
--partitions 12 --replication-factor 3 \
--config min.insync.replicas=2运维命令速查
# 集群描述(KRaft 下可见 ClusterId、Controller)
kafka-broker-api-versions.sh --bootstrap-server kafka-1:9092
# Topic 列表与详情
kafka-topics.sh --bootstrap-server kafka-1:9092 --list
kafka-topics.sh --bootstrap-server kafka-1:9092 --describe --topic order-events
# 消费者组与 Lag(堆积排查核心)
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
--describe --group order-service
# 重置 offset(危险,需停消费或换 group)
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
--group order-service --topic order-events \
--reset-offsets --to-datetime 2026-05-01T00:00:00.000 --execute
# 生产/消费测试
kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic test
kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning消息堆积排查
Lag = LOG-END-OFFSET − CURRENT-OFFSET(按分区)- 消费者是否存活:
kafka-consumer-groups.sh --describe,看STATE、心跳。 - 处理是否过慢:CPU、DB、下游 API;看
max.poll.interval.ms是否触发 rebalance。 - 分区倾斜:某分区 Lag 极高 → 检查 key 热点。
- 扩容:同组 增加消费者实例(≤ 分区数);或 增加分区(需评估 key 与下游)。
- 临时止血:限流生产、降级非核心消费者、延长保留时间防磁盘满。
Java 实战(Spring Boot / 原生客户端)
Java 生态有两条主路径:原生 kafka-clients(细粒度控制)与 spring-kafka(Spring Boot 自动配置 + KafkaTemplate / @KafkaListener)。微服务若需与 Rabbit 切换 Binding,见 Kafka 集成、消息驱动;更完整的 Topic/批量/Outbox 示例见 Spring Boot 消息队列 · Kafka。
依赖
<!-- Spring Boot 3.x:由 BOM 管理版本 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 仅原生客户端(无 Spring) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>| 组件 | 包 | 用途 |
|---|---|---|
KafkaProducer / KafkaConsumer | org.apache.kafka.clients.* | 底层 API |
KafkaTemplate | org.springframework.kafka.core | 发送封装 |
@KafkaListener | org.springframework.kafka.annotation | 声明式消费 |
KafkaAdmin | org.springframework.kafka.core | 创建 Topic、集群 Admin |
Spring Boot 配置(application.yml)
spring:
kafka:
bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
client-id: order-service
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
compression-type: lz4
batch-size: 16384
linger-ms: 5
consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 200
properties:
isolation.level: read_committed # 配合事务 Producer
spring.json.trusted.packages: "com.example.order.*"
spring.json.value.default.type: com.example.order.OrderEvent
listener:
ack-mode: manual_immediate
concurrency: 3 # ≤ Topic 分区数
missing-topics-fatal: false
admin:
auto-create: false # 生产建议运维/代码显式建 Topiclistener.ack-mode | 行为 |
|---|---|
record | 每条处理完提交 |
batch | 批量监听整批提交 |
manual / manual_immediate | 调用 Acknowledgment.acknowledge() 才提交 |
count / time | 累积 N 条或 T 秒后提交 |
编程式创建 Topic
@Configuration
public class KafkaTopicConfig {
@Bean
public KafkaAdmin kafkaAdmin(KafkaProperties props) {
return new KafkaAdmin(props.buildAdminProperties(null));
}
@Bean
public NewTopic orderEventsTopic() {
return TopicBuilder.name("order-events")
.partitions(12)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 3600 * 1000L))
.build();
}
}生产者:KafkaTemplate
@Service
@RequiredArgsConstructor
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
/** 异步发送(默认);key = orderId 保证分区内有序 */
public void publish(OrderEvent event) {
String key = event.orderId().toString();
kafkaTemplate.send("order-events", key, event)
.whenComplete((result, ex) -> {
if (ex != null) {
// 记录失败、告警或写入 Outbox 重试
throw new RuntimeException(ex);
}
var m = result.getRecordMetadata();
log.info("sent partition={} offset={}", m.partition(), m.offset());
});
}
/** 同步发送:下单等关键路径需确认 Broker 已收 */
public void publishSync(OrderEvent event) throws Exception {
kafkaTemplate.send("order-events", event.orderId().toString(), event)
.get(5, TimeUnit.SECONDS);
}
/** 带 Header(链路追踪、事件类型) */
public void publishWithHeaders(OrderEvent event) {
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
"order-events",
event.orderId().toString(),
event
);
record.headers()
.add("event-type", "ORDER_CREATED".getBytes(StandardCharsets.UTF_8))
.add("trace-id", MDC.get("traceId").getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
}
}| 注意点 | 说明 |
|---|---|
| 序列化 | 生产 JSON 时 Consumer 端 JsonDeserializer 需配置 trusted.packages |
| key 为 null | 轮询分区,无法保证业务顺序 |
| 回调线程 | whenComplete 在客户端线程执行,避免阻塞过重逻辑 |
消费者:@KafkaListener
@Component
@Slf4j
@RequiredArgsConstructor
public class OrderEventListener {
private final OrderService orderService;
@KafkaListener(
topics = "order-events",
groupId = "order-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void onOrder(
ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
orderService.handle(record.value());
ack.acknowledge();
} catch (RetryableException e) {
// 不 ack → 下次 poll 重投(配合 ErrorHandler 更佳)
throw e;
}
}
/** 从 Header 读取元数据 */
@KafkaListener(topics = "order-events", groupId = "audit-service")
public void audit(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
Header h = record.headers().lastHeader("event-type");
String type = h != null ? new String(h.value(), StandardCharsets.UTF_8) : "UNKNOWN";
auditService.log(type, record.value());
ack.acknowledge();
}
}批量消费(提高吞吐,单批内处理需幂等):
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@KafkaListener(topics = "order-events", groupId = "batch-service",
containerFactory = "batchFactory")
public void onBatch(List<ConsumerRecord<String, OrderEvent>> records, Acknowledgment ack) {
orderService.batchHandle(records.stream().map(ConsumerRecord::value).toList());
ack.acknowledge();
}重试与死信(Spring Kafka 2.8+)
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> template) {
// 失败记录发到 order-events.DLT(默认命名:原 Topic + .DLT)
var recoverer = new DeadLetterPublishingRecoverer(template);
var handler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
handler.addNotRetryableExceptions(IllegalArgumentException.class);
handler.setRetryListeners((record, ex, deliveryAttempt) ->
log.warn("retry attempt={} topic={}", deliveryAttempt, record.topic()));
return handler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory,
DefaultErrorHandler kafkaErrorHandler) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(kafkaErrorHandler);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}消费失败 → 按 BackOff 重试 → 仍失败 → 写入 *.DLT Topic → 人工/巡检消费修复幂等消费(业务层,Java 必做)
Kafka 默认 at least once,同一条消息可能因 rebalance、重试重复投递。
@Transactional
public void handle(OrderEvent event) {
// 方案 1:DB 唯一键(推荐)
if (processedRepo.existsByEventId(event.eventId())) {
return;
}
orderService.apply(event);
processedRepo.save(new ProcessedEvent(event.eventId()));
// 方案 2:Redis SETNX,带 TTL
// Boolean ok = redis.setIfAbsent("evt:" + event.eventId(), "1", Duration.ofDays(7));
}| 方案 | 优点 | 注意 |
|---|---|---|
| DB 唯一约束 | 与业务同事务,最稳 | 表需 event_id 唯一索引 |
| Redis SETNX | 快 | 需考虑 Redis 可用性与 key 过期 |
| 业务状态机 | 只允许合法状态迁移 | 适合订单等有状态实体 |
Kafka 事务(多 Topic 原子写)
Producer 开启事务后,多条 send 同一事务提交;Consumer 需 isolation.level=read_committed。
@Configuration
@EnableKafka
public class KafkaTxConfig {
@Bean
public ProducerFactory<String, Object> txProducerFactory(KafkaProperties props) {
Map<String, Object> map = props.buildProducerProperties(null);
map.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
map.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-"); // 前缀,每实例唯一
return new DefaultKafkaProducerFactory<>(map);
}
@Bean
public KafkaTransactionManager kafkaTransactionManager(
ProducerFactory<String, Object> txProducerFactory) {
return new KafkaTransactionManager(txProducerFactory);
}
}
@Service
@RequiredArgsConstructor
public class OrderTxPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional("kafkaTransactionManager")
public void publishAtomically(OrderEvent order, InventoryEvent inv) {
kafkaTemplate.send("order-events", order.orderId().toString(), order);
kafkaTemplate.send("inventory-events", order.orderId().toString(), inv);
}
}端到端 EOS(消费 + 生产 + DB)需 事务性消费 + 外部存储协调,复杂度高;业务侧更常用 本地消息表 / Outbox(见 Outbox)。
自定义分区器(Java)
public class UserIdPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int num = cluster.partitionCountForTopic(topic);
if (key == null) return ThreadLocalRandom.current().nextInt(num);
long userId = Long.parseLong(key.toString());
return (int) (userId % num);
}
// partitionFor / close 省略
}
// 注册
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UserIdPartitioner.class.getName());Spring 中可对 DefaultKafkaProducerFactory 的 ProducerConfig 设置 PARTITIONER_CLASS_CONFIG。
原生客户端(非 Spring)要点
适用于 CLI 工具、Flink 以外轻量消费者、或对 poll 循环要完全掌控的场景。
// 消费 + 按分区同步提交(细粒度控制)
for (ConsumerRecord<String, String> r : records) {
process(r);
}
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
new TopicPartition(r.topic(), r.partition()),
new OffsetAndMetadata(r.offset() + 1)
);
consumer.commitSync(offsets);| 与 Spring 对比 | 原生 | Spring Kafka |
|---|---|---|
| 配置 | Properties 手写 | application.yml + @ConfigurationProperties |
| 消费线程 | 自建线程跑 poll | ConcurrentMessageListenerContainer |
| 重试/DLT | 自行实现 | DefaultErrorHandler |
| 生命周期 | 自行 close() | 容器随 Spring 启停;见 优雅停机 |
测试
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "order-events")
class OrderKafkaIT {
@Autowired KafkaTemplate<String, OrderEvent> template;
@Test
void shouldConsume() {
template.send("order-events", "1", new OrderEvent(...));
await().atMost(5, SECONDS).until(() -> orderService.processedCount() > 0);
}
}集成测试更贴近生产可用 Testcontainers Kafka(见 Spring Boot 测试)。
Java 常见问题
| 现象 | 原因 | 处理 |
|---|---|---|
SerializationException | JSON 包名不在 trusted.packages | 配置 trust 或改用 Avro + Schema Registry |
| 消费重复 | 先 ack 后异常 / rebalance | 手动 ack + 业务幂等 |
| 越消费越慢然后 rebalance | poll 内同步调 DB/HTTP 过久 | 异步化或增大 max.poll.interval.ms |
CommitFailedException | 分区已分配给同组其他实例 | 正常 rebalance;保证处理幂等 |
| 发送超时 | acks=all 但 ISR 不足 | 查 Broker、min.insync.replicas |
| 事务挂死 | transactional.id 冲突 | 每 JVM 实例唯一 transactionalId |
| DLT 无消息 | 未配置 DefaultErrorHandler / 异常被吞 | 检查 @KafkaListener 异常传播 |
CDC / 微服务监听示例
Canal / Debezium 写入 Kafka 后,Spring 消费 JSON binlog:
@KafkaListener(topics = "canal.order_db", groupId = "cache-sync")
public void onBinlog(String payload, Acknowledgment ack) {
CanalEntry entry = objectMapper.readValue(payload, CanalEntry.class);
cacheService.applyChange(entry);
ack.acknowledge();
}生态组件(简表)
| 组件 | 作用 |
|---|---|
| Kafka Connect | 连接器框架(CDC、ES、JDBC 入湖) |
| Kafka Streams | JVM 流处理库 |
| ksqlDB | SQL 流处理 |
| Schema Registry | Avro/Protobuf/JSON Schema 版本管理 |
| MirrorMaker 2 | 跨集群复制 |
CDC 场景见 CDC、数据同步与 CDC;流处理见 Flink。
安全与多租户(概要)
| 机制 | 说明 |
|---|---|
| SASL | PLAIN、SCRAM、GSSAPI(Kerberos) |
| SSL/TLS | 链路加密 |
| ACL | 细粒度 Topic/Group 权限(KRaft 下由 Kafka 自身管理) |
| 配额 | quota 限制生产/消费字节速率 |
与 RabbitMQ / RocketMQ 对比
| 特性 | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|
| 吞吐量 | 极高 | 中高 | 高 |
| 延迟 | ms 级 | 可至 μs 级 | ms 级 |
| 消息回溯 | 按 offset 任意回放 | 消费即删(默认) | 按时间/位点 |
| 路由模型 | Topic + 分区 | Exchange 灵活 | Topic + Tag |
| 事务消息 | 事务 Producer + EOS | 事务通道 | 原生事务消息 |
| 延迟队列 | 需自实现 / 外部 | TTL + DLX | 原生定时 |
| 元数据 | KRaft(4.x 标准) | 内置 | NameServer |
| 典型场景 | 日志、流、CDC、大数据 | 业务解耦、任务 | 电商、金融、顺序 |
生产检查清单
- 集群模式为 KRaft,Controller 至少 3 节点奇数 quorum
-
replication.factor=3,min.insync.replicas=2,Produceracks=all+ 幂等 - 消费者 手动提交 或明确语义;业务 幂等
- Topic 保留与磁盘监控;Lag 告警
- 禁止
unclean.leader.election除非明确接受数据丢失风险 - 跨版本升级与迁移有 回滚方案
相关链接
Spring / Java 实战
- 本文 Java 实战章节(
KafkaTemplate、监听、DLT、事务、测试) - Kafka 集成
- 消息驱动
- 数据同步与 CDC
- Spring Boot 消息队列
- 优雅停机 · Kafka
- Spring Cloud 总览