Flink 快照分析:定位大状态和数据倾斜的算子

x33g5p2x  于2021-12-30 转载在 Flink  
字(3.4k)|赞(0)|评价(0)|浏览(370)

在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可以被保存在状态点中,以供后续的崩溃恢复。

Flink 的状态分为 Operator StateKeyed State,而Keyed State 又可以分为 ValueState、MapState、ListState、AggregatingState、MergingState、ReducingState等多种类型。此外,这些林林总总的状态又有多种具体的实现(HeapState、RocksDBState 等),状态的存取还需要各种 Serializer 和 Deserializer 的参与,整个链路精妙而又繁杂。

对于普通用户而言,Flink 内部的运行模式就像黑盒,但是状态带来的困扰却是实实在在的,尤其是在使用 SQL 的多表 JOIN 或者 GROUP BY 等语义时,很容易因为状态越来越多,导致频繁的 TaskManager OOM(内存不足),影响线上业务的稳定性,更影响心情

很多用户面对持续崩溃的作业,以及磁盘上几十上百 GB 的快照文件,自己也随之崩溃了:这么大的状态,究竟里面存了什么?能不能删点内容呢?下文笔者将带领大家分析 Flink 快照系统,找出影响大状态和数据倾斜的算子。

一、快照的类型

Flink 的快照包括 Checkpoint(周期触发)和 Savepoint(用户主动触发)两种,其中 Checkpoint 分为普通 Checkpoint 和外部化(Externalized)Checkpoint。普通 Checkpoint 只能用于本次 JobManager 存活期间的内部恢复;而外部化 Checkpoint 和 Savepoint 可以用于从零开始的冷启动恢复。

对于 Savepoint,以及开启了 外部化特性(https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#externalized-checkpoints) 的 Checkpoint,Flink 会在快照目录生成一个元数据文件(快照目录中名为 _metadata 的文件),这个文件是我们分析快照时至关重要的线索。

二、快照的存储格式

我们先从这个元数据(_metadata 文件)入手,看一下它的数据结构:


Flink 快照 _metadata 文件结构

在 Master State 的不定长结构中,也有自己的 Magic Number、数据长度等信息,通常不会有太多数据。

Operator State 是状态的大头,在它的不定长结构中,主要包含了每个 Operator 的 ID(由两个 Long 拼起来组成),以及当前算子的并行度(parallelism)和最大并行度(maximum parallelism),还有子任务(subtask)状态的个数、每个子任务的 index、元数据(是否包含 raw 和 managed 的 Operator State、是否包含 raw 和 managed 的 Keyed State、包含哪些具体的状态、 KeyGroup 范围、偏移量、是否是 Incremental 状态、状态文件的指针 RelativeFileStateHandle 等)。

除元数据文件以外,还有很多具体的状态文件(RelativeFileStateHandle 指针指向文件),它们通常是因为尺寸过大而不能直接嵌入 _metadata 文件,只能以独立文件的方式存在的状态。

三、快照的读取方式

从上文可以看到,解析状态文件并非易事,有很多需要考虑的地方。解铃还须系铃人,我们可以用 Flink 自身来实现状态文件的读取和解析:

最简单的方式,是找到 Flink 恢复快照状态的源码,然后按图索骥查找反序列化 _metadata 文件的类。很快,我们就找到了 org.apache.flink.runtime.checkpoint.Checkpoints#loadCheckpointMetadata 这个静态方法,它可以将给定的数据流反序列化成 Flink 内部的 CheckpointMetadata对象(即上述文件的内存映射)。

如果只想处理元数据信息,而不涉及到读写具体的状态数据时,可以采用该方法。

2. 封装后的 State Processor API

在新的 Flink 版本中,还包含了封装后的 State Processor API(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/),通过这个 API,我们不仅可以读取具体的状态文件,还可以按需生成状态数据以供新的 Flink 作业使用。

使用 State Processor API 时,由于涉及到具体状态的读写,需要给定 StateBackend 实例,以及具体的 Operator UID 等信息,且是以 DataSet 批处理任务方式执行的,流程相对复杂,本文不再展开描述,后续会有单独的文章介绍其使用方式。

四、一起实践

我们来尝试使用 Flink 内部 API 来读取状态元数据信息,并统计分析哪些 Operator 的状态占比最大,以及这些 Operator 的各个 Subtask(多个并行度下的子任务)的状态用量。

示例代码(https://github.com/kylemeow/flink-snapshot-analyzer/blob/master/src/main/java/FlinkSnapshotAnalyzer.java)非常简单,这里展示一下具体的分析结果:


快照的分析结果(从小状态的算子开始输出)


快照的分析结果(最后是状态最大的算子)

可以看到,元数据文件里的各项信息都被打印输出了,而且显示出了 4421bbc22ac32fa6abe810c70a869c54这个 Operator 的状态占比最大,达到了 92.31%,且各个 Subtask 的状态量较为平均,都在 1.1G ~ 1.3G 之间,基本不存在数据倾斜的现象。

由于元数据里并不包含这个 Operator 的名字和类型等信息,需要通过查找日志搜索这个 Operator ID。从日志中可以看到是一个 InnerJoin 的算子。


从日志中寻找指定 ID 的算子

进一步细致分析源码可以得到,是 StreamingJoinOperator 这个流式 JOIN 算子的两个 JoinRecordStateView状态的数据。


StreamingJoinOperator 算子源码中状态存储的定义

从原理上来讲,要想实现两个流的常规 JOIN(无边界的 JOIN),必须永久保留两个流的所有数据备查,且默认没有清理机制(除非设置了下面的 Idle State Retention Time),因此这种 JOIN 在生产环境很容易因为状态过大而发生 OOM。我们建议用户使用 Interval JOIN(时间区间 JOIN)来代替,具体可以参考此篇文档(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html)。

另外一个在 SQL 环境下容易造成超大状态的算子是无边界的 GROUP BY,但还好 Flink 提供了 Idle State Retention Time 机制(https://cloud.tencent.com/developer/article/1452854?from=10680),可以配置状态的定期清理逻辑,将这些 GROUP BY 和 JOIN 的过期状态及时清理掉。

参考阅读

[1] https://ververica.cn/developers/introduction-to-state-management-and-fault-tolerance/

[2] https://ververica.cn/developers/state-management/

[3] https://flink.apache.org/feature/2019/09/13/state-processor-api.html

[4] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html

相关文章