数据传输

返回 数据库

数据传输指在不同系统、数据库或存储之间搬运数据,涵盖全量迁移、增量同步和实时流式传输三种模式。


传输模式

模式说明典型场景延迟
全量(Full Load)一次性导出全部数据,导入目标初始化迁移、历史数据归档分钟 ~ 小时
增量(Incremental)只传输上次同步后的变更(时间戳/版本号过滤)T+1 数据仓库 ETL分钟 ~ 小时
CDC 实时基于日志捕获变更,毫秒级推送实时数仓、缓存同步毫秒 ~ 秒
流式(Streaming)数据产生即传输,无批次概念埋点事件、IoT 数据毫秒

ETL vs ELT

ETL(Extract-Transform-Load)ELT(Extract-Load-Transform)
转换位置中间层(ETL 引擎)完成清洗再写入先写入目标,在目标库内用 SQL 转换
适用场景数仓时代,目标库计算能力弱云原生大数据,目标库(ClickHouse/Snowflake)计算强
代表工具Kettle, Informatica, DataStagedbt, Spark SQL, BigQuery
优点数据入库前就已清洗充分利用目标库算力,无中间层瓶颈
缺点中间层成为瓶颈,开发复杂原始脏数据先落库,存储占用大

主流工具对比

工具类型支持数据源特点
DataX离线批量MySQL/PG/Oracle/HDFS/OSS/ES/HBase 等阿里开源,插件化架构,稳定可靠
SeaTunnel批流一体100+ 数据源Apache 项目,支持 Flink/Spark 引擎,新一代 DataX 替代
Kettle(PDI)离线 ETL广泛 JDBC + 文件可视化设计,适合复杂转换逻辑,见 Kettle
SqoopHadoop ↔ RDBMSMySQL/Oracle ↔ HDFS/Hive老牌 Hadoop 生态工具,功能单一
Kafka Connect实时流100+ Connector与 Kafka 深度集成,Debezium CDC 在其上运行
Flink实时流广泛 Connector有状态流处理,支持 CDC Source,见 Flink
Canal / DebeziumCDC 实时MySQL/PG/Oracle日志解析,见 CDC
AWS DMS云托管迁移主流 RDBMS + NoSQL异构数据库迁移,托管免运维

DataX 快速上手

DataX 是阿里巴巴开源的离线数据同步框架,以 JSON 配置驱动:

Reader(数据源读取)→ Channel(内存队列)→ Writer(目标写入)

MySQL → MySQL 同步示例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 4,         
        "bps": 10485760       
      },
      "errorLimit": {
        "record": 10,         
        "percentage": 0.05    
      }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "root",
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://source:3306/db?useSSL=false"],
            "querySql": ["SELECT id,name,age,updated_at FROM users WHERE updated_at >= '${start_time}'"]
          }]
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "username": "root",
          "password": "root",
          "writeMode": "replace",   
          "column": ["id", "name", "age", "updated_at"],
          "connection": [{
            "jdbcUrl": "jdbc:mysql://target:3306/db?useSSL=false",
            "table": ["users"]
          }]
        }
      }
    }]
  }
}
python datax.py job.json -p "-Dstart_time=2026-05-01 00:00:00"

MySQL → Elasticsearch

{
  "reader": {
    "name": "mysqlreader",
    "parameter": { "...": "..." }
  },
  "writer": {
    "name": "elasticsearchwriter",
    "parameter": {
      "endpoint": "http://es:9200",
      "indexName": "users",
      "typeName": "_doc",
      "actionType": "index",
      "column": [
        { "name": "id",   "type": "id" },
        { "name": "name", "type": "text" },
        { "name": "age",  "type": "integer" }
      ]
    }
  }
}

全量 + 增量迁移流程

大表迁移的标准流程,将停机窗口压缩到秒级:

1. 准备阶段
   ├── 评估数据量与表结构
   ├── 目标端建表,确认索引与约束
   └── 开启 CDC(如 Canal / Debezium),记录起始位点

2. 全量同步(DataX / pg_dump / mysqldump)
   ├── 限速避免对源库造成过大压力
   └── 记录全量同步结束时间 T1

3. 增量追平(CDC 回放 T1 之后的变更)
   └── 直到主从延迟 < 阈值(如 1s)

4. 切流窗口(秒级停机)
   ├── 停止写入源库(或切只读)
   ├── 等待 CDC 完全消费追平
   ├── 校验源目行数与 checksum
   └── 切换业务连接到目标库

5. 验证
   └── 业务监控观察,保留回滚方案 24~72h

性能调优

维度手段
并发通道DataX channel 数对应并发 JDBC 连接,按目标库承受力调整
批提交大小Writer batchSize 过小则 RTT 多,过大则事务内存压力大,通常 1000~5000
读取分片按主键范围或时间戳分片并行读取,避免单线程扫大表
网络带宽限速DataX bps 参数,防止打满网络影响业务
索引延后创建全量写入完成后再建索引,比边写边维护索引快数倍
关闭外键检查迁移期间 SET FOREIGN_KEY_CHECKS=0
调大 bulk 批次ES Writer bulkActions=5000,减少 HTTP 请求次数

Schema 演化处理

数据传输中源端和目标端 Schema 可能不同步:

场景处理方式
新增字段目标端提前加好可空字段,源端有值时自动同步
删除字段目标端保留字段(软删除),不影响历史数据
字段重命名在 ETL/Flink SQL 层做字段映射
类型变更(兼容)如 INT → BIGINT,先改目标端,再改源端
类型变更(不兼容)需停机或蓝绿切换
CDC + Schema 变更Debezium Schema Registry + Avro 自动演化

数据质量校验

迁移完成后必须校验数据一致性:

-- 行数对比
SELECT COUNT(*) FROM source.users;   -- 源
SELECT COUNT(*) FROM target.users;   -- 目标
 
-- 聚合校验(金额类字段)
SELECT SUM(amount), MAX(updated_at) FROM source.orders WHERE ...;
SELECT SUM(amount), MAX(updated_at) FROM target.orders WHERE ...;
 
-- 抽样 Hash 校验(DataX 自带 checksum 模式)
-- 或使用 pt-table-checksum(Percona Toolkit)
pt-table-checksum --databases=mydb h=source_host

相关

  • Kettle — 可视化 ETL 工具
  • Flink — 实时流式数据传输与处理
  • CDC — 基于日志的实时变更捕获
  • Canal — MySQL CDC 工具
  • 数据库事务 — 传输过程中的事务一致性保障