CDC(Change Data Capture)
→ 返回 分布式
CDC 是一种捕获数据库数据变更事件(INSERT / UPDATE / DELETE)并将其实时流向下游系统的技术。相比全量同步,CDC 只传输增量变更,延迟低、资源消耗小。
三种实现方式
| 方式 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 基于日志(推荐) | 解析数据库事务日志(binlog / WAL / redo log) | 对源库零侵入,延迟毫秒级,精确捕获所有变更 | 需要日志级别配置,对 DDL 变更处理复杂 |
| 基于触发器 | 对每张表创建 INSERT/UPDATE/DELETE 触发器写入影子表 | 无需特殊权限 | 源库性能损耗大,DDL 同步困难,维护成本高 |
| 基于查询 | 定期 SELECT WHERE updated_at > last_sync | 实现简单 | 需要时间戳/版本号字段,无法捕获 DELETE,延迟高 |
主流 CDC 工具对比
| 工具 | 支持数据库 | 日志解析 | 输出目标 | 特点 |
|---|---|---|---|---|
| Canal | MySQL | binlog | Kafka / RocketMQ / TCP | 阿里开源,Java 生态友好,见 Canal |
| Debezium | MySQL / PG / Oracle / MongoDB / SQL Server | binlog / WAL | Kafka(Kafka Connect) | Apache 顶级项目,多数据库支持最全 |
| Maxwell | MySQL | binlog | Kafka / Redis / Kinesis / STDOUT | 轻量,JSON 输出,适合简单场景 |
| Flink CDC | MySQL / PG / Oracle / MongoDB 等 | 直接嵌入 Flink 作业 | Flink 下游算子 | 无需 Kafka 中转,全程流式处理 |
| Debezium + Flink | 同上 | Debezium Source Connector | Flink SQL / DataStream | 解耦、灵活,生产常用 |
Debezium 快速上手(MySQL → Kafka)
前提:MySQL 开启 binlog ROW 模式
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL # 确保 UPDATE 包含修改前后所有字段启动 Debezium Connector(Kafka Connect 配置)
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "myapp",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.myapp"
}
}消息格式(Debezium Kafka 消息体)
{
"before": { "id": 1, "name": "Alice", "email": "alice@old.com" },
"after": { "id": 1, "name": "Alice", "email": "alice@new.com" },
"source": { "db": "inventory", "table": "users", "ts_ms": 1716000000000 },
"op": "u", // c=INSERT, u=UPDATE, d=DELETE, r=READ(快照)
"ts_ms": 1716000000100
}Flink CDC 直连(无 Kafka 中转)
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.1.0</version>
</dependency>Flink SQL 方式
-- 创建 MySQL CDC 源表
CREATE TABLE orders_source (
id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'shop',
'table-name' = 'orders',
'server-id' = '5401-5404', -- 范围 server-id 用于并行读取
'scan.startup.mode' = 'initial' -- initial=先全量快照再增量
);
-- 实时写入 Kafka
CREATE TABLE orders_kafka (
id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO orders_kafka SELECT id, user_id, amount, status FROM orders_source;消费端幂等处理
CDC 消息在网络重传或消费者重启时可能重复投递,消费端必须保证幂等:
| 方案 | 说明 |
|---|---|
| 主键 UPSERT | 目标库用 INSERT ... ON DUPLICATE KEY UPDATE |
| Redis SET NX | 用 消息ID 做去重 key,TTL 覆盖处理窗口 |
| 版本号/时间戳 | 只处理版本号更大的变更,忽略旧版本消息 |
| 幂等写入 ES | 使用文档 _id = 源表主键,天然覆盖写 |
事件顺序保证
同一行的多次变更必须按顺序处理,否则会出现数据错乱:
- Kafka 分区键:以源表主键(或业务 ID)作为 Kafka 消息 key,保证同一行变更落在同一分区
- Canal 分区配置:
canal.mq.partitionHash=db.table:id - 跨表依赖:如果下游需要多张表的事件按事务顺序处理,需要在同一分区或使用事务 ID 排序
典型使用场景
| 场景 | 说明 |
|---|---|
| 缓存同步 | 数据库变更 → 刷新 Redis,保持缓存与 DB 一致 |
| 搜索同步 | 数据库变更 → Elasticsearch 增量更新,无需全量重建索引 |
| 异构数据库同步 | MySQL → ClickHouse / MongoDB / TiDB |
| 微服务数据解耦 | 替代双写,用 CDC 保证跨服务数据最终一致性 |
| 数据审计 | 所有变更写入审计日志,支持数据回溯 |
| 实时数据仓库 | ODS → DWD 增量 ETL,替代 T+1 全量同步 |
| 数据迁移 | 存量全量 + 增量 CDC 平滑迁移,停机窗口趋近于零 |
注意事项
- DDL 变更(
ALTER TABLE)可能导致 CDC 流中断,需要提前演练 Schema 演化策略 - 大事务(如
DELETE 百万行)会产生巨量 binlog,注意背压和内存压力 - Debezium 初次启动会做全量快照,期间对源库有读负载
- binlog 保留时长(
expire_logs_days)需足够长,防止 CDC 断连后日志已被清除