pyspark在单节点和双节点集群(无序数据)上的连接数据性能

tsm1rwdh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(341)

我有一个脚本片段,我在PySpark2.4的不同集群设置中运行

v1 = spark.read.parquet(os.path.join(v1_prefix, 'df1.parquet'))
v2 = spark.read.parquet(os.path.join(v2_prefix, 'df2.parquet'))

out = v1.join(v2, [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])

for x in v1.columns:
    tmp = out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')).filter('{}_old != {}_new'.format(x,x ))
    if tmp.count() > 0:
        tmp.show()

这两个Dataframe都有200多个列和150万条记录,因此out dataframe有400多个列相互比较以确定是否存在差异。
单节点群集需要4-8分钟
2节点群集大约需要50分钟
我假设在2节点集群中,数据在不同的执行器上被分区并被洗牌,这会降低性能。
如何改进Dataframe,使其均匀分布,并至少以与使用spark 2.4在单节点上运行时相同的性能运行?

3zwjbxry

3zwjbxry1#

广播连接应该会有帮助。

import pyspark.sql.functions as F
out = v1.join(F.broadcast(v2), [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])

只有150万行的Dataframe应该足够小,可以广播。应该广播较小的Dataframe(在您的示例中,两者的大小似乎大致相同)。
尝试重构 out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')) 在这里和这里概述的设计模式。通常不希望在Dataframe中的所有列上循环。
欢迎来到Pypark世界;)

相关问题