使用apachespark写入hdfs时的输出序列

vkc1a9a2  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(313)

我在apachespark中从事一个项目,要求将spark处理后的输出写入一个特定的格式,如 Header -> Data -> Trailer . 我使用 .saveAsHadoopFile 方法,并使用键作为文件名将数据写入多个文件。但问题是数据的顺序没有被维护文件都写进去了 Data->Header->Trailer 或者三者的不同组合。rdd转换有什么我遗漏的吗?

ojsjcaue

ojsjcaue1#

好吧,在阅读了stackoverflow的问题、博客和谷歌的邮件档案之后。我发现了 .union() 以及其他转换的工作原理和分区的管理方式。当我们使用 .union() 结果rdd和顺序丢失了分区信息,这就是我的输出序列没有得到维护的原因。
为了克服这个问题,我做了如下的工作
页眉=1,正文=2,页脚=3
所以使用 sortBy 在三者的并集rdd上,我用一个分区的顺序号对它进行排序。在那之后,为了使用key作为文件名写入多个文件,我使用了hashpartitioner,以便相同的key数据应该进入不同的文件中。

val header: RDD[(String,(String,Int))] = ... // this is my header RDD`
val data: RDD[(String,(String,Int))] = ... // this is my data RDD
val footer: RDD[(String,(String,Int))] = ... // this is my footer RDD

val finalRDD: [(String,String)] = header.union(data).union(footer).sortBy(x=>x._2._2,true,1).map(x => (x._1,x._2._1))

val output: RDD[(String,String)] = new PairRDDFunctions[String,String](finalRDD).partitionBy(new HashPartitioner(num))

output.saveAsHadoopFile    ... // and using MultipleTextOutputFormat save to multiple file using key as filename

这可能不是最终的或最经济的解决方案,但它起了作用。我还试图找到其他方法来保持输出的顺序 Header->Body->Footer . 我也试过了 .coalesce(1) 在所有三个rdd上,然后执行并集,但这只是在rdd和 .sortBy 函数还获取分区信息,我认为这是相同的,但是首先合并RDD也起作用。如果有人有其他的方法,请让我知道,或添加更多的这将是真的很有帮助,因为我是新的Spark

参考文献:

通过键spark写入多个输出-一个spark作业
spark RDD上的有序并集
http://apache-spark-user-list.1001560.n3.nabble.com/union-of-2-rdd-s-only-returns-the-first-one-td766.html --这个很有帮助

相关问题