Spark saveAsTable in append mode causes OutOfMemory

g2ieeal7 · posted 2021-05-26 · in Spark

Here is the setup:
Spark version: 2.4.3
Hadoop version: 2.7.7
Java version: 1.8
Spark runs in standalone mode (standalone cluster manager).
The Hive metastore runs in local mode.
Postgres serves as the metastore database.
When we try to append a Spark DataFrame to an existing Spark table via saveAsTable in append mode, we run into an OutOfMemoryError.
Below is the Spark configuration used in the driver application:

-Xms526m -Xmx4096m 
spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/process_heap"
spark.executor.memory=5g 
spark.executor.cores=1 
spark.cores.max=4 
spark.scheduler.mode=FAIR 
spark.app.name=DUMMY_NAME 
spark.master=spark://DUMMY_HOST1:7077,DUMMY_HOST2:7077 
spark.authenticate=true 
spark.authenticate.secret=ABCDEF
spark.auth.user=ANY_USER 
spark.auth.password=ANY_PASSWORD 
spark.eventLog.enabled=true 
spark.eventLog.compress=true 
spark.shuffle.service.enabled=true 
spark.shuffle.unsafe.file.output.buffer=5m 
spark.io.compression.lz4.blockSize=512k 
spark.unsafe.sorter.spill.reader.buffer.size=5m 
spark.shuffle.file.buffer=1m 
spark.rdd.compress=true 
spark.storage.memoryMapThreshold=10m 
spark.driver.host=DUMMY_DRIVER_HOST
spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://POSTGRES_HOST:1234/metastore 
spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver 
spark.hadoop.javax.jdo.option.ConnectionUserName=POSTGRES_USER 
spark.hadoop.javax.jdo.option.ConnectionPassword=POSTGRES_PASSWORD 
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict 
spark.hadoop.hive.exec.dynamic.partition=true 
spark.hadoop.hive.metastore.local=true 
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 
spark.hadoop.outputCommitCoordination.enabled=false 
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable=true
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy=ALWAYS
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.best-effort=true
spark.hadoop.dfs.replication=2
spark.hadoop.fs.defaultFS=hdfs://HDFS_CLUSTER_NAME
spark.sql.warehouse.dir=hdfs://HDFS_CLUSTER_NAME/spark-warehouse 
spark.hadoop.dfs.nameservices=HDFS_CLUSTER_NAME
spark.hadoop.dfs.ha.namenodes.HDFS_CLUSTER_NAME=nn1,nn2
spark.hadoop.dfs.client.failover.proxy.provider.HDFS_CLUSTER_NAME=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider 
spark.eventLog.dir=hdfs://HDFS_CLUSTER_NAME/eventLog 
spark.hadoop.dfs.namenode.rpc-address.HDFS_CLUSTER_NAME.nn1=HOST1:8020 
spark.hadoop.dfs.namenode.rpc-address.HDFS_CLUSTER_NAME.nn2=HOST2:8020
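
For reference, here is a minimal sketch of how a driver application might apply some of these properties programmatically through SparkSession.builder(). This is only an illustration of the wiring: the question does not say whether the properties come from spark-defaults.conf, spark-submit, or code; the values are the same placeholders as above, and enableHiveSupport() is an assumption implied by the external Postgres-backed metastore.

import org.apache.spark.sql.SparkSession;

public class SessionBootstrap {
    public static void main(String[] args) {
        // Subset of the properties listed above, applied programmatically.
        // Hosts, ports and names are the placeholders from the question.
        SparkSession spark = SparkSession.builder()
                .appName("DUMMY_NAME")
                .master("spark://DUMMY_HOST1:7077,DUMMY_HOST2:7077")
                .config("spark.executor.memory", "5g")
                .config("spark.executor.cores", "1")
                .config("spark.cores.max", "4")
                .config("spark.sql.warehouse.dir", "hdfs://HDFS_CLUSTER_NAME/spark-warehouse")
                .config("spark.hadoop.javax.jdo.option.ConnectionURL",
                        "jdbc:postgresql://POSTGRES_HOST:1234/metastore")
                .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "org.postgresql.Driver")
                .config("spark.hadoop.hive.exec.dynamic.partition", "true")
                .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
                .enableHiveSupport() // assumption: Hive catalog is used for saveAsTable
                .getOrCreate();

        spark.stop();
    }
}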

While analyzing the heap dump, we can see that the saveAsTable operation pulls in every partition of the whole table together with its corresponding FieldSchema objects. Below are screenshots of the heap dump analysis done with MAT (Eclipse Memory Analyzer).
[Screenshot: Spark driver heap dump]
[Screenshot: dominator tree analysis]
The screenshots also show that roughly 25 million FieldSchema objects were created for 4000 partitions, with each partition consisting of around 6800 fields (4000 × ~6800 ≈ 27 million FieldSchema instances, which matches the order of magnitude seen in the dominator tree).
Code snippet used to save the table:

// Imports assumed: org.apache.spark.sql.SaveMode and org.apache.spark.sql.functions
sparkSession.read()
        .option("basePath", basePath)
        .option("mergeSchema", "true")
        .parquet(partitionPaths.toArray(new String[0]))
        // derive an hourly timestamp column from the existing date column
        .withColumn(
                Constants.HOUR_COL,
                functions.to_timestamp(
                        functions.date_format(functions.col(Constants.DATE_COL), "yyyy-MM-dd-HH"),
                        "yyyy-MM-dd-HH"))
        .write()
        .partitionBy(Constants.COL1, Constants.COL2, Constants.HOUR_COL)
        .mode(SaveMode.Append)
        .format("parquet")
        .saveAsTable("DUMMY_TABLE_NAME");
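
For completeness, a minimal sketch of the declarations the snippet depends on. Constants, basePath, and partitionPaths are not shown in the question, so every name and path below is a hypothetical illustration of how they might look.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the identifiers used in the snippet above;
// none of these names appear in the original question.
final class Constants {
    static final String DATE_COL = "event_date";  // assumed source date column
    static final String HOUR_COL = "event_hour";  // hourly column derived via to_timestamp
    static final String COL1 = "tenant";          // assumed partition column
    static final String COL2 = "source";          // assumed partition column
}

class JobInputs {
    // Base path of the partitioned parquet data set on HDFS (illustrative).
    static final String basePath = "hdfs://HDFS_CLUSTER_NAME/data/events";

    // Concrete partition directories passed to parquet(); built elsewhere in the real job.
    static final List<String> partitionPaths = Arrays.asList(
            basePath + "/tenant=a/source=x",
            basePath + "/tenant=a/source=y");
}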
