数据传输
→ 返回 数据库
数据传输指在不同系统、数据库或存储之间搬运数据,涵盖全量迁移、增量同步和实时流式传输三种模式。
传输模式
| 模式 | 说明 | 典型场景 | 延迟 |
|---|---|---|---|
| 全量(Full Load) | 一次性导出全部数据,导入目标 | 初始化迁移、历史数据归档 | 分钟 ~ 小时 |
| 增量(Incremental) | 只传输上次同步后的变更(时间戳/版本号过滤) | T+1 数据仓库 ETL | 分钟 ~ 小时 |
| CDC 实时 | 基于日志捕获变更,毫秒级推送 | 实时数仓、缓存同步 | 毫秒 ~ 秒 |
| 流式(Streaming) | 数据产生即传输,无批次概念 | 埋点事件、IoT 数据 | 毫秒 |
ETL vs ELT
| ETL(Extract-Transform-Load) | ELT(Extract-Load-Transform) | |
|---|---|---|
| 转换位置 | 中间层(ETL 引擎)完成清洗再写入 | 先写入目标,在目标库内用 SQL 转换 |
| 适用场景 | 数仓时代,目标库计算能力弱 | 云原生大数据,目标库(ClickHouse/Snowflake)计算强 |
| 代表工具 | Kettle, Informatica, DataStage | dbt, Spark SQL, BigQuery |
| 优点 | 数据入库前就已清洗 | 充分利用目标库算力,无中间层瓶颈 |
| 缺点 | 中间层成为瓶颈,开发复杂 | 原始脏数据先落库,存储占用大 |
主流工具对比
| 工具 | 类型 | 支持数据源 | 特点 |
|---|---|---|---|
| DataX | 离线批量 | MySQL/PG/Oracle/HDFS/OSS/ES/HBase 等 | 阿里开源,插件化架构,稳定可靠 |
| SeaTunnel | 批流一体 | 100+ 数据源 | Apache 项目,支持 Flink/Spark 引擎,新一代 DataX 替代 |
| Kettle(PDI) | 离线 ETL | 广泛 JDBC + 文件 | 可视化设计,适合复杂转换逻辑,见 Kettle |
| Sqoop | Hadoop ↔ RDBMS | MySQL/Oracle ↔ HDFS/Hive | 老牌 Hadoop 生态工具,功能单一 |
| Kafka Connect | 实时流 | 100+ Connector | 与 Kafka 深度集成,Debezium CDC 在其上运行 |
| Flink | 实时流 | 广泛 Connector | 有状态流处理,支持 CDC Source,见 Flink |
| Canal / Debezium | CDC 实时 | MySQL/PG/Oracle | 日志解析,见 CDC |
| AWS DMS | 云托管迁移 | 主流 RDBMS + NoSQL | 异构数据库迁移,托管免运维 |
DataX 快速上手
DataX 是阿里巴巴开源的离线数据同步框架,以 JSON 配置驱动:
Reader(数据源读取)→ Channel(内存队列)→ Writer(目标写入)
MySQL → MySQL 同步示例
{
"job": {
"setting": {
"speed": {
"channel": 4,
"bps": 10485760
},
"errorLimit": {
"record": 10,
"percentage": 0.05
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [{
"jdbcUrl": ["jdbc:mysql://source:3306/db?useSSL=false"],
"querySql": ["SELECT id,name,age,updated_at FROM users WHERE updated_at >= '${start_time}'"]
}]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "root",
"writeMode": "replace",
"column": ["id", "name", "age", "updated_at"],
"connection": [{
"jdbcUrl": "jdbc:mysql://target:3306/db?useSSL=false",
"table": ["users"]
}]
}
}
}]
}
}python datax.py job.json -p "-Dstart_time=2026-05-01 00:00:00"MySQL → Elasticsearch
{
"reader": {
"name": "mysqlreader",
"parameter": { "...": "..." }
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://es:9200",
"indexName": "users",
"typeName": "_doc",
"actionType": "index",
"column": [
{ "name": "id", "type": "id" },
{ "name": "name", "type": "text" },
{ "name": "age", "type": "integer" }
]
}
}
}全量 + 增量迁移流程
大表迁移的标准流程,将停机窗口压缩到秒级:
1. 准备阶段
├── 评估数据量与表结构
├── 目标端建表,确认索引与约束
└── 开启 CDC(如 Canal / Debezium),记录起始位点
2. 全量同步(DataX / pg_dump / mysqldump)
├── 限速避免对源库造成过大压力
└── 记录全量同步结束时间 T1
3. 增量追平(CDC 回放 T1 之后的变更)
└── 直到主从延迟 < 阈值(如 1s)
4. 切流窗口(秒级停机)
├── 停止写入源库(或切只读)
├── 等待 CDC 完全消费追平
├── 校验源目行数与 checksum
└── 切换业务连接到目标库
5. 验证
└── 业务监控观察,保留回滚方案 24~72h
性能调优
| 维度 | 手段 |
|---|---|
| 并发通道 | DataX channel 数对应并发 JDBC 连接,按目标库承受力调整 |
| 批提交大小 | Writer batchSize 过小则 RTT 多,过大则事务内存压力大,通常 1000~5000 |
| 读取分片 | 按主键范围或时间戳分片并行读取,避免单线程扫大表 |
| 网络带宽限速 | DataX bps 参数,防止打满网络影响业务 |
| 索引延后创建 | 全量写入完成后再建索引,比边写边维护索引快数倍 |
| 关闭外键检查 | 迁移期间 SET FOREIGN_KEY_CHECKS=0 |
| 调大 bulk 批次 | ES Writer bulkActions=5000,减少 HTTP 请求次数 |
Schema 演化处理
数据传输中源端和目标端 Schema 可能不同步:
| 场景 | 处理方式 |
|---|---|
| 新增字段 | 目标端提前加好可空字段,源端有值时自动同步 |
| 删除字段 | 目标端保留字段(软删除),不影响历史数据 |
| 字段重命名 | 在 ETL/Flink SQL 层做字段映射 |
| 类型变更(兼容) | 如 INT → BIGINT,先改目标端,再改源端 |
| 类型变更(不兼容) | 需停机或蓝绿切换 |
| CDC + Schema 变更 | Debezium Schema Registry + Avro 自动演化 |
数据质量校验
迁移完成后必须校验数据一致性:
-- 行数对比
SELECT COUNT(*) FROM source.users; -- 源
SELECT COUNT(*) FROM target.users; -- 目标
-- 聚合校验(金额类字段)
SELECT SUM(amount), MAX(updated_at) FROM source.orders WHERE ...;
SELECT SUM(amount), MAX(updated_at) FROM target.orders WHERE ...;
-- 抽样 Hash 校验(DataX 自带 checksum 模式)
-- 或使用 pt-table-checksum(Percona Toolkit)
pt-table-checksum --databases=mydb h=source_host