Apache SeaTunnel

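Apache SeaTunnel is a data integration platform that unifies batch and streaming workloads. It supports 100+ data sources and can replace DataX in more complex synchronization scenarios. Jobs run on Flink, Spark, or SeaTunnel's own Zeta engine.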


Comparison with DataX

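| Dimension | DataX (Alibaba) | SeaTunnel (Apache) |
| --- | --- | --- |
| Mode | Offline batch only | Unified batch and streaming |
| Engine | Single machine, multi-threaded | Flink / Spark / Zeta (SeaTunnel's own engine) |
| Number of data sources | ~30 plugins | 100+ connectors |
| Real-time support | Not supported | Supported: CDC, Kafka, and other real-time sources |
| Transformation | Simple field mapping | SQL Transform for complex SQL transformations |
| Config format | JSON | HOCON (.conf files) |
| Cluster deployment | Not supported | Supported (Flink/Spark on YARN/K8s) |
| Community activity | Maintained by Alibaba, slower updates | Apache top-level project, active community |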

Core concepts

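Source (read plugin) → Transform (optional) → Sink (write plugin)
      ↑                       ↑                      ↑
  MySQL/Kafka/ES        SQL/field mapping       ClickHouse/ES/Kafka
  files/API/JDBC        filter/type casting     databases/files/MQ

| Component | Description |
| --- | --- |
| Source | Where the data comes from (MySQL/PG/MongoDB/Kafka/OSS/API, etc.) |
| Transform | Intermediate transformation (SQL queries, field mapping, type conversion, filtering) |
| Sink | Where the data goes (ClickHouse/ES/Redis/Kafka/databases, etc.) |
| Job | One complete Source → Transform → Sink pipeline |
| Engine | Execution engine: Zeta (SeaTunnel's own, recommended) / Flink / Spark |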
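
To make the Source → Transform → Sink flow concrete, here is a minimal Python sketch of the idea. It is a conceptual model only and does not use any SeaTunnel API; the function names `source`, `transform`, `sink`, and `run_job`, as well as the sample rows, are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, object]


def source() -> Iterator[Row]:
    """Stand-in for a Source plugin: yields rows from an upstream system."""
    yield {"id": 1, "amount": 19.9}
    yield {"id": 2, "amount": -5.0}
    yield {"id": 3, "amount": 42.0}


def transform(rows: Iterable[Row]) -> Iterator[Row]:
    """Stand-in for an optional Transform: filter rows and derive a field."""
    for row in rows:
        if row["amount"] > 0:
            yield {**row, "amount_cents": round(row["amount"] * 100)}


def sink(rows: Iterable[Row]) -> List[Row]:
    """Stand-in for a Sink plugin: collects rows as a target system would."""
    return list(rows)


def run_job() -> List[Row]:
    """A Job wires one pipeline together: Source -> Transform -> Sink."""
    return sink(transform(source()))


if __name__ == "__main__":
    for row in run_job():
        print(row)
```

Because each stage only consumes an iterable and produces one, the Transform stage can be dropped without touching the other two, which mirrors why it is optional in a job file.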

Installation (Zeta engine)

# Download (pick the version you need)
wget https://archive.apache.org/dist/seatunnel/2.3.4/apache-seatunnel-2.3.4-bin.tar.gz
tar -zxvf apache-seatunnel-2.3.4-bin.tar.gz
cd apache-seatunnel-2.3.4
 
# Install connector plugins (choose as needed)
sh bin/install-plugin.sh
 
# Or specify the plugin version explicitly
sh bin/install-plugin.sh 2.3.4

# config/seatunnel.yaml (global configuration)
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000

Job configuration format (HOCON)

# job.conf
env {
  job.name = "mysql_to_clickhouse"
  job.mode = "BATCH"          # BATCH or STREAMING (real-time)
  parallelism = 4             # degree of parallelism
  checkpoint.interval = 10000 # checkpoint interval in ms (streaming mode)
}
 
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/shop?useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "root"
    query = "SELECT id, user_id, product_id, amount, created_at FROM orders WHERE dt = '${today}'"
    result_table_name = "orders_source"
  }
}
 
transform {
  # Optional SQL transformation
  Sql {
    source_table_name = "orders_source"
    result_table_name = "orders_clean"
    query = """
      SELECT
        id,
        user_id,
        product_id,
        CAST(amount AS DECIMAL(10,2)) AS amount,
        CAST(created_at AS DATE) AS order_date
      FROM orders_source
      WHERE amount > 0
    """
  }
}
 
sink {
  ClickHouse {
    host = "clickhouse:8123"
    database = "dw"
    table = "ods_orders"
    username = "default"
    password = ""
    clickhouse.config {
      max_rows_to_group_by = "1000000"
    }
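    # Write mode: "append" adds rows to the existing table. Confirm the option name and
    # accepted values against the ClickHouse sink docs for your SeaTunnel version.
    save_mode = "append"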
  }
}

Common scenarios

MySQL → Elasticsearch (full sync)

env {
  job.name = "mysql_to_es"
  job.mode = "BATCH"
}
 
source {
  Jdbc {
    url = "jdbc:mysql://mysql:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "root"
    query = "SELECT id, name, category, price FROM products"
  }
}
 
sink {
  Elasticsearch {
    hosts = ["http://es:9200"]
    index = "products"
    index_type = "_doc"
    primary_keys = ["id"]     # used for upsert
  }
}
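
The `primary_keys` option lets the sink derive each document's `_id` from the listed fields, so writing the same row again overwrites the earlier document instead of creating a duplicate. The Python sketch below models that behaviour with an in-memory dict. It does not call Elasticsearch; the `_` delimiter and the helper names `doc_id` and `upsert` are assumptions made for illustration.

```python
from typing import Dict, List

Row = Dict[str, object]


def doc_id(row: Row, primary_keys: List[str], delimiter: str = "_") -> str:
    """Build a document id by joining the primary-key values (assumed scheme)."""
    return delimiter.join(str(row[key]) for key in primary_keys)


def upsert(index: Dict[str, Row], rows: List[Row], primary_keys: List[str]) -> None:
    """Write rows keyed by document id; a repeated id replaces the old document."""
    for row in rows:
        index[doc_id(row, primary_keys)] = row


if __name__ == "__main__":
    products: Dict[str, Row] = {}
    upsert(products, [{"id": 1, "name": "pen", "price": 1.5}], ["id"])
    # Second run of the job: id 1 is updated in place, id 2 is new.
    upsert(
        products,
        [{"id": 1, "name": "pen", "price": 2.0}, {"id": 2, "name": "ink", "price": 4.0}],
        ["id"],
    )
    print(products)
```

Without a primary key, each run of a full sync would append a fresh copy of every row, which is why a key is worth setting for jobs that are re-run.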

Kafka → ClickHouse (real-time streaming)

env {
  job.name = "kafka_to_clickhouse_stream"
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
 
source {
  Kafka {
    topic = "user_events"
    bootstrap.servers = "kafka:9092"
    consumer.group = "seatunnel_group"
    start_mode = "earliest"
    format = "json"
    schema = {
      fields {
        user_id = "bigint"
        event_type = "string"
        ts = "bigint"
      }
    }
  }
}
 
sink {
  ClickHouse {
    host = "clickhouse:8123"
    database = "dw"
    table = "ods_user_events"
    username = "default"
    password = ""
  }
}
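
With `format = "json"`, each Kafka message is expected to be a JSON object whose fields match the declared schema. The following Python sketch shows what a conforming `user_events` message might look like and checks it against the three schema fields. The sample payload and the `conforms` helper are hypothetical, and the check is a simplification of what the connector actually does.

```python
import json
from typing import Dict

# Field name -> Python type expected after JSON decoding.
# user_id and ts are 64-bit integers in the job schema; event_type is a string.
SCHEMA: Dict[str, type] = {"user_id": int, "event_type": str, "ts": int}


def conforms(message: str, schema: Dict[str, type] = SCHEMA) -> bool:
    """Return True if the JSON message has every schema field with the expected type."""
    try:
        record = json.loads(message)
    except json.JSONDecodeError:
        return False
    if not isinstance(record, dict):
        return False
    for name, expected in schema.items():
        value = record.get(name)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            return False
    return True


if __name__ == "__main__":
    sample = '{"user_id": 42, "event_type": "click", "ts": 1700000000000}'
    print(conforms(sample))
```

Running a check like this against a few real messages before starting the streaming job can catch schema mismatches early, for example an id serialized as a string.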

MySQL CDC → Kafka (real-time change data capture)

env {
  job.name = "mysql_cdc_to_kafka"
  job.mode = "STREAMING"
}
 
source {
  MySQL-CDC {
    hostname = "mysql"
    port = 3306
    username = "root"
    password = "root"
    database-names = ["shop"]
    table-names = ["shop.orders"]
    server-id = "5401-5404"
    base-url = "jdbc:mysql://mysql:3306/shop"
  }
}
 
sink {
  Kafka {
    topic = "mysql_cdc_orders"
    bootstrap.servers = "kafka:9092"
    kafka.config {
      acks = "all"
    }
  }
}

Merging multiple sources (several Sources in one Job)

source {
  Jdbc {
    url = "jdbc:mysql://mysql1:3306/db"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "root"
    query = "SELECT * FROM users"
    result_table_name = "users_mysql"
  }
  Jdbc {
    url = "jdbc:postgresql://pg:5432/db"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"
    query = "SELECT * FROM users"
    result_table_name = "users_pg"
  }
}
 
transform {
  Sql {
    query = "SELECT * FROM users_mysql UNION ALL SELECT * FROM users_pg"
    result_table_name = "users_merged"
  }
}
 
sink {
  ClickHouse {
    ...
    source_table_name = "users_merged"
  }
}
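
`UNION ALL` concatenates the two result sets without removing duplicates, and it requires both sides to return the same columns in the same order. The sketch below reproduces those semantics with Python's built-in `sqlite3` module so they can be tried without any SeaTunnel setup. The table contents are made up, and SQLite stands in for SeaTunnel's SQL engine here.

```python
import sqlite3
from typing import List, Tuple


def merge_users() -> List[Tuple[int, str]]:
    """Create two small user tables in memory and merge them with UNION ALL."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(
            """
            CREATE TABLE users_mysql (id INTEGER, name TEXT);
            CREATE TABLE users_pg    (id INTEGER, name TEXT);
            INSERT INTO users_mysql VALUES (1, 'alice'), (2, 'bob');
            INSERT INTO users_pg    VALUES (2, 'bob'), (3, 'carol');
            """
        )
        cursor = conn.execute(
            "SELECT * FROM users_mysql UNION ALL SELECT * FROM users_pg"
        )
        return cursor.fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    for row in sorted(merge_users()):
        print(row)
```

The row `(2, 'bob')` exists in both tables and therefore appears twice in the merged result. If the two systems can hold the same user, de-duplication has to happen in a later step or in the target table.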

Running jobs

# Run on the Zeta engine in local mode (development/testing)
bin/seatunnel.sh --config job.conf -e local
 
# Run on a Zeta cluster (production)
bin/seatunnel.sh --config job.conf -e cluster
 
# Flink engine (requires a Flink cluster)
bin/start-seatunnel-flink-15-connector-v2.sh --config job.conf
 
# Pass variables into the job file
bin/seatunnel.sh --config job.conf \
  --variable today=2026-05-18 \
  -e local
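
When a job is launched from a script, the `--variable` value is usually computed rather than hard-coded. Below is a minimal Python sketch, assuming SeaTunnel is installed under `/opt/seatunnel` and that the job file references `${today}`. `build_command` and `run_job` are hypothetical helpers, and only the flags shown above are used.

```python
import datetime
import subprocess
from typing import List, Optional

SEATUNNEL_HOME = "/opt/seatunnel"  # assumed install location


def build_command(
    config: str,
    mode: str = "local",
    today: Optional[datetime.date] = None,
) -> List[str]:
    """Assemble the seatunnel.sh argument list, passing the date as a job variable."""
    day = today or datetime.date.today()
    return [
        f"{SEATUNNEL_HOME}/bin/seatunnel.sh",
        "--config", config,
        "--variable", f"today={day.isoformat()}",
        "-e", mode,
    ]


def run_job(config: str, mode: str = "local") -> int:
    """Run the job and return the process exit code (non-zero means failure)."""
    return subprocess.run(build_command(config, mode)).returncode


if __name__ == "__main__":
    # Print the command instead of running it, so the sketch works without SeaTunnel.
    print(" ".join(build_command("job.conf", today=datetime.date(2026, 5, 18))))
```

Passing the arguments as a list avoids shell quoting issues, and returning the exit code lets a scheduler mark the task as failed when the job does not succeed.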

Web UI (SeaTunnel Web, optional)

# Start SeaTunnel Web (deployed separately)
bin/seatunnel-backend-daemon.sh start
 
# Features: visual job design, job scheduling, run monitoring
# Port: 8801

Integration with schedulers

# Calling SeaTunnel from DolphinScheduler:
# add a Shell task that runs the following command
/opt/seatunnel/bin/seatunnel.sh --config /jobs/daily_sync.conf -e cluster
 
# Calling SeaTunnel from an Airflow DAG (Python, Airflow 2.x import path):
from airflow.operators.bash import BashOperator

BashOperator(
    task_id='seatunnel_sync',
    bash_command='/opt/seatunnel/bin/seatunnel.sh --config /jobs/sync.conf -e cluster',
)

Supported connectors (partial list)

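| Category | Source | Sink |
| --- | --- | --- |
| Relational databases | MySQL / PG / Oracle / SQL Server / DB2 | Same as Source |
| NoSQL | MongoDB / Redis / Cassandra | Same as Source |
| Big data | Hive / HBase / HDFS / S3 / OSS | Same as Source |
| Search engines | Elasticsearch / OpenSearch | Same as Source |
| Message queues | Kafka / Pulsar / RabbitMQ / RocketMQ | Same as Source |
| Columnar warehouses | ClickHouse / Doris / StarRocks | Same as Source |
| Files | CSV / JSON / Parquet / ORC / Excel | Same as Source |
| Cloud warehouses | Snowflake / BigQuery / Redshift | Same as Source |
| CDC | MySQL-CDC / PG-CDC / Oracle-CDC / MongoDB-CDC | |
| HTTP | HTTP / REST API | HTTP |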

Performance tuning

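| Parameter | Description |
| --- | --- |
| parallelism | Degree of parallelism; corresponds to the number of slots in the Zeta engine |
| batch_size | Rows per batch commit in a Sink (e.g. ClickHouse/JDBC) |
| fetch_size | Rows fetched per cursor round trip by the JDBC Source |
| checkpoint.interval | Checkpoint frequency in streaming mode |
| max_retries | Number of retries for a Source/Sink |

# JDBC Source: read in parallel by partition (speeds up large tables)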
source {
  Jdbc {
    ...
    partition_column = "id"          # partition column (usually the primary key)
    partition_upper_bound = 10000000 # maximum value
    partition_lower_bound = 1        # minimum value
    partition_num = 10               # split into 10 slices read in parallel
  }
}
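
To see how the four partition options fit together, the sketch below splits the closed range from `partition_lower_bound` to `partition_upper_bound` into `partition_num` contiguous slices, each of which could be read by its own query such as `WHERE id BETWEEN lo AND hi`. This is an illustrative even split written in Python. SeaTunnel's actual splitting logic may differ, and `split_range` is a hypothetical helper.

```python
from typing import List, Tuple


def split_range(lower: int, upper: int, num: int) -> List[Tuple[int, int]]:
    """Split [lower, upper] into at most `num` contiguous, non-overlapping slices."""
    if num < 1:
        raise ValueError("num must be at least 1")
    if upper < lower:
        raise ValueError("upper must not be smaller than lower")
    total = upper - lower + 1
    num = min(num, total)  # never create empty slices
    base, extra = divmod(total, num)
    ranges: List[Tuple[int, int]] = []
    start = lower
    for i in range(num):
        # The first `extra` slices take one additional value each.
        size = base + (1 if i < extra else 0)
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


if __name__ == "__main__":
    for lo, hi in split_range(1, 10_000_000, 10):
        print(f"WHERE id BETWEEN {lo} AND {hi}")
```

An even split by value only balances the work when ids are roughly uniformly distributed. If large id ranges are empty, some slices finish almost immediately while others carry most of the rows, so the bounds are best taken from the real `MIN(id)` and `MAX(id)` of the table.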

Related

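  • Data Transfer: DataX vs. SeaTunnel, and the full + incremental migration workflow
  • CDC: Debezium / Flink CDC and how they relate to SeaTunnel's MySQL-CDC Source
  • Data Warehouse Architecture: SeaTunnel as the ingestion tool for the ODS layer
  • Kettle: visual ETL tool for transformation-heavy scenarios
  • Flink: supported by SeaTunnel as an execution engine
  • ClickHouse: a common Sink target for SeaTunnel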