Here is the setup:
Spark version: 2.4.3
Hadoop version: 2.7.7
Java version: 1.8
Spark runs in standalone mode (as the cluster manager).
The metastore runs in local mode.
Postgres serves as the metastore DB.
We hit an OutOfMemoryError when appending a Spark DataFrame to an existing Spark table via saveAsTable in append mode.
Below is the Spark configuration used in the driver application:
-Xms526m -Xmx4096m
spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/process_heap"
spark.executor.memory=5g
spark.executor.cores=1
spark.cores.max=4
spark.scheduler.mode=FAIR
spark.app.name=DUMMY_NAME
spark.master=spark://DUMMY_HOST1:7077,DUMMY_HOST2:7077
spark.authenticate=true
spark.authenticate.secret=ABCDEF
spark.auth.user=ANY_USER
spark.auth.password=ANY_PASSWORD
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.shuffle.service.enabled=true
spark.shuffle.unsafe.file.output.buffer=5m
spark.io.compression.lz4.blockSize=512k
spark.unsafe.sorter.spill.reader.buffer.size=5m
spark.shuffle.file.buffer=1m
spark.rdd.compress=true
spark.storage.memoryMapThreshold=10m
spark.driver.host=DUMMY_DRIVER_HOST
spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://POSTGRES_HOST:1234/metastore
spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
spark.hadoop.javax.jdo.option.ConnectionUserName=POSTGRES_USER
spark.hadoop.javax.jdo.option.ConnectionPassword=POSTGRES_PASSWORD
spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict
spark.hadoop.hive.exec.dynamic.partition=true
spark.hadoop.hive.metastore.local=true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.hadoop.outputCommitCoordination.enabled=false
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable=true
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy=ALWAYS
spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.best-effort=true
spark.hadoop.dfs.replication=2
spark.hadoop.fs.defaultFS=hdfs://HDFS_CLUSTER_NAME
spark.sql.warehouse.dir=hdfs://HDFS_CLUSTER_NAME/spark-warehouse
spark.hadoop.dfs.nameservices=HDFS_CLUSTER_NAME
spark.hadoop.dfs.ha.namenodes.HDFS_CLUSTER_NAME=nn1,nn2
spark.hadoop.dfs.client.failover.proxy.provider.HDFS_CLUSTER_NAME=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
spark.eventLog.dir=hdfs://HDFS_CLUSTER_NAME/eventLog
spark.hadoop.dfs.namenode.rpc-address.HDFS_CLUSTER_NAME.nn1=HOST1:8020
spark.hadoop.dfs.namenode.rpc-address.HDFS_CLUSTER_NAME.nn2=HOST2:8020
While analyzing the heap dump, we can see that the saveAsTable operation pulls in all partitions of the entire table along with their corresponding FieldSchema objects. Below are screenshots of the heap dump analysis done with MAT (Eclipse Memory Analyzer).
Spark driver heap dump
Dominator tree analysis
As the screenshots above show, about 25 million FieldSchema objects were created for 4,000 partitions, with each partition consisting of roughly 6,800 fields.
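As a sanity check on those numbers (assuming each partition materializes its own full copy of the schema, i.e. one FieldSchema per field per partition, rather than sharing one schema across partitions), the object count and a rough memory footprint can be estimated as follows. The ~100 bytes per FieldSchema is an illustrative assumption, not a figure measured from the dump:

```java
public class FieldSchemaEstimate {
    public static void main(String[] args) {
        long partitions = 4_000L;
        long fieldsPerPartition = 6_800L;

        // A separate FieldSchema list per partition means the object count
        // multiplies instead of being shared:
        long fieldSchemaObjects = partitions * fieldsPerPartition;
        System.out.println(fieldSchemaObjects); // 27200000, in line with the ~25M seen in MAT

        // Assuming ~100 bytes retained per FieldSchema (names, type strings),
        // a rough lower bound on retained heap:
        long approxMb = fieldSchemaObjects * 100L / (1024L * 1024L);
        System.out.println(approxMb + " MB"); // ~2.5 GB just for FieldSchema objects
    }
}
```

On those assumptions the FieldSchema objects alone account for a few GB of retained heap, which is consistent with a 4 GB driver heap being exhausted.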
Code snippet used to save the table:
sparkSession.read()
    .option("basePath", basePath)
    .option("mergeSchema", "true")
    .parquet(partitionPaths.toArray(new String[0]))
    .withColumn(
        Constants.HOUR_COL,
        functions.to_timestamp(
            date_format(col(Constants.DATE_COL), "yyyy-MM-dd-HH"),
            "yyyy-MM-dd-HH"))
    .write()
    .partitionBy(Constants.COL1, Constants.COL2, Constants.HOUR_COL)
    .mode(SaveMode.Append)
    .format("parquet")
    .saveAsTable("DUMMY_TABLE_NAME");
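Not part of the original question, but one commonly suggested mitigation for this pattern: when the table already exists, insertInto() appends against the table definition already registered in the metastore, whereas saveAsTable() in append mode resolves the schema against existing partition metadata (the FieldSchema blow-up shown above). A hedged sketch, assuming DUMMY_TABLE_NAME already exists and the DataFrame's column order matches the table definition, since insertInto() resolves columns by position rather than by name:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class AppendViaInsertInto {
    // 'df' stands for the same DataFrame built in the snippet above.
    static void append(Dataset<Row> df) {
        df.write()
          .mode(SaveMode.Append)
          // partitionBy() must be omitted with insertInto(): the partitioning
          // is taken from the table definition already stored in the metastore,
          // and Spark rejects the combination with an AnalysisException.
          .insertInto("DUMMY_TABLE_NAME");
    }
}
```

Whether this avoids the OOM in this specific setup would need to be verified against a heap dump, but it sidesteps the saveAsTable append path that was holding the per-partition FieldSchema objects.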