mapgroupswithstate的spark结构化流状态存储在哪里?

wmtdaxz3  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(317)

我知道状态作为状态存储保存在检查点位置。但我不知道当它还在记忆中的时候,它存储在哪里?
我创建了一个使用mapgroupswithstate的流作业,但是我看到执行器使用的存储内存是0。
这是否意味着状态存储在执行内存中?我不知道这个状态消耗了多少内存。不知道如何知道是否需要增加执行器内存!
另外,是否可以完全避免状态检查点并将状态始终保留在内存中?

xxls0lw8

xxls0lw81#

由于mapgroupswithstate是一个聚合,因此它将存储在spark应用程序生命周期内所有聚合的位置:在执行内存中(正如您已经假设的那样)。
查看方法的签名

def mapGroupsWithState[S: Encoder, U: Encoder](
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

你会注意到的 S 是用户定义状态的类型。这就是管理国家的地方。
因为这将被发送到执行器,所以它必须是可编码的,以触发sql类型。因此,您通常使用scala中的case类或java中的bean。这个 GroupState 是一个类型化 Package 器对象,提供访问和管理状态值的方法。
作为开发人员,您还必须注意如何从这种状态中删除数据,这一点至关重要。否则,你的国家将不可避免地造成一个oom,因为它只会增长,永远不会缩小。
如果在结构化流中不启用检查点,则不会存储任何内容,但在出现故障时会失去状态。如果您启用了检查点,例如为了跟踪输入源,spark还会将当前状态存储到检查点位置。

9lowa7mx

9lowa7mx2#

如果启用检查点,则状态存储在状态存储中。默认情况下,它是一个hdfsbackedstatestore,但也可以重写它。好好读一读https://medium.com/@polarpersonal/state-存储-in-spark-structured-streaming-e5c8af7bf509
正如前面提到的另一个答案,如果您不启用检查点,那么您将失去容错性和至少一次保证。

相关问题