如何优化spark作业,将s3文件处理到配置单元Parquet表中

fcg9iug3  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(268)

我是新的Spark分布式开发。我正在尝试优化我现有的Spark工作,需要1小时才能完成。
基础设施:
emr[10个r4.8XL示例(32核,244gb)]
源数据:s3中的1000个.gz文件(每个约30mb)
spark执行参数[executors:300,executor memory:6gb,cores:1]
通常,spark作业执行以下操作:

private def processLines(lines: RDD[String]): DataFrame = {
    val updatedLines = lines.mapPartitions(row => ...)
    spark.createDataFrame(updatedLines, schema)
}

// Read S3 files and repartition() and cache()
val lines: RDD[String] = spark.sparkContext
    .textFile(pathToFiles, numFiles) 
    .repartition(2 * numFiles) // double the parallelism
    .cache()

val numRawLines = lines.count()

// Custom process each line and cache table
val convertedLines: DataFrame = processLines(lines)
convertedRows.createOrReplaceTempView("temp_tbl")
spark.sqlContext.cacheTable("temp_tbl")
val numRows = spark.sql("select count(*) from temp_tbl").collect().head().getLong(0)

// Select a subset of the data
val myDataFrame = spark.sql("select a, b, c from temp_tbl where field = 'xxx' ")

// Define # of parquet files to write using coalesce
val numParquetFiles = numRows / 1000000
var lessParts = myDataFrame.rdd.coalesce(numParquetFiles)
var lessPartsDataFrame = spark.sqlContext.createDataFrame(lessParts, myDataFrame.schema)
lessPartsDataFrame.createOrReplaceTempView('my_view')

// Insert data from view into Hive parquet table
spark.sql("insert overwrite destination_tbl 
           select * from my_view")    
lines.unpersist()

应用程序读取所有s3文件=>重新分区为文件量的两倍=>缓存rdd=>自定义进程每行=>创建临时视图/缓存表=>计算行数=>选择数据子集=>减少分区数=>创建数据子集视图=>使用视图插入到配置单元目标表=>取消持久化rdd。
我不知道为什么要花很长时间来执行。spark执行参数是否设置不正确,或者是否有错误的调用?

cngwdvgl

cngwdvgl1#

在查看度量之前,我将尝试对您的代码进行以下更改。

private def processLines(lines: DataFrame): DataFrame = {
  lines.mapPartitions(row => ...)
}

val convertedLinesDf = spark.read.text(pathToFiles)
    .filter("field = 'xxx'")
    .cache()

val numLines = convertedLinesDf.count() //dataset get in memory here, it takes time        
// Select a subset of the data, but it will be fast if you have enough memory
// Just use Dataframe API
val myDataFrame = convertedLinesDf.transform(processLines).select("a","b","c")

//coalesce here without converting to RDD, experiment what best
myDataFrame.coalesce(<desired_output_files_number>)
  .write.option(SaveMode.Overwrite)
  .saveAsTable("destination_tbl")

如果不计算行数,缓存是无用的。它会占用一些内存并增加一些gc压力
缓存表可能会消耗更多内存并增加更多gc压力
将Dataframe转换为rdd代价高昂,因为这意味着ser/deser操作
不知道你想做什么: val numParquetFiles = numRows / 1000000 重新划分( 2 * numFiles ). 在您的设置中,1000个30mb的文件将为您提供1000个分区。像这样就可以了。调用重分区和合并可能会触发代价高昂的洗牌操作(合并可能不会触发洗牌)
告诉我你有什么改进!

相关问题