scala—仅使用spark sql api时广播变量的用法

flmtquvp  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(464)

在使用spark rddapi时,我们可以使用广播变量来优化spark分配不可变状态的方式。

1) 广播变量如何在内部工作?

我的假设是:对于用于对数据集执行操作的每个闭包,它引用的所有变量都必须序列化、通过网络传输并与任务一起还原,以便可以执行闭包。
注册如下广播变量时:

val broadcastVar = sc.broadcast("hello world")

返回的对象( Broadcast[String] )不保留对实际对象(“hello world”)的引用,而只保留某个id。当广播变量句柄从如上所述的闭包中被引用时,它将被序列化,就像其他所有变量一样-只是广播变量句柄本身不包含实际对象。
稍后在目标节点上执行闭包时,实际对象(“hello world”)已经传输到每个节点。当闭包到达 broadcastVar.value 调用时,广播变量句柄使用id在内部检索实际对象。
这个假设正确吗?

2) 有没有办法在sparksql中利用这种机制?

假设我有一组允许的值。
使用rdd api时,我会为AllowedValue创建一个广播变量:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))

当然,在使用sparksqlapi时,我会使用 Column.isin / Column.isInCollection 方法:

dataframe.where(col("mycol").isInCollection(allowedValues))

但我似乎无法通过这种方式获得广播变量的优势。
另外,如果我将这段代码更改为:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

dataframe.where(col("mycol").isInCollection(allowedValues.value))

本部分:

col("mycol").isInCollection(allowedValues.value)
// and more important this part:
allowedValues.value

将已经在驱动程序上进行评估,从而生成一个新的 Column -对象。所以广播变量失去了它的优势。与第一个示例相比,它甚至会有一些开销。。。
有没有一种方法可以利用sparksqlapi的广播变量,或者在这些点上我必须显式地使用rddapi?

3xiyfsfu

3xiyfsfu1#

广播变量如何在内部工作?
广播的数据被序列化并物理地移动到所有执行器。根据有关广播变量的文件,它说
“广播变量允许程序员在每台机器上缓存一个只读变量,而不是将其副本与任务一起发送。”
有没有办法在sparksql中利用这种机制?
是的,有一种方法可以利用。spark在加入大小Dataframe时默认应用广播散列连接。
根据《学习Spark-第二版》一书,它说:
“默认情况下,如果较小的数据集小于10mb,spark将使用广播连接。此配置在中设置 spark.sql.autoBroadcastJoinThreshold ; 根据每个执行器和驱动程序中的内存大小,可以减小或增大大小。“
在您的例子中,您需要将所有惟一的allowedvalue列在一个简单的Dataframe(dataframe)中 allowedeValuesDF )只有一列(称为 allowValues )并应用联接来筛选 dataframe .
像这样:

import org.apache.spark.sql.functions.broadcast
val result = dataframe.join(broadcast(allowedValuesDF), "mycol === allowedValues")

实际上,你可以省去 broadcast 默认情况下,as spark将执行广播连接。
编辑:
在spark的更高版本中,还可以使用sql语法中的连接提示来告诉执行引擎要使用哪些策略。sql文档中提供了详细信息,下面提供了一个示例:

-- Join Hints for broadcast join 
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

相关问题