Spark SQL广播哈希连接

piztneat  于 6个月前  发布在  Apache
关注(0)|答案(3)|浏览(60)

我正在尝试使用SparkSQL在多个框架上执行广播哈希连接,如此处所示。
在该示例中,(小)DataFrame通过saveAsTable持久化,然后通过spark SQL(即通过sqlContext.sql("...")))进行连接
我遇到的问题是,我需要使用sparkSQL API来构造我的SQL(我用ID列表连接了大约50个表,并且不想手工编写SQL)。
如何通过API告诉spark使用广播哈希连接?问题是,如果我将ID列表(从通过saveAsTable持久化的表中)加载到DataFrame中以用于连接,我不清楚Spark是否可以应用广播哈希连接。

zsbz8rwp

zsbz8rwp1#

您可以使用broadcast函数显式地将DataFrame标记为足够小以进行广播:

Python

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

字符串
或广播提示(Spark >= 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

Scala

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))


或广播提示(Spark >= 2.2):

largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL

您可以使用提示(Spark >= 2.2):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo


SELECT /*+  BROADCASTJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo


SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON larger.foo = small.foo

R(SparkR):

对于hint(Spark >= 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)


使用broadcast(Spark >= 2.3)

join(large, broadcast(small), large$foo == small$foo)

备注

如果其中一个结构相对较小,则广播连接非常有用。否则,它的开销可能比完全 Shuffle 大得多。

mum43rcc

mum43rcc2#

jon_rdd = sqlContext.sql( "select * from people_in_india  p
                            join states s
                            on p.state = s.name")

jon_rdd.toDebugString() / join_rdd.explain() :

字符串

shuffledHashJoin:

印度的所有数据将被混洗到每个州的29个键中。2问题:不均匀的分片。3有限的并行性与29个输出分区。

broadcasteHashJoin

将小RDD广播到所有工作节点。大RDD的并行性仍然保持,甚至不需要shuffle。

PS:图片可能很丑,但信息丰富。

umuewwlo

umuewwlo3#

对于广播连接,连接等式的一边被具体化并发送给所有Map器,因此它被认为是Map端连接。
当数据集被具体化并通过网络发送时,它只会带来显著的性能改进,如果它相当小的话。
因此,如果您尝试执行smallDF.join(largeDF)
等等!!!另一个限制是它还需要完全适合每个执行器的内存。它还需要适合驱动程序的内存!**
广播变量使用Torrent协议(即对等协议)在执行器之间共享,Torrent协议的优点是对等体彼此共享文件的块,而不依赖于持有所有块的中央实体。
上面提到的例子足以开始播放广播加入。

注意: 创建后不能修改值,如果尝试修改,只会在一个节点(&node)上修改*

相关问题