CDC(Change Data Capture)

返回 分布式

CDC 是一种捕获数据库数据变更事件(INSERT / UPDATE / DELETE)并将其实时流向下游系统的技术。相比全量同步,CDC 只传输增量变更,延迟低、资源消耗小。


三种实现方式

方式原理优点缺点
基于日志(推荐)解析数据库事务日志(binlog / WAL / redo log)对源库零侵入,延迟毫秒级,精确捕获所有变更需要日志级别配置,对 DDL 变更处理复杂
基于触发器对每张表创建 INSERT/UPDATE/DELETE 触发器写入影子表无需特殊权限源库性能损耗大,DDL 同步困难,维护成本高
基于查询定期 SELECT WHERE updated_at > last_sync实现简单需要时间戳/版本号字段,无法捕获 DELETE,延迟高

主流 CDC 工具对比

工具支持数据库日志解析输出目标特点
CanalMySQLbinlogKafka / RocketMQ / TCP阿里开源,Java 生态友好,见 Canal
DebeziumMySQL / PG / Oracle / MongoDB / SQL Serverbinlog / WALKafka(Kafka Connect)Apache 顶级项目,多数据库支持最全
MaxwellMySQLbinlogKafka / Redis / Kinesis / STDOUT轻量,JSON 输出,适合简单场景
Flink CDCMySQL / PG / Oracle / MongoDB 等直接嵌入 Flink 作业Flink 下游算子无需 Kafka 中转,全程流式处理
Debezium + Flink同上Debezium Source ConnectorFlink SQL / DataStream解耦、灵活,生产常用

Debezium 快速上手(MySQL → Kafka)

前提:MySQL 开启 binlog ROW 模式

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL    # 确保 UPDATE 包含修改前后所有字段

启动 Debezium Connector(Kafka Connect 配置)

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "myapp",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.myapp"
  }
}

消息格式(Debezium Kafka 消息体)

{
  "before": { "id": 1, "name": "Alice", "email": "alice@old.com" },
  "after":  { "id": 1, "name": "Alice", "email": "alice@new.com" },
  "source": { "db": "inventory", "table": "users", "ts_ms": 1716000000000 },
  "op": "u",      // c=INSERT, u=UPDATE, d=DELETE, r=READ(快照)
  "ts_ms": 1716000000100
}

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.1.0</version>
</dependency>
-- 创建 MySQL CDC 源表
CREATE TABLE orders_source (
  id         BIGINT,
  user_id    BIGINT,
  amount     DECIMAL(10, 2),
  status     STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'           = 'mysql-cdc',
  'hostname'            = 'mysql',
  'port'                = '3306',
  'username'            = 'root',
  'password'            = 'root',
  'database-name'       = 'shop',
  'table-name'          = 'orders',
  'server-id'           = '5401-5404',   -- 范围 server-id 用于并行读取
  'scan.startup.mode'   = 'initial'      -- initial=先全量快照再增量
);
 
-- 实时写入 Kafka
CREATE TABLE orders_kafka (
  id      BIGINT,
  user_id BIGINT,
  amount  DECIMAL(10, 2),
  status  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'                = 'upsert-kafka',
  'topic'                    = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format'               = 'json',
  'value.format'             = 'json'
);
 
INSERT INTO orders_kafka SELECT id, user_id, amount, status FROM orders_source;

消费端幂等处理

CDC 消息在网络重传或消费者重启时可能重复投递,消费端必须保证幂等:

方案说明
主键 UPSERT目标库用 INSERT ... ON DUPLICATE KEY UPDATE
Redis SET NX消息ID 做去重 key,TTL 覆盖处理窗口
版本号/时间戳只处理版本号更大的变更,忽略旧版本消息
幂等写入 ES使用文档 _id = 源表主键,天然覆盖写

事件顺序保证

同一行的多次变更必须按顺序处理,否则会出现数据错乱:

  • Kafka 分区键:以源表主键(或业务 ID)作为 Kafka 消息 key,保证同一行变更落在同一分区
  • Canal 分区配置canal.mq.partitionHash=db.table:id
  • 跨表依赖:如果下游需要多张表的事件按事务顺序处理,需要在同一分区或使用事务 ID 排序

典型使用场景

场景说明
缓存同步数据库变更 → 刷新 Redis,保持缓存与 DB 一致
搜索同步数据库变更 → Elasticsearch 增量更新,无需全量重建索引
异构数据库同步MySQL → ClickHouse / MongoDB / TiDB
微服务数据解耦替代双写,用 CDC 保证跨服务数据最终一致性
数据审计所有变更写入审计日志,支持数据回溯
实时数据仓库ODS → DWD 增量 ETL,替代 T+1 全量同步
数据迁移存量全量 + 增量 CDC 平滑迁移,停机窗口趋近于零

注意事项

  • DDL 变更(ALTER TABLE)可能导致 CDC 流中断,需要提前演练 Schema 演化策略
  • 大事务(如 DELETE 百万行)会产生巨量 binlog,注意背压和内存压力
  • Debezium 初次启动会做全量快照,期间对源库有读负载
  • binlog 保留时长(expire_logs_days)需足够长,防止 CDC 断连后日志已被清除

相关

  • Canal — 阿里开源 MySQL CDC 工具详解
  • Flink — Flink CDC 作为 Flink 内置 Source
  • 消息队列 — Kafka 是 CDC 事件流的常用承载层

Spring Cloud 实战