emr6.x上s3a的s3guard和parquet魔术委员会

piah890a  于 2021-05-17  发布在  Spark
关注(0)|答案(2)|浏览(677)

我们使用的是cdh5.13和spark2.3.0以及s3guard。在使用相同的资源在emr5.x/6.x上运行相同的作业之后,我们的性能下降了5-20倍。根据https://docs.aws.amazon.com/emr/latest/releaseguide/emr-spark-committer-reqs.html 默认提交程序(从5.20开始)不适合s3a。我们测试了emr-5.15.1,得到了与hadoop相同的结果。
如果我试着使用魔法的话

py4j.protocol.Py4JJavaError: An error occurred while calling o72.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

我的代码是(+我是通过emr config配置的):

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SparkSession
from pyspark.sql.functions import *

sconf = SparkConf()
sconf.set("spark.hadoop.fs.s3a.committer.name", "magic")
sconf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
sconf.set("spark.sql.sources.commitProtocolClass", "com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol")
sconf.set("spark.sql.parquet.output.committer.class", "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter")
sconf.set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
sconf.set("spark.hadoop.fs.s3a.commiter.staging.conflict-mode", "replace")

sc = SparkContext(appName="s3acommitter", conf = sconf)

spark = SparkSession(sc)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

sourceDF = spark.range(0, 10000)
datasets = "s3a://parquet/commiter-test"
sourceDF.write.format("parquet").save(datasets + "parquet")
sc.stop()

在https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/ 我找不到spark2.4.4和hadoop3.2.1的jar
如何在emr上启用magic Committer?
Spark日志:

20/11/25 21:49:38 INFO ParquetFileFormat: Using user defined output committer for Parquet: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 WARN ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/11/25 21:49:38 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/11/25 21:49:38 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized committer is not supported by this filesystem (org.apache.hadoop.fs.s3a.S3AFileSystem)
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
62o28rlo

62o28rlo1#

我相信,在这次发布之后,amazons3更新——强大的先读后写一致性——大多数人都会在他们的环境中放弃s3guard。
但是,我们改进了工作的整体性能(在emr上使用s3a)45分钟-2小时=>2分钟:
我们在emr配置中添加了:
fs.s3a.impl org.apache.hadoop.fs.s3a.s3a=>com.amazon.ws.emr.hadoop.fs.emrfilesystem
{“classification”:“core site”,“properties”:{“fs.s3a.impl”:“com.amazon.ws.emr.hadoop.fs.emrfilesystem”},“configurations”:[]}
禁用parquet.enable.summary-metadata
{“parquet.enable.summary metadata”,“false”}
cloudera通过云存储提高spark性能:
当将parquet.enable.summary-metadata设置为true时,我们可以看到,一旦写入操作终止,驱动程序将对所有写入的parquet文件页脚执行顺序扫描,以便检索相关架构并在写入操作指定的目录上创建摘要文件\u common \u元数据。这是一个单线程操作,需要很多时间。特别是生成的Parquet文件越多,需要检查的文件就越多,因此所需的时间也就越多。
主要的一点是,Parquet地板摘要文件现在不是特别有用,因为:
当模式合并被禁用时,我们假设所有Parquet部件文件的模式都是相同的,因此我们可以从任何部件文件中读取页脚。
当模式合并被启用时,我们仍然需要读取所有文件的页脚来进行合并。
以下是在最近的spark版本中,默认情况下禁用编写Parquet地板摘要文件的一些原因:
https://issues.apache.org/jira/browse/spark-15719

kqqjbcuj

kqqjbcuj2#

由于spark(在spark hadoop云中)中的一些绑定类不在类路径上,您将得到类未找到错误。即使他们是,你也会被挡在下一个障碍上:那些提交者不在emr中。
amazonemr有自己版本的s3a提交器:使用emrfs3优化的提交器

相关问题