我有一个Dataframe并使用该Dataframe运行多个迭代。
val raw = getDataframe() // <-- the is Stage 3 operation.
val df = raw.repartition(2000, col("id")) // <-- start stage 4
.dropDuplicates(Seq("id"))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
// do iterative jobs..
// job2
// job3
// job4
// ...
工作2
在阶段4有一个缓存点(mappartitionsinternal)
因为这是第一次迭代,所以所有的阶段和操作都需要操作。
工作3
已跳过阶段3
但是,第4阶段的前5个蓝框不会被跳过,即使它们是从job2缓存的
问题
Spark只跳过舞台级?但不是操作级任务?
第四阶段的排序可能是 dropDuplicate
操作。我怎么能只丢一次?
威尔 raw.count()
之后 dropDuplicate()
把舞台一分为二 dropDuplicate
会从第二次迭代中跳过吗?
更新1
我在persist之后添加了count()。
val df = raw.repartition(2000, col("id")) // <-- start stage 4
.dropDuplicates(Seq("id"))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()
df.explain(true) <-- plan A
// do iterative jobs..
// job2
// iterativejob.explain(true) <-- plan B
迭代作业示例
迭代作业正在使用sql。差不多吧。
df.createOrReplaceTempView("tb")
dataframes = dataframes :+ sqlContext.sql(
s"""
| SELECT
| d_scene_id, d_action_id, d_classifier,
| count(*) m_event,
| approx_count_distinct(user_key, ${rsd}) m_user,
| approx_count_distinct(device_id, ${rsd}) m_device,
| 3 dimension_count
| FROM tb
| GROUP BY d_scene_id, d_action_id, d_classifier
|""".stripMargin)
再次比较计划
然后比较了两个物理计划。https://github.com/jeesim2/test/pull/1/files?diff=split&w=1
左一个是a的平面图,右一个是b的平面图。
就像你看到的树木 InMemoryRelation(cache)
两边完全一样。
+- InMemoryRelation [event_time#13, scene_id#14, action_id#15, classifier#16, event_hash#0, user_key#3, device_id#11, product#4, country#77, os_name#157, app_ver#117, p0name#197, p0value#597, p1name#237, p1value#637, p2name#277, p2value#677, p3name#317, p3value#717, p4name#357, p4value#757, p5name#397, p5value#797, p6name#437, ... 7 more fields], true, 10000, StorageLevel(disk, memory, 1 replicas)
那么,为什么 SortAggregate(key=[event_hash#0])
(dropduplicates('event\u hash'))每次迭代都重新计算?
1条答案
按热度按时间wooyq4lh1#
在我看来一切正常。由于在不同的列上有大约不同的计数,所以在计划中有两个分类阶段。
缓存发生在对应于列上的重新分区并在同一列上删除重复项的whitestagecodegen之后。exchange对应于将这些分区交换给相应的执行器。
此时,这些新分区被缓存在执行器中,当触发迭代作业2时,将跳过这些阶段。
所以要回答带圆圈的阶段和“这是重新计算”的答案是肯定的,但根据提供的信息,这是应该发生的。