Canal
Canal 是阿里巴巴开源的 MySQL binlog 增量订阅与消费组件,通过模拟 MySQL Slave 的交互协议,解析 binlog 实现数据变更的实时捕获(CDC,Change Data Capture)。
工作原理
MySQL Master
│ binlog(Row 模式)
▼
Canal Server(伪装成 Slave)
│ 解析 binlog → RowChange 事件
▼
Canal Client / MQ(Kafka / RocketMQ)
│
▼
目标系统(Redis / ES / 另一个 MySQL / ...)
- Canal Server 向 MySQL 发送
SHOW MASTER STATUS获取当前 binlog 位点 - 发送
COM_BINLOG_DUMP请求,MySQL 开始推送 binlog - Canal 解析 binlog,封装为
Entry(包含库名、表名、行变更前后的数据) - 客户端消费
Entry或推送到 MQ
前提:MySQL 必须开启 binlog,且格式为 ROW:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1核心组件
| 组件 | 说明 |
|---|---|
| Canal Server | 解析 binlog,管理 Instance |
| Canal Client | 消费 Server 推送的数据变更 |
| Canal Admin | Web 管理界面,管理多个 Server 和 Instance |
| Canal Deployer | Server 的启动包 |
Instance:Canal 的最小工作单元,对应一个 MySQL 数据源,每个 Instance 独立维护 binlog 位点。
部署模式
单机模式
Canal Server(单节点)
└── Instance(destination)
适合开发测试或非核心业务。
高可用模式(ZooKeeper)
ZooKeeper(选主 + 位点存储)
│
┌────┴────┐
Server-1 Server-2(Standby)
└── Instance
- 主 Server 故障后,ZooKeeper 触发 Standby 接管
- binlog 位点存储在 ZooKeeper,保证续读不丢失
快速上手
MySQL 授权
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;Server 配置(conf/example/instance.properties)
# MySQL 地址
canal.instance.master.address=127.0.0.1:3306
# 账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 订阅的库表(正则)
canal.instance.filter.regex=test\\..*
# binlog 起始位点(为空从当前开始)
canal.instance.master.journal.name=
canal.instance.master.position=Java Client 依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>Java Client 消费示例
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", // destination(Instance 名)
"", ""
);
connector.connect();
connector.subscribe("test\\..*"); // 订阅库表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 批量拉取
long batchId = message.getId();
List<Entry> entries = message.getEntries();
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType(); // INSERT / UPDATE / DELETE
String db = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumns(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumns(rowData.getAfterColumnsList());
} else {
// UPDATE:beforeColumns 是修改前,afterColumns 是修改后
printColumns(rowData.getAfterColumnsList());
}
}
}
connector.ack(batchId); // 确认消费
// connector.rollback(batchId); // 回滚重新消费
}
private void printColumns(List<Column> columns) {
columns.forEach(col ->
System.out.println(col.getName() + " = " + col.getValue()
+ (col.getUpdated() ? " [已变更]" : ""))
);
}对接 Kafka(Canal Kafka 模式)
修改 canal.properties:
canal.serverMode=kafka
kafka.bootstrap.servers=127.0.0.1:9092修改 instance.properties:
canal.mq.topic=canal-topic
canal.mq.partitionsNum=3
# 按表名 hash 保证同一张表的消息顺序
canal.mq.partitionHash=test.order:idKafka 消息格式(JSON):
{
"database": "test",
"table": "order",
"type": "INSERT",
"ts": 1714000000,
"data": [{"id": "1", "amount": "100.00"}],
"old": null
}常见使用场景
| 场景 | 说明 |
|---|---|
| 缓存同步 | MySQL 数据变更后自动刷新 Redis |
| 搜索引擎同步 | 同步数据到 Elasticsearch |
| 数据迁移 | 跨库、跨实例的增量迁移 |
| 数据审计 | 记录所有数据变更历史 |
| 读写分离延迟补偿 | 检测主从同步延迟 |
| 异构数据库同步 | MySQL → MongoDB / ClickHouse |
注意事项
- binlog 必须是
ROW模式,STATEMENT模式无法解析具体字段值 - Canal Server 的
server-id不能与 MySQL 集群中已有节点冲突 - 位点(offset)是 Canal 的核心状态,HA 模式下存 ZooKeeper,单机模式下存本地文件(
meta.dat) - 消费端需做幂等处理,网络抖动可能导致重复推送
- 大事务(批量操作)会产生大量 binlog,注意消费端背压
- DDL 语句(ALTER TABLE 等)也会产生 Entry,消费端需过滤或兼容处理
Spring Cloud 实战
- 数据同步与 CDC
- Kafka 集成(消费 Canal Topic)
- Spring Cloud 总览
- CDC 总览