spark scala seq追加我应该广播变量吗?

qyuhtwio  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(353)

我´我把一个变量定义为一个空变量 Seq() 我在哪里´我将从一个 Map(String, String) rdd公司。
变量声明如下:

var list: Seq[(String, String, String, String, String, String, String, String, String)] = Seq()

然后,迭代rdd元素,在var列表中添加符合某些条件的结果。是这样的:

if (condition) {list :+= ("1","2","3","4","5","6","7","8")}

当我在本地尝试它时,我得到了所需的输出,但是当我在集群中尝试执行时,问题来了,在集群中我得到了空列表。
似乎这在并行计算中不起作用。我应该在附加变量之前广播该变量,还是用 collect() 最后??
谢谢!!

lsmepo6l

lsmepo6l1#

根据spark文档
通常,当传递给spark操作(如map或reduce)的函数在远程集群节点上执行时,它将在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,对远程机器上变量的更新不会传播回驱动程序。跨任务支持通用的读写共享变量将是低效的。然而,spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
请看以上文字中突出显示的部分。如前所述,任务/执行者不会将更改传播回驱动程序应用程序。所以你的名单是空的。
在进入解决方案之前。首先,让我们了解广播变量和累加器
广播变量
广播变量允许程序员在每台计算机上缓存一个只读变量,而不是将其副本与任务一起发送。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。
这些广播变量是只读变量,您无法更新它们。在您的情况下,您正在更新 list :+= ("1","2","3","4","5","6","7","8") 你不能通过广播来达到这个目的
蓄能器
累加器是只通过结合和交换操作“添加”到的变量,因此可以有效地并行支持。它们可以用来实现计数器(如mapreduce)或求和。
您的任务可以使用累加器来完成。您可以通过扩展累加器v2 api编写自定义累加器,并将列表结构封装在其中,然后您可以在spark上下文中注册累加器 sc.register(objectReference, "MyAccumulator") 注:
蓄能器不会改变Spark的惰性评估模型。如果它们在rdd上的操作中被更新,那么它们的值仅在rdd作为操作的一部分被计算之后才被更新。因此,在像map()这样的延迟转换中进行累加器更新时,不能保证执行累加器更新

相关问题