Canal

Canal 是阿里巴巴开源的 MySQL binlog 增量订阅与消费组件,通过模拟 MySQL Slave 的交互协议,解析 binlog 实现数据变更的实时捕获(CDC,Change Data Capture)。

工作原理

MySQL Master
     │  binlog(Row 模式)
     ▼
Canal Server(伪装成 Slave)
     │  解析 binlog → RowChange 事件
     ▼
Canal Client / MQ(Kafka / RocketMQ)
     │
     ▼
目标系统(Redis / ES / 另一个 MySQL / ...)
  1. Canal Server 向 MySQL 发送 SHOW MASTER STATUS 获取当前 binlog 位点
  2. 发送 COM_BINLOG_DUMP 请求,MySQL 开始推送 binlog
  3. Canal 解析 binlog,封装为 Entry(包含库名、表名、行变更前后的数据)
  4. 客户端消费 Entry 或推送到 MQ

前提:MySQL 必须开启 binlog,且格式为 ROW

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

核心组件

组件说明
Canal Server解析 binlog,管理 Instance
Canal Client消费 Server 推送的数据变更
Canal AdminWeb 管理界面,管理多个 Server 和 Instance
Canal DeployerServer 的启动包

Instance:Canal 的最小工作单元,对应一个 MySQL 数据源,每个 Instance 独立维护 binlog 位点。

部署模式

单机模式

Canal Server(单节点)
     └── Instance(destination)

适合开发测试或非核心业务。

高可用模式(ZooKeeper)

ZooKeeper(选主 + 位点存储)
     │
┌────┴────┐
Server-1  Server-2(Standby)
     └── Instance
  • 主 Server 故障后,ZooKeeper 触发 Standby 接管
  • binlog 位点存储在 ZooKeeper,保证续读不丢失

快速上手

MySQL 授权

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Server 配置(conf/example/instance.properties

# MySQL 地址
canal.instance.master.address=127.0.0.1:3306
 
# 账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
 
# 订阅的库表(正则)
canal.instance.filter.regex=test\\..*
 
# binlog 起始位点(为空从当前开始)
canal.instance.master.journal.name=
canal.instance.master.position=

Java Client 依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.7</version>
</dependency>

Java Client 消费示例

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111),
    "example",   // destination(Instance 名)
    "", ""
);
 
connector.connect();
connector.subscribe("test\\..*");  // 订阅库表
connector.rollback();
 
while (true) {
    Message message = connector.getWithoutAck(100); // 批量拉取
    long batchId = message.getId();
    List<Entry> entries = message.getEntries();
 
    for (Entry entry : entries) {
        if (entry.getEntryType() != EntryType.ROWDATA) continue;
 
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        EventType eventType = rowChange.getEventType(); // INSERT / UPDATE / DELETE
 
        String db = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
 
        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumns(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumns(rowData.getAfterColumnsList());
            } else {
                // UPDATE:beforeColumns 是修改前,afterColumns 是修改后
                printColumns(rowData.getAfterColumnsList());
            }
        }
    }
 
    connector.ack(batchId);   // 确认消费
    // connector.rollback(batchId); // 回滚重新消费
}
 
private void printColumns(List<Column> columns) {
    columns.forEach(col ->
        System.out.println(col.getName() + " = " + col.getValue()
            + (col.getUpdated() ? " [已变更]" : ""))
    );
}

对接 Kafka(Canal Kafka 模式)

修改 canal.properties

canal.serverMode=kafka
kafka.bootstrap.servers=127.0.0.1:9092

修改 instance.properties

canal.mq.topic=canal-topic
canal.mq.partitionsNum=3
# 按表名 hash 保证同一张表的消息顺序
canal.mq.partitionHash=test.order:id

Kafka 消息格式(JSON):

{
  "database": "test",
  "table": "order",
  "type": "INSERT",
  "ts": 1714000000,
  "data": [{"id": "1", "amount": "100.00"}],
  "old": null
}

常见使用场景

场景说明
缓存同步MySQL 数据变更后自动刷新 Redis
搜索引擎同步同步数据到 Elasticsearch
数据迁移跨库、跨实例的增量迁移
数据审计记录所有数据变更历史
读写分离延迟补偿检测主从同步延迟
异构数据库同步MySQL → MongoDB / ClickHouse

注意事项

  • binlog 必须是 ROW 模式,STATEMENT 模式无法解析具体字段值
  • Canal Server 的 server-id 不能与 MySQL 集群中已有节点冲突
  • 位点(offset)是 Canal 的核心状态,HA 模式下存 ZooKeeper,单机模式下存本地文件(meta.dat
  • 消费端需做幂等处理,网络抖动可能导致重复推送
  • 大事务(批量操作)会产生大量 binlog,注意消费端背压
  • DDL 语句(ALTER TABLE 等)也会产生 Entry,消费端需过滤或兼容处理

Spring Cloud 实战