我有一个脚本片段,我在PySpark2.4的不同集群设置中运行
v1 = spark.read.parquet(os.path.join(v1_prefix, 'df1.parquet'))
v2 = spark.read.parquet(os.path.join(v2_prefix, 'df2.parquet'))
out = v1.join(v2, [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])
for x in v1.columns:
tmp = out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')).filter('{}_old != {}_new'.format(x,x ))
if tmp.count() > 0:
tmp.show()
这两个Dataframe都有200多个列和150万条记录,因此out dataframe有400多个列相互比较以确定是否存在差异。
单节点群集需要4-8分钟
2节点群集大约需要50分钟
我假设在2节点集群中,数据在不同的执行器上被分区并被洗牌,这会降低性能。
如何改进Dataframe,使其均匀分布,并至少以与使用spark 2.4在单节点上运行时相同的性能运行?
1条答案
按热度按时间3zwjbxry1#
广播连接应该会有帮助。
只有150万行的Dataframe应该足够小,可以广播。应该广播较小的Dataframe(在您的示例中,两者的大小似乎大致相同)。
尝试重构
out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new'))
在这里和这里概述的设计模式。通常不希望在Dataframe中的所有列上循环。欢迎来到Pypark世界;)