spark rdd已缓存但未重用

8wigbo56  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(447)

我有一个Dataframe并使用该Dataframe运行多个迭代。

val raw = getDataframe() // <-- the is Stage 3 operation.

val df = raw.repartition(2000, col("id")) // <-- start stage 4
            .dropDuplicates(Seq("id"))
            .persist(StorageLevel.MEMORY_AND_DISK_SER)

// do iterative jobs..
// job2
// job3
// job4
// ...

工作2

在阶段4有一个缓存点(mappartitionsinternal)
因为这是第一次迭代,所以所有的阶段和操作都需要操作。

工作3

已跳过阶段3
但是,第4阶段的前5个蓝框不会被跳过,即使它们是从job2缓存的

问题

Spark只跳过舞台级?但不是操作级任务?
第四阶段的排序可能是 dropDuplicate 操作。我怎么能只丢一次?
威尔 raw.count() 之后 dropDuplicate() 把舞台一分为二 dropDuplicate 会从第二次迭代中跳过吗?

更新1

我在persist之后添加了count()。

val df = raw.repartition(2000, col("id")) // <-- start stage 4
            .dropDuplicates(Seq("id"))
            .persist(StorageLevel.MEMORY_AND_DISK_SER)

df.count()
df.explain(true) <-- plan A

// do iterative jobs..
// job2
// iterativejob.explain(true) <-- plan B

迭代作业示例

迭代作业正在使用sql。差不多吧。

df.createOrReplaceTempView("tb")

    dataframes = dataframes :+ sqlContext.sql(
      s"""
         | SELECT
         |   d_scene_id, d_action_id, d_classifier,
         |   count(*) m_event,
         |   approx_count_distinct(user_key, ${rsd}) m_user,
         |   approx_count_distinct(device_id, ${rsd}) m_device,
         |   3 dimension_count
         | FROM tb
         | GROUP BY d_scene_id, d_action_id, d_classifier
         |""".stripMargin)

再次比较计划

然后比较了两个物理计划。https://github.com/jeesim2/test/pull/1/files?diff=split&w=1
左一个是a的平面图,右一个是b的平面图。
就像你看到的树木 InMemoryRelation(cache) 两边完全一样。

+- InMemoryRelation [event_time#13, scene_id#14, action_id#15, classifier#16, event_hash#0, user_key#3, device_id#11, product#4, country#77, os_name#157, app_ver#117, p0name#197, p0value#597, p1name#237, p1value#637, p2name#277, p2value#677, p3name#317, p3value#717, p4name#357, p4value#757, p5name#397, p5value#797, p6name#437, ... 7 more fields], true, 10000, StorageLevel(disk, memory, 1 replicas)

那么,为什么 SortAggregate(key=[event_hash#0]) (dropduplicates('event\u hash'))每次迭代都重新计算?

wooyq4lh

wooyq4lh1#

在我看来一切正常。由于在不同的列上有大约不同的计数,所以在计划中有两个分类阶段。
缓存发生在对应于列上的重新分区并在同一列上删除重复项的whitestagecodegen之后。exchange对应于将这些分区交换给相应的执行器。
此时,这些新分区被缓存在执行器中,当触发迭代作业2时,将跳过这些阶段。
所以要回答带圆圈的阶段和“这是重新计算”的答案是肯定的,但根据提供的信息,这是应该发生的。

相关问题