Apache SeaTunnel
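Apache SeaTunnel is a data integration platform that unifies batch and streaming. It supports 100+ data sources, can replace DataX in more complex data synchronization scenarios, and runs on the Flink or Spark engines as well as its own Zeta engine.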
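Comparison with DataX
| Dimension | DataX (Alibaba) | SeaTunnel (Apache) |
|---|---|---|
| Mode | Offline batch only | Unified batch and streaming |
| Engine | Single machine, multi-threaded | Flink / Spark / Zeta (built-in) |
| Number of data sources | ~30 plugins | 100+ connectors |
| Real-time support | Not supported | Supports real-time sources such as CDC and Kafka |
| Transformation | Simple field mapping | SQL Transform, including complex SQL |
| Config format | JSON | HOCON (.conf files) |
| Cluster deployment | Not supported | Supported (Flink/Spark on YARN/K8s) |
| Community activity | Maintained by Alibaba, slow updates | Apache top-level project, active community |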
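Core concepts
Source (read plugin)  →  Transform (optional)   →  Sink (write plugin)
        ↑                        ↑                         ↑
MySQL/Kafka/ES           SQL / field mapping       ClickHouse/ES/Kafka
files/API/JDBC           filter / type conversion  databases/files/MQ
| Component | Description |
|---|---|
| Source | Where the data comes from (MySQL/PG/MongoDB/Kafka/OSS/API, etc.) |
| Transform | Intermediate processing (SQL queries, field mapping, type conversion, filtering) |
| Sink | Where the data goes (ClickHouse/ES/Redis/Kafka/databases, etc.) |
| Job | One complete Source → Transform → Sink pipeline |
| Engine | Execution engine: Zeta (built-in, recommended) / Flink / Spark |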
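The Source → Transform → Sink flow can be pictured as a chain of Python generators. This is only a conceptual sketch: the function names and the sample rows are invented for illustration and are unrelated to SeaTunnel's actual Java APIs.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def source(rows: Iterable[Row]) -> Iterator[Row]:
    """Hypothetical source: yields a copy of each input row."""
    for row in rows:
        yield dict(row)


def transform(rows: Iterable[Row], keep: Callable[[Row], bool]) -> Iterator[Row]:
    """Hypothetical transform: keeps only matching rows, like a WHERE clause."""
    for row in rows:
        if keep(row):
            yield row


def sink(rows: Iterable[Row], target: List[Row]) -> int:
    """Hypothetical sink: appends rows to a list and returns how many were written."""
    written = 0
    for row in rows:
        target.append(row)
        written += 1
    return written


def run_job(rows: Iterable[Row], target: List[Row]) -> int:
    """A job wires one source, an optional transform, and one sink together."""
    positive = transform(source(rows), keep=lambda r: r["amount"] > 0)
    return sink(positive, target)
```

Because every stage is lazy, rows flow through one at a time, which is roughly how a streaming job differs from loading a whole table into memory first.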
Installation (Zeta engine, local)
# Download (pick the version you need)
wget https://archive.apache.org/dist/seatunnel/2.3.4/apache-seatunnel-2.3.4-bin.tar.gz
tar -zxvf apache-seatunnel-2.3.4-bin.tar.gz
cd apache-seatunnel-2.3.4
# Install connector plugins (as needed)
sh bin/install-plugin.sh
# Or specify the plugin version explicitly
sh bin/install-plugin.sh 2.3.4
# config/seatunnel.yaml (global configuration)
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
Job configuration format (HOCON)
# job.conf
env {
  job.name = "mysql_to_clickhouse"
  job.mode = "BATCH"            # BATCH or STREAMING (real time)
  parallelism = 4               # degree of parallelism
  checkpoint.interval = 10000   # checkpoint interval in ms (streaming mode)
}
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/shop?useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "root"
    query = "SELECT id, user_id, product_id, amount, created_at FROM orders WHERE dt = '${today}'"
    result_table_name = "orders_source"
  }
}
transform {
  # Optional SQL transform
  Sql {
    source_table_name = "orders_source"
    result_table_name = "orders_clean"
    query = """
      SELECT
        id,
        user_id,
        product_id,
        CAST(amount AS DECIMAL(10,2)) AS amount,
        CAST(created_at AS DATE) AS order_date
      FROM orders_source
      WHERE amount > 0
    """
  }
}
sink {
  ClickHouse {
    host = "clickhouse:8123"
    database = "dw"
    table = "ods_orders"
    username = "default"
    password = ""
    clickhouse.config {
      max_rows_to_group_by = "1000000"
    }
    # Write mode: "append" adds rows to the existing table.
    # The option name and accepted values may differ between connector versions.
    save_mode = "append"
  }
}
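The source query above contains a `${today}` placeholder, which gets its value from a variable passed when the job is launched. SeaTunnel does this substitution itself; the sketch below only mimics the effect so you can preview the final query text. The `substitute` helper is hypothetical and not part of SeaTunnel.

```python
import re
from typing import Mapping

# Matches placeholders of the form ${name}.
_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def substitute(text: str, variables: Mapping[str, str]) -> str:
    """Replace each ${name} with its value; an unknown name raises KeyError."""

    def repl(match: "re.Match[str]") -> str:
        return variables[match.group(1)]

    return _PLACEHOLDER.sub(repl, text)
```

Failing on an unknown name, rather than leaving the placeholder in place, makes a missing variable visible before a query with a literal `${today}` ever reaches the database.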
Common scenarios
MySQL → Elasticsearch (full sync)
env {
  job.name = "mysql_to_es"
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://mysql:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "root"
    query = "SELECT id, name, category, price FROM products"
  }
}
sink {
  Elasticsearch {
    hosts = ["http://es:9200"]
    index = "products"
    index_type = "_doc"
    primary_keys = ["id"]   # used for upsert
  }
}
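Setting `primary_keys` means a repeated sync overwrites existing documents instead of duplicating them. A minimal sketch of that idea, with a plain dict standing in for the index; the `_` delimiter and both helper names are assumptions made for illustration, not the connector's code.

```python
from typing import Dict, Iterable, List

Row = Dict[str, object]


def doc_id(row: Row, primary_keys: List[str], delimiter: str = "_") -> str:
    """Build a document id by joining the primary-key values."""
    return delimiter.join(str(row[key]) for key in primary_keys)


def upsert(index: Dict[str, Row], rows: Iterable[Row], primary_keys: List[str]) -> None:
    """Insert new documents and overwrite existing ones that share an id."""
    for row in rows:
        index[doc_id(row, primary_keys)] = dict(row)
```

Running `upsert` twice with the same rows leaves the index unchanged, which is what makes a full sync safe to re-run.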
Kafka → ClickHouse (real-time streaming)
env {
  job.name = "kafka_to_clickhouse_stream"
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Kafka {
    topic = "user_events"
    bootstrap.servers = "kafka:9092"
    consumer.group = "seatunnel_group"
    start_mode = "earliest"
    format = "json"
    schema = {
      fields {
        user_id = "bigint"
        event_type = "string"
        ts = "bigint"
      }
    }
  }
}
sink {
  ClickHouse {
    host = "clickhouse:8123"
    database = "dw"
    table = "ods_user_events"
    username = "default"
    password = ""
  }
}
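The `schema` block tells the Kafka source which fields to expect in each JSON message. The sketch below checks a message against the same three fields before it is produced to the topic; `EVENT_SCHEMA` and `parse_event` are hypothetical names, and the sample payload in the usage note is invented.

```python
import json
from typing import Dict

# Mirrors the schema in the job above: field name -> expected Python type.
EVENT_SCHEMA = {"user_id": int, "event_type": str, "ts": int}


def parse_event(raw: str) -> Dict[str, object]:
    """Parse one JSON message and check it against EVENT_SCHEMA."""
    data = json.loads(raw)
    for field, expected in EVENT_SCHEMA.items():
        if field not in data:
            raise ValueError(f"missing field: {field}")
        value = data[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError(f"bad type for {field}: {type(value).__name__}")
    # Drop any extra fields that the schema does not declare.
    return {field: data[field] for field in EVENT_SCHEMA}
```

For example, `parse_event('{"user_id": 42, "event_type": "click", "ts": 1760000000000}')` returns the three declared fields, while a message with `"user_id": "42"` raises `ValueError`.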
MySQL CDC → Kafka (real-time change data capture)
env {
  job.name = "mysql_cdc_to_kafka"
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    hostname = "mysql"
    port = 3306
    username = "root"
    password = "root"
    database-names = ["shop"]
    table-names = ["shop.orders"]
    server-id = "5401-5404"
    # JDBC connection URL; it carries the host and port the connector connects to
    base-url = "jdbc:mysql://mysql:3306/shop"
  }
}
sink {
  Kafka {
    topic = "mysql_cdc_orders"
    bootstrap.servers = "kafka:9092"
    kafka.config {
      acks = "all"
    }
  }
}
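A consumer of the change topic typically replays the events to rebuild the table's current state. The sketch below shows that idea under a loudly stated assumption: the event shape (`op`, `key`, `row`) is invented for illustration and is not the wire format SeaTunnel writes to Kafka.

```python
from typing import Dict, Iterable, Optional

Row = Dict[str, object]


def apply_changes(table: Dict[object, Row], events: Iterable[dict]) -> Dict[object, Row]:
    """Apply insert/update/delete events, in order, to an in-memory replica."""
    for event in events:
        op = event["op"]
        if op in ("insert", "update"):
            # Both operations leave the latest row image under its key.
            table[event["key"]] = event["row"]
        elif op == "delete":
            # Deleting a key that is already absent is treated as a no-op.
            table.pop(event["key"], None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table
```

Order matters here: replaying the same events in a different order can produce a different final state, which is why change streams are usually keyed so that all events for one row land in the same partition.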
Merging multiple sources (several sources in one job)
source {
  Jdbc {
    url = "jdbc:mysql://mysql1:3306/db"
    query = "SELECT * FROM users"
    result_table_name = "users_mysql"
  }
  Jdbc {
    url = "jdbc:postgresql://pg:5432/db"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"
    query = "SELECT * FROM users"
    result_table_name = "users_pg"
  }
}
transform {
  Sql {
    query = "SELECT * FROM users_mysql UNION ALL SELECT * FROM users_pg"
    result_table_name = "users_merged"
  }
}
sink {
  ClickHouse {
    ...
    source_table_name = "users_merged"
  }
}
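The merge relies on plain `UNION ALL` semantics: rows from both inputs are concatenated and duplicates are kept. The sketch below reproduces that behaviour with Python's built-in `sqlite3`, which stands in for the SQL transform here; the table contents are invented sample data.

```python
import sqlite3
from typing import List, Tuple


def merge_users() -> List[Tuple[int, str]]:
    """Load two small sample tables and merge them with UNION ALL."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(
            """
            CREATE TABLE users_mysql (id INTEGER, name TEXT);
            CREATE TABLE users_pg    (id INTEGER, name TEXT);
            INSERT INTO users_mysql VALUES (1, 'alice'), (2, 'bob');
            INSERT INTO users_pg    VALUES (2, 'bob'), (3, 'carol');
            """
        )
        cursor = conn.execute(
            "SELECT * FROM users_mysql UNION ALL SELECT * FROM users_pg"
        )
        return cursor.fetchall()
    finally:
        conn.close()
```

Note that `(2, 'bob')` appears twice in the result. `SELECT *` on both sides also only works when the two tables have the same number of columns in the same order, so listing the columns explicitly is usually safer.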
Running jobs
# Run on the Zeta engine in local mode (development/testing)
bin/seatunnel.sh --config job.conf -e local
# Run on a Zeta engine cluster (production)
bin/seatunnel.sh --config job.conf -e cluster
# Flink engine (requires a Flink cluster)
bin/start-seatunnel-flink-15-connector-v2.sh --config job.conf
# Pass variables
bin/seatunnel.sh --config job.conf \
  --variable today=2026-05-18 \
  -e local
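When jobs are launched from a script, it helps to build the argument list in one place. The sketch below assembles the same command lines shown above, using only the flags that appear there; `build_command` is a hypothetical helper and the default script path is an assumption about where SeaTunnel is installed.

```python
from typing import Dict, List, Optional


def build_command(
    config: str,
    deploy_mode: str = "local",
    variables: Optional[Dict[str, str]] = None,
    script: str = "bin/seatunnel.sh",  # assumed path, relative to the install dir
) -> List[str]:
    """Return the argument list for one seatunnel.sh invocation."""
    if deploy_mode not in ("local", "cluster"):
        raise ValueError(f"unsupported deploy mode: {deploy_mode}")
    cmd = [script, "--config", config]
    # One --variable flag per name=value pair, in insertion order.
    for name, value in (variables or {}).items():
        cmd += ["--variable", f"{name}={value}"]
    cmd += ["-e", deploy_mode]
    return cmd
```

The list can be passed directly to `subprocess.run(build_command("job.conf", variables={"today": "2026-05-18"}), check=True)`; using a list instead of a shell string avoids quoting problems in variable values.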
Web UI (SeaTunnel Web, optional)
# Start SeaTunnel Web (must be deployed separately)
bin/seatunnel-backend-daemon.sh start
# Features: visual job design, job scheduling, run monitoring
# Port: 8801
Integration with schedulers
# Calling SeaTunnel from DolphinScheduler
# Add a Shell task in DolphinScheduler:
/opt/seatunnel/bin/seatunnel.sh --config /jobs/daily_sync.conf -e cluster
# Calling it from an Airflow DAG (Python):
from airflow.operators.bash import BashOperator

BashOperator(
    task_id='seatunnel_sync',
    bash_command='/opt/seatunnel/bin/seatunnel.sh --config /jobs/sync.conf -e cluster',
)
Supported connectors (partial list)
| Category | Source | Sink |
|---|---|---|
| Relational databases | MySQL / PG / Oracle / SQL Server / DB2 | Same as source |
| NoSQL | MongoDB / Redis / Cassandra | Same as source |
| Big data | Hive / HBase / HDFS / S3 / OSS | Same as source |
| Search engines | Elasticsearch / OpenSearch | Same as source |
| Message queues | Kafka / Pulsar / RabbitMQ / RocketMQ | Same as source |
| Columnar warehouses | ClickHouse / Doris / StarRocks | Same as source |
| Files | CSV / JSON / Parquet / ORC / Excel | Same as source |
| Cloud warehouses | Snowflake / BigQuery / Redshift | Same as source |
| CDC | MySQL-CDC / PG-CDC / Oracle-CDC / MongoDB-CDC | None |
| HTTP | HTTP / REST API | HTTP |
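Performance tuning
| Parameter | Description |
|---|---|
| parallelism | Degree of parallelism; corresponds to the number of slots on the Zeta engine |
| batch_size | Batch commit size for sinks (e.g. ClickHouse/JDBC) |
| fetch_size | Number of rows the JDBC source cursor fetches per round trip |
| checkpoint.interval | Checkpoint frequency in streaming mode |
| max_retries | Number of retries for a source/sink |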
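To make the effect of `batch_size` concrete: a sink buffers incoming rows and commits them in groups rather than one at a time. The sketch below shows the grouping only; `batches` is a hypothetical helper, not connector code, and the actual flush logic of a given sink may also depend on timers or checkpoints.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group rows into lists of at most batch_size items, preserving order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batch: List[T] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Flush the final, possibly smaller, batch.
    if batch:
        yield batch
```

A larger batch means fewer round trips to the target but more rows held in memory and more rows to resend if a commit fails, so the value is a trade-off rather than "bigger is better".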
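# Partitioned parallel read for the JDBC source (speeds up large tables)
source {
  Jdbc {
    ...
    partition_column = "id"            # partition column (usually the primary key)
    partition_upper_bound = 10000000   # maximum value
    partition_lower_bound = 1          # minimum value
    partition_num = 10                 # split into 10 partitions read in parallel
  }
}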
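The four partition options describe a numeric range cut into equal slices, each read by its own query. The sketch below computes such slices for the values used above. It illustrates the idea only and is not necessarily the exact splitting algorithm SeaTunnel uses; `split_ranges` is a hypothetical helper.

```python
from typing import List, Tuple


def split_ranges(lower: int, upper: int, num: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into up to num contiguous ranges."""
    if num <= 0:
        raise ValueError("num must be positive")
    if upper < lower:
        raise ValueError("upper must be >= lower")
    total = upper - lower + 1
    num = min(num, total)  # never create empty ranges
    base, extra = divmod(total, num)
    ranges: List[Tuple[int, int]] = []
    start = lower
    for i in range(num):
        # The first `extra` ranges take one additional value each.
        size = base + (1 if i < extra else 0)
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges
```

With `split_ranges(1, 10000000, 10)` the first slice is `(1, 1000000)` and the last is `(9000001, 10000000)`; each slice would map to a query such as `WHERE id BETWEEN start AND end`. Even slices only give even work when the ids are spread uniformly, so a column with large gaps makes a poor partition column.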
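Related
- Data transfer: comparison of DataX and SeaTunnel, full plus incremental migration workflow
- CDC: Debezium / Flink CDC and how they relate to SeaTunnel's MySQL-CDC source
- Data warehouse architecture: SeaTunnel as the ingestion tool for the ODS layer
- Kettle: visual ETL tool for transformation-heavy scenarios
- Flink: SeaTunnel supports Flink as an execution engine
- ClickHouse: a common sink target for SeaTunnel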