import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession

object APP {

  // Concatenates every part file under srcPath into the single file dstPath.
  // Passing true as the fifth argument would delete the source files once
  // they are merged into the new output; false keeps them.
  def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }

  def main(args: Array[String]): Unit = {
    val url = "jdbc:sqlserver://dc-bir-cdb01;databaseName=dbapp;integratedSecurity=true"
    val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    val BusinessDate = "2019-02-28"
    val destination = "src/main/resources/out/"
    val filename = s"Example@$BusinessDate.csv.gz"
    val outputFileName = destination + "temp_" + filename
    val mergedFileName = destination + "merged_" + filename
    val mergeFindGlob = outputFileName

    val spark = SparkSession
      .builder
      .master("local[*]")
      //.config("spark.debug.maxToStringFields", "100")
      .appName("Application Big Data")
      .getOrCreate()

    val query = """(SELECT a, b, c FROM table) tmp"""

    val responseWithSelectedColumns = spark
      .read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", query)
      .load()

    println("TOTAL: " + responseWithSelectedColumns.count())

    responseWithSelectedColumns
      .coalesce(1)      // intended to create a single part file...
      .repartition(10)  // ...but this overrides the coalesce and yields 10 part files
      .write.mode("overwrite")
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .format("com.databricks.spark.csv")
      .option("charset", "UTF8")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") // avoid creating the _SUCCESS marker file
      .option("header", "true") // write the header
      .save(outputFileName)

    merge(mergeFindGlob, mergedFileName)
    responseWithSelectedColumns.unpersist()
    spark.stop()
  }
}
The code above produces a file that contains multiple headers. How should I modify the code so that the file contains only one header?
1 Answer

guz6ccqo1#
Basically, you are trying to produce a CSV file with a single header for all of the data. The simple solution is to keep the coalesce(1) and remove the repartition(10) you introduced: the repartition splits the data back into 10 partitions, and each partition writes its own header, which copyMerge then concatenates. The problem with coalesce(1) is that all of the data goes through a single partition, which can be very slow or, worse, throw an OOM error. If it does work, though, you get one big file with a single header. To keep exploiting Spark's parallelism instead, you can write the header separately, like this (assuming we have a df):
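A minimal sketch of that approach, assuming Hadoop 2.x (FileUtil.copyMerge was removed in Hadoop 3) and skipping the gzip compression for clarity. The writeCsvWithSingleHeader name, the outDir/mergedFile parameters, and the 0_header file name are illustrative; the trick is that copyMerge concatenates a directory's files in name order, so a header file whose name sorts before "part-*" ends up first:

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.DataFrame

def writeCsvWithSingleHeader(df: DataFrame, outDir: String, mergedFile: String): Unit = {
  // 1. Write the data with full parallelism and no header on any partition.
  df.write.mode("overwrite").option("header", "false").csv(outDir)

  // 2. Drop a one-line header file into the same directory, named so it
  //    sorts before the "part-*" files.
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(outDir, "0_header"), true)
  out.write((df.columns.mkString(",") + "\n").getBytes(StandardCharsets.UTF_8))
  out.close()

  // 3. copyMerge concatenates the directory's files in sorted order, so the
  //    header line lands at the top (the empty _SUCCESS marker merges in harmlessly).
  FileUtil.copyMerge(fs, new Path(outDir), fs, new Path(mergedFile), false, conf, null)
}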
Also note that Spark 2.x can write CSV natively; I use that instead of the Databricks library, which is slightly more verbose.
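For instance, the write in the question could drop the external dependency and use the built-in source (a sketch; the options shown are the native equivalents of the ones above):

responseWithSelectedColumns
  .write.mode("overwrite")
  .option("compression", "gzip") // built-in alias for the Hadoop Gzip codec
  .option("header", "true")
  .csv(outputFileName) // no com.databricks:spark-csv dependency needed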