Flink
Apache Flink 是有状态的流批一体计算引擎,以低延迟、高吞吐和精确一次(Exactly-Once)语义著称。既可处理无界流数据(实时),也可处理有界批数据,是实时数仓和事件驱动应用的主流选择。
与 Spark 对比:Spark 是微批(Micro-Batch),Flink 是真正的逐条流处理,延迟更低。
架构
Client(提交作业)
│
▼
JobManager(调度 Task、Checkpoint 协调、故障恢复)
│
├── TaskManager 1(执行 Task,管理 Slot)
├── TaskManager 2
└── TaskManager 3
| 角色 | 职责 |
|---|---|
| JobManager | 接收作业、生成执行图、调度 Task、触发 Checkpoint |
| TaskManager | 实际执行算子,每个 TM 有固定数量的 Task Slot |
| Task Slot | TM 上的资源单元,隔离不同作业的内存 |
核心概念
时间语义
| 类型 | 含义 |
|---|---|
| Event Time | 事件实际发生时间(推荐,需处理乱序) |
| Processing Time | 算子处理时的系统时间(最低延迟,不精确) |
| Ingestion Time | 数据进入 Flink 的时间 |
Watermark
用于处理乱序数据,告知 Flink “Event Time ≤ W 的数据已全部到达”,触发窗口计算。
窗口类型
| 窗口 | 说明 |
|---|---|
| Tumbling Window | 固定大小,不重叠(如每 5 分钟统计一次) |
| Sliding Window | 固定大小,有重叠(如每 1 分钟统计过去 5 分钟) |
| Session Window | 按活动间隔划分,间隔超时则关闭窗口 |
DataStream API 示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(
new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties)
);
stream
.map(line -> line.split(","))
.filter(fields -> fields.length >= 2)
.keyBy(fields -> fields[0])
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregator())
.addSink(new FlinkKafkaProducer<>("result-topic", new SimpleStringSchema(), properties));
env.execute("Realtime Count Job");Checkpoint 与容错
Flink 通过**分布式快照(Chandy-Lamport 算法)**实现精确一次容错:
JobManager 定期触发 Checkpoint
→ 向数据源注入 Barrier
→ 各算子收到 Barrier 后保存本地状态到外部存储(HDFS / S3)
→ 所有算子完成后 Checkpoint 成功
故障重启时从最近的 Checkpoint 恢复状态并重放数据
env.enableCheckpointing(60000); // 每 60s 触发
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);Flink SQL
-- 创建 Kafka 源表
CREATE TABLE user_behavior (
user_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 实时统计每分钟 PV
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);State(状态管理)
Flink 的核心能力之一:算子可以保存跨事件的状态,是有状态流处理的基础。
Keyed State(按 Key 分区)
// ValueState:每个 Key 存一个值
ValueStateDescriptor<Long> desc =
new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> count = getRuntimeContext().getState(desc);
// MapState:每个 Key 存一个 Map
MapStateDescriptor<String, Integer> mapDesc =
new MapStateDescriptor<>("scores", String.class, Integer.class);
// ListState:每个 Key 存一个列表
// ReducingState:每个 Key 聚合为单值(自动合并新增元素)
// AggregatingState:泛化版 ReducingState,输入输出类型可不同Operator State(算子级)
不按 Key 分区,整个算子实例共享一个状态,常用于 Source/Sink:
// 如 FlinkKafkaConsumer 用 ListState 保存各分区 offset
ListState<Tuple2<Integer, Long>> offsetState;State Backend
决定 State 存储位置和 Checkpoint 存储方式:
| Backend | State 存储 | 适用场景 |
|---|---|---|
| HashMapStateBackend(默认) | JVM 堆内存 | 状态量小,低延迟 |
| EmbeddedRocksDBStateBackend | 本地磁盘(RocksDB) | 超大状态(TB 级),避免 GC 压力 |
// 使用 RocksDB(需引入 flink-statebackend-rocksdb 依赖)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");Checkpoint vs Savepoint
| Checkpoint | Savepoint | |
|---|---|---|
| 触发方式 | 自动定期触发 | 手动触发 |
| 目的 | 故障自动恢复 | 计划内停机、版本升级、A/B 测试 |
| 生命周期 | 自动清理旧版本 | 永久保留,手动删除 |
| 格式 | 可增量(RocksDB) | 总是全量 |
# 手动触发 Savepoint
flink savepoint <jobId> hdfs:///savepoints/
# 从 Savepoint 恢复
flink run -s hdfs:///savepoints/savepoint-xxx -c com.example.Main job.jar背压(Backpressure)
下游算子处理速度跟不上上游时,通过缓冲区反向传播压力,最终减缓数据源速率:
Source → Op1 → Op2(慢)
↑ 缓冲区满
↑ 被迫减速
Flink WebUI 中背压指标:
OK(0~0.1):正常LOW(0.1~0.5):轻微背压HIGH(0.5~1.0):严重背压,需扩并行度或优化算子
常用 Connector
| Connector | 依赖 | 用途 |
|---|---|---|
flink-connector-kafka | kafka-clients | 最常用 Source/Sink |
flink-connector-jdbc | JDBC 驱动 | 写入 MySQL/PG 等 |
flink-connector-elasticsearch | ES Client | 实时写入 ES |
flink-connector-filesystem | — | 写入 HDFS/S3/OSS(支持 Parquet/ORC) |
flink-connector-mysql-cdc | Debezium | MySQL CDC Source,无需 Kafka |
flink-connector-hudi | Hudi | 湖仓一体写入 |
Kafka Source 精确一次
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-topic")
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 精确一次需要:Checkpoint + Kafka Source offset 随 Checkpoint 提交
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);Flink CDC(数据库实时 Source)
Flink CDC 直接将 MySQL / PostgreSQL 等数据库变更作为 Flink Source,无需额外 Kafka:
CREATE TABLE products (
id INT,
name STRING,
price DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'shop',
'table-name' = 'products',
'scan.startup.mode' = 'initial' -- 先全量快照,再增量
);详见 CDC。
部署模式
| 模式 | 说明 | 适用 |
|---|---|---|
| Standalone | 手动启动 JM + TM,无资源调度 | 开发测试 |
| YARN | Flink 作为 YARN Application,按需申请资源 | 传统 Hadoop 集群 |
| Kubernetes(Native) | Flink 直接与 K8s API 交互,动态申请 Pod | 云原生生产推荐 |
| Kubernetes(Session) | 共享集群,多作业复用 TM 资源 | 资源利用率高但隔离弱 |
# K8s Application Mode 提交
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-flink-job \
-Dkubernetes.container.image=flink:1.19 \
local:///opt/flink/usrlib/job.jarFlink vs Spark 对比
| 维度 | Flink | Spark |
|---|---|---|
| 流处理模型 | 真正逐条流(Native Streaming) | 微批(Micro-Batch) |
| 延迟 | 毫秒级 | 秒级(批次间隔) |
| 状态管理 | 内置,RocksDB 支持超大状态 | 依赖外部存储 |
| 批处理 | 支持(DataSet / Table API) | 更成熟,生态更丰富 |
| SQL 支持 | Flink SQL(持续查询) | Spark SQL(批为主) |
| 机器学习 | 较弱 | MLlib 成熟 |