apachespark—随着表的增长,写入分区配置单元表需要更长的时间

fzwojiic  于 2021-07-15  发布在  Hadoop
关注(0)|答案(2)|浏览(287)

我正在使用spark 2.4.4写入2级分区的外部配置单元表(格式化hdfs上的Parquet):

CREATE EXTERNAL TABLE mytable (<SCHEMA>)
PARTITIONED BY (`field1` STRING, `field2` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice1/user/....

模式相当复杂(许多嵌套数组和结构)。当我插入表格时:

df.write.mode("overwrite").insertInto(myTable)

io所花费的时间随着每项工作的进行而增加。每个作业(一批数据)我写进5-10个不同的 field2 分区(在作业之前是空的)。所以我实际上只是附加数据
从一个空表开始,写入一批数据需要几秒钟(一些gb的数据),现在时间已经增长到30分钟(sparkui显示所有作业都已完成,因此我假设是io阻止了spark应用程序的进度)。在这段时间内绝对没有写日志,无论是在执行器上还是在驱动程序上。
我假设spark会扫描所有现有分区中的每个覆盖操作。。。但我不确定。
我已经准备好了 hive.exec.dynamic.partition=true ,和 spark.sql.sources.partitionOverwriteMode=dynamic . 配置的其余部分是默认的。

bhmjp9jg

bhmjp9jg1#

您可以将dataframe直接保存到分区数据所在的路径中,该路径与 CREATE TABLE Hive声明

df.write.mode("overwrite").partitionBy("col_specified_for_partitioning").parquet("/path/mentioned/in/create/table")

spark.sql("MSCK REPAIR TABLE dbname.tablename")

这将解决您希望删除并重新创建某个分区的数据的情况 MSCK REPAIR TABLE 只是让表知道hdfs路径中的分区。

gmxoilav

gmxoilav2#

尝试

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode(SaveMode.Overwrite).insertInto("table")

您也可以尝试上面提到的@yayati sule方法来写入数据,即直接指定目标目录,如下所示,

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("field1", "field2").save("hdfs://nameservice1/user/raw/table/<YYYYMMDDHHMMSS>")

您也可以尝试设置session conf,

sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

或者,如果这也失败了,试着用老式的方法,然后做 alter table add partition .

df.write.mode(SaveMode.Overwrite).save("hdfs://nameservice1/user/raw/table/field1=val1/field2=val2/")

任何使用hadoop-3.3之前版本和使用hadoop\us3a\u客户端的s3的人都会有一些性能改进。所以升级。

相关问题