如何在pyspark中高效地连接一个非常大的表和一个很大的表

hkmswyz6  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(459)

我有两张table。这两个表都是配置单元中以Parquet数据格式存储的外部表。
从2015年开始,第一个表1每天有2.5亿行。此表根据创建日期进行分区。因此,对于每个创建日期,大约有2.5亿行。
第二个表-表2是每日增量表,平均行数约为150万行。
两个表中都有一个公共列“lookup\u id”。现在我需要使用Dataframe从表1中获取表2中增量数据的所有列。
我想做下面这样的事情

table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)

但我怀疑这是否真的有效,以及pyspark是否能够在没有任何内存错误的情况下处理这个问题。
问题1:如何基于create\u date分区并行化表\u 1扫描?
问题2:有没有其他方法可以基于表2中的查找ID和/或分区来优化表1扫描?
更多信息,让我更清楚地寻找什么:
我试图理解当我们使用dataframes连接表时,spark是否读取数据并将其保存在内存中并连接它们,或者它只是在读取自身时进行连接。如果第二个是真的,那么第二个语句适用于哪些连接呢。另外,如果有任何需要使用循环,以避免任何内存错误。

kzmpq1sx

kzmpq1sx1#

当你读到csv时。。它将被自动分区并进行并行处理。。基于默认配置(以防我们不更改任何配置)
一个具体的答案是…如果你有一个30gb的未压缩文本文件存储在hdfs上,那么默认的hdfs块大小设置(128mb)它将存储在235个块中,这意味着你从这个文件读取的rdd将有235个分区。
现在,这里有两件事1。像csv和2这样的平面文件。压缩文件,如Parquet地板
当你有一个文本文件…当spark从hdfs读取一个文件时,它会为一个输入分割创建一个分区。输入分割由用于读取此文件的hadoop inputformat设置。例如,如果您使用textfile(),它将是hadoop中的textinputformat,它将为单个hdf块返回单个分区(但分区之间的拆分将在行拆分中完成,而不是完全的块拆分),除非您有一个压缩的文本文件。
对于Parquet或压缩文件:在压缩文件的情况下,你会得到一个单一的分区为一个单一的文件(因为压缩文本文件是不可分割的)。
现在,当您使用parquet时,这已经是很好的分区了,在进行优化时,您可以检查集群大小,并查看发生了多少分区等。
那么,回答:问题1:如何基于create\u date分区并行化表\u 1扫描?这已经分区了
对于,问题2:是否有其他方法可以基于表2中的查找ID和/或分区来优化表1扫描?
您可以尝试过滤不必要的记录,这个概念在spark sql查询中称为spark predicate 下推,因此即使在将数据加载到内存之前,spark也会过滤掉不必要的列。。这里有更多
spark predicate 下推到数据库允许更好地优化spark查询。 predicate 是查询上返回true或false的条件,通常位于where子句中。 predicate 下推过滤数据库查询中的数据,减少从数据库检索的条目数并提高查询性能。默认情况下,spark数据集api会自动将有效的where子句下推到数据库中。

eivnm1vs

eivnm1vs2#

不确定您的驱动程序和执行程序内存,但通常有两种可能的连接优化—将小表广播给所有执行程序,并对两个Dataframe使用相同的分区键。在您的例子中,如果表2太大而无法广播,那么基于查找id的重新分区将使其更快。但赔偿有其自身的成本。你可以在这里找到更多-https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=one%20way%20to%20avoid%20shufflies,然后%20broadcast%20to%20every%20executor。
让我知道你的想法。期待就此主题展开讨论。
如果你不能广播,一个使用bucketing避免连接的例子-从这里得到启发:spark:在连接两个相同分区的Dataframe时防止shuffle/exchange

spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')

# test1.

# %%

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')

现在试试看

inputDf3.explain()

结果如下:

== Physical Plan ==

* (3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]

+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
   :- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
   :     +- *(1) Filter isnotnull(item#1033)
   :        +- *(1) FileScan parquet 
   +- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
      +- *(2) Project [id#1038, item#1039]
         +- *(2) Filter isnotnull(item#1039)
            +- *(2) FileScan parquet

如您所见,这里没有发生交换。所以试着把你的两个数据框都扣起来,试着加入。

相关问题