Flink

Apache Flink 是有状态的流批一体计算引擎,以低延迟、高吞吐和精确一次(Exactly-Once)语义著称。既可处理无界流数据(实时),也可处理有界批数据,是实时数仓和事件驱动应用的主流选择。

Spark 对比:Spark 是微批(Micro-Batch),Flink 是真正的逐条流处理,延迟更低。


架构

Client(提交作业)
      │
      ▼
JobManager(调度 Task、Checkpoint 协调、故障恢复)
      │
      ├── TaskManager 1(执行 Task,管理 Slot)
      ├── TaskManager 2
      └── TaskManager 3
角色职责
JobManager接收作业、生成执行图、调度 Task、触发 Checkpoint
TaskManager实际执行算子,每个 TM 有固定数量的 Task Slot
Task SlotTM 上的资源单元,隔离不同作业的内存

核心概念

时间语义

类型含义
Event Time事件实际发生时间(推荐,需处理乱序)
Processing Time算子处理时的系统时间(最低延迟,不精确)
Ingestion Time数据进入 Flink 的时间

Watermark

用于处理乱序数据,告知 Flink “Event Time ≤ W 的数据已全部到达”,触发窗口计算。

窗口类型

窗口说明
Tumbling Window固定大小,不重叠(如每 5 分钟统计一次)
Sliding Window固定大小,有重叠(如每 1 分钟统计过去 5 分钟)
Session Window按活动间隔划分,间隔超时则关闭窗口

DataStream API 示例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties)
);
 
stream
    .map(line -> line.split(","))
    .filter(fields -> fields.length >= 2)
    .keyBy(fields -> fields[0])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregator())
    .addSink(new FlinkKafkaProducer<>("result-topic", new SimpleStringSchema(), properties));
 
env.execute("Realtime Count Job");

Checkpoint 与容错

Flink 通过**分布式快照(Chandy-Lamport 算法)**实现精确一次容错:

JobManager 定期触发 Checkpoint
  → 向数据源注入 Barrier
  → 各算子收到 Barrier 后保存本地状态到外部存储(HDFS / S3)
  → 所有算子完成后 Checkpoint 成功
故障重启时从最近的 Checkpoint 恢复状态并重放数据
env.enableCheckpointing(60000);  // 每 60s 触发
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

-- 创建 Kafka 源表
CREATE TABLE user_behavior (
  user_id  BIGINT,
  behavior STRING,
  ts       TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic'     = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json'
);
 
-- 实时统计每分钟 PV
SELECT
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);

State(状态管理)

Flink 的核心能力之一:算子可以保存跨事件的状态,是有状态流处理的基础。

Keyed State(按 Key 分区)

// ValueState:每个 Key 存一个值
ValueStateDescriptor<Long> desc =
    new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> count = getRuntimeContext().getState(desc);
 
// MapState:每个 Key 存一个 Map
MapStateDescriptor<String, Integer> mapDesc =
    new MapStateDescriptor<>("scores", String.class, Integer.class);
 
// ListState:每个 Key 存一个列表
// ReducingState:每个 Key 聚合为单值(自动合并新增元素)
// AggregatingState:泛化版 ReducingState,输入输出类型可不同

Operator State(算子级)

不按 Key 分区,整个算子实例共享一个状态,常用于 Source/Sink:

// 如 FlinkKafkaConsumer 用 ListState 保存各分区 offset
ListState<Tuple2<Integer, Long>> offsetState;

State Backend

决定 State 存储位置和 Checkpoint 存储方式:

BackendState 存储适用场景
HashMapStateBackend(默认)JVM 堆内存状态量小,低延迟
EmbeddedRocksDBStateBackend本地磁盘(RocksDB)超大状态(TB 级),避免 GC 压力
// 使用 RocksDB(需引入 flink-statebackend-rocksdb 依赖)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

Checkpoint vs Savepoint

CheckpointSavepoint
触发方式自动定期触发手动触发
目的故障自动恢复计划内停机、版本升级、A/B 测试
生命周期自动清理旧版本永久保留,手动删除
格式可增量(RocksDB)总是全量
# 手动触发 Savepoint
flink savepoint <jobId> hdfs:///savepoints/
 
# 从 Savepoint 恢复
flink run -s hdfs:///savepoints/savepoint-xxx -c com.example.Main job.jar

背压(Backpressure)

下游算子处理速度跟不上上游时,通过缓冲区反向传播压力,最终减缓数据源速率:

Source → Op1 → Op2(慢)
               ↑ 缓冲区满
         ↑ 被迫减速

Flink WebUI 中背压指标:

  • OK(0~0.1):正常
  • LOW(0.1~0.5):轻微背压
  • HIGH(0.5~1.0):严重背压,需扩并行度或优化算子

常用 Connector

Connector依赖用途
flink-connector-kafkakafka-clients最常用 Source/Sink
flink-connector-jdbcJDBC 驱动写入 MySQL/PG 等
flink-connector-elasticsearchES Client实时写入 ES
flink-connector-filesystem写入 HDFS/S3/OSS(支持 Parquet/ORC)
flink-connector-mysql-cdcDebeziumMySQL CDC Source,无需 Kafka
flink-connector-hudiHudi湖仓一体写入

Kafka Source 精确一次

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-topic")
    .setGroupId("flink-group")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
 
// 精确一次需要:Checkpoint + Kafka Source offset 随 Checkpoint 提交
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);

Flink CDC 直接将 MySQL / PostgreSQL 等数据库变更作为 Flink Source,无需额外 Kafka:

CREATE TABLE products (
  id       INT,
  name     STRING,
  price    DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'         = 'mysql-cdc',
  'hostname'          = 'mysql',
  'port'              = '3306',
  'username'          = 'root',
  'password'          = 'root',
  'database-name'     = 'shop',
  'table-name'        = 'products',
  'scan.startup.mode' = 'initial'   -- 先全量快照,再增量
);

详见 CDC


部署模式

模式说明适用
Standalone手动启动 JM + TM,无资源调度开发测试
YARNFlink 作为 YARN Application,按需申请资源传统 Hadoop 集群
Kubernetes(Native)Flink 直接与 K8s API 交互,动态申请 Pod云原生生产推荐
Kubernetes(Session)共享集群,多作业复用 TM 资源资源利用率高但隔离弱
# K8s Application Mode 提交
./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-flink-job \
  -Dkubernetes.container.image=flink:1.19 \
  local:///opt/flink/usrlib/job.jar

维度FlinkSpark
流处理模型真正逐条流(Native Streaming)微批(Micro-Batch)
延迟毫秒级秒级(批次间隔)
状态管理内置,RocksDB 支持超大状态依赖外部存储
批处理支持(DataSet / Table API)更成熟,生态更丰富
SQL 支持Flink SQL(持续查询)Spark SQL(批为主)
机器学习较弱MLlib 成熟

相关链接

  • Spark — 同为大数据计算引擎,Spark 擅长批处理,Flink 擅长流处理
  • Hadoop — 提供 HDFS 作为 Flink Checkpoint 存储
  • Flume — 传统日志采集方案,实时场景可被 Flink Source 替代
  • Hive — Flink 可通过 Hive Catalog 读取 Hive 表元数据
  • CDC — Flink CDC Source 原理与用法
  • 数据传输 — Flink 在数据管道中的定位