字节跳动 Flink 状态查询实践与优化
发布时间:2022-08-02 12:03:47 所属栏目:云计算 来源:互联网
导读:背景 众所周知,Flink 中的 State 保存了算子计算过程的中间结果。当任务出现异常时,可以通过查询任务快照中的 State 获取有效线索。 但目前对于 Flink SQL 任务来说,当我们想要查询作业 State 时,通常会因为无法获知 State 的定义方式和具体类型等信息,
背景 众所周知,Flink 中的 State 保存了算子计算过程的中间结果。当任务出现异常时,可以通过查询任务快照中的 State 获取有效线索。 但目前对于 Flink SQL 任务来说,当我们想要查询作业 State 时,通常会因为无法获知 State 的定义方式和具体类型等信息,而导致查询 State 的成本过高。 为了解决这个问题,字节跳动流式计算团队在内部提出了 State Query on Flink SQL 的解决方案——用户通过写 SQL 的方式就可以简单地查询 State。本文将主要介绍字节跳动在 Flink 状态查询这方面所进行的相关工作。 State Processor API 介绍 提到状态查询,我们自然会联想到 Flink 在 1.9 版本提出的特性 -- State Processor API。使用 State Processor API,我们可以将作业产生的 Savepoint 转换成 DataSet,然后使用 DataSet API 完成对 State 的查询、修改和初始化等操作。 下面简单介绍一下如何使用 State Processor API 来完成 State 的查询: 首先创建 ExistingSavepoint 用来表示一个 Savepoint。初始化 ExistingSavepoint 时需要提供 Savepoint 路径和 StateBackend 等信息; 然后实现 ReaderFunction 用于重新注册所需要查询的 State 以及定义处理 State 的方式。查询状态的过程中会遍历所有的 Key 并按照我们定义的方式去操作 State; 最后,调用 Savepoint.readKeyedState 并传入算子的 uid 和 ReaderFunction,就可以完成 State 的查询。 接下来为大家简述一下 State 查询背后的原理。 在 Savepoint 目录中包含两种文件,一种是状态数据文件,比如上图中的 opA-1-state ,这个文件里面保存着算子 A 在第一个 SubTask 状态的明细数据;还有一种元数据文件,对应上图中的 _metadata,元数据文件中保存了每个算子和状态文件的映射关系。 当我们在进行状态查询的时候。首先在 Client 端会根据 Savepoint 路径去解析 metadata 文件。通过算子 ID,可以获取需要查询的状态所对应的文件的句柄。当状态查询真正执行时,负责读取状态的 Task 会创建一个新的 StateBackend ,然后将状态文件中的数据恢复到 Statebackend 中。等到状态恢复完成之后就会遍历全部的 Key 并把对应的状态交给 ReaderFunction 处理。 有些同学可能会问,既然社区已经提供了查询 State 的功能,我们为什么还要去做同样的工作呢?主要是因为我们在使用 State Processor API 的过程中发现一些问题: 每次查询 State 我们都需要独立开发一个 Flink Batch 任务,对用户来说具有一定的开发成本; 实现 ReaderFunction 的时候需要比较清晰地了解任务状态的定义方式,包括 State 的名称、类型以及 State Descriptor 等信息,对用户来说使用门槛高较高; 使用 State Processor API 时,只能查询单个算子状态,无法同时查询多个算子的状态; 无法直接查询任务状态的元信息,比如查询任务使用了哪些状态,或者查询某个状态的类型。 总体来说,我们的目标有两个,一是降低用户的使用成本;二是增强状态查询的功能。我们希望用户在查询 State 时能用最简单的方式;同时也不需要知道任何信息。 此外,我们还希望用户能同时查询多个算子的 State ,也可以直接查询作业使用了哪些 State,每个 State 的类型是什么。 因此,我们提出了 State Query on Flink SQL 的解决方案。简单来说是把 State 当成数据库一样,让用户通过写 SQL 的方式就可以很简单地查询 State。 在这个方案中,我们需要解决两个问题: 如何对用户屏蔽 State 的信息:参考 State Processor API 我们可以知道,查询 State 需要提供非常多的信息,比如 Savepoint 路径、 StateBacked 类型、算子 id 、State Descriptor 等等。通过 SQL 语句显然难以完整地表述这些复杂的信息,那么查询状态到底需要哪些内容,我们又如何对用户屏蔽 State 里复杂的细节呢?这是我们面对的第一个难点。 如何用 SQL 表达 State:State 在 Flink 中的存储方式并不像 Database 一样,我们如何去用 SQL 来表达状态的查询过程呢?这是我们要解决的另一个难点。 StateMeta Snapshot 机制 首先我们来回答第一个问题,查询一个 State 需要哪些信息呢? 可以参考上文中 State Processor API 的示例,当我们创建 ExistingSavepoint 和 ReaderFunction 的时候,我们需要提供的信息有 Savepoint 路径、Backend 类型、OperatorID、算子 key 的类型、State 名称以及 Serializer 等等,我们可以将这些统一称为状态的元信息。 对于 Flink SQL 任务来说,要清楚地了解这些信息,对用户来说门槛是非常高的。我们的想法是让用户只需要提供最简单的信息,即 Savepoint ID ,然后由 Flink 框架把其他的元信息都存在 Savepoint 中,这样就可以对用户屏蔽 State 那些复杂的细节,完成状态的查询。因此,我们引入了 StateMeta Snapshot 机制。 StateMeta Snapshot 简单来说就是把状态的元信息添加到 Savepoint Metadata 的过程,具体步骤如下: 首先在 State 注册的时候,Task 会把 operatorNameIDKeySerializerStateDescriptors 等元信息都保存在 Task 的内存中; 触发 Savepoint 时,Task 会在制作快照的同时,对状态的元信息也同样进行快照。快照完成之后将状态的元信息 (StateMeta) 和状态文件的句柄 (StateHandle) 一起上报给 JobManager; JobManager 在收到所有 Task 上报的 StateMeta 信息之后 ,将这些状态元信息进行合并,最后会把合并之后的状态元信息保存到 Savepoint 目录里名为 stateInfo 的文件中。 之后在状态查询时就只需解析 Savepoint 中的 stateInfo 文件,而不再需要用户通过代码去输入这些 State 的元信息。通过这样的方式可以很大程度地降低用户查询状态的成本。 State as Database 接下来我们来回答第二个问题,我们如何用 SQL 来表达 State。其实社区在设计 State Processor API 的时候就提出了一些解决思路,也就是 State As Database。 在传统的数据库中,通常用 Catalog、Database、Table 这个三个元素来表示一个 Table,其实我们也可以将用样的逻辑到映射到 Flink State 上。我们可以把 Flink 的 State 当作一种特殊的数据源,作业每次产生的 Savepoint 都当作一个独立 DB 。在这个 DB 中,我们将 State 元信息、State 的明细数据,都抽象成不同的 Table 暴露给用户,用户直接查询这些 Table 就可以获取任务的状态信息。 首先我们来看如何把 State 表示为 Table。我们都知道在 Flink 中,常用的 State 有两种类型,分别是 KeyedState 和 OperatorState。 对于 OperatorState 来说,它只有 Value 这一个属性,用来表示这个 State 具体的值。因此我们可以把 OperatorState 表示为只包含一个 Value 字段的表结构。 对于 KeyedState 来说,每个 State 在不同的 Key 和 Namespace 下的值可能都不一样, 因此我们可以将 KeyedState 表示为一个包含 Key、Namespace、Value 这三个字段的表结构。 当我们抽象出了单个 State 之后,想要表示多个 State 就比较容易了。可以看到在上图的例子中,这个算子包含 3 个 State,分别是两个 KeyedState 和一个 OperatorState,我们只需要将这些 Table 简单的 union 起来,再通过 state_name 字段去区分不同的 State,就可以表示这个算子中所有的 State。 最后还有一个问题,我们如何知道一个任务到底用了哪些 State 或者这些 State 的具体类型呢? 为了解决这个问题,我们定义了一种特殊表 -- StateMeta ,用来表示一个 Flink 任务中所有 State 的元信息。StateMeta 中包含一个任务中每个 State 的名称、State 所在的算子 ID 、算子名称 、Key 的类型和 Value 的类型等等,这样用户直接查询 StateMeta 这个表就能获取任务中所有状态的元信息。 (编辑:梅州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐
热点阅读