val header: RDD[(String,(String,Int))] = ... // this is my header RDD`
val data: RDD[(String,(String,Int))] = ... // this is my data RDD
val footer: RDD[(String,(String,Int))] = ... // this is my footer RDD
val finalRDD: [(String,String)] = header.union(data).union(footer).sortBy(x=>x._2._2,true,1).map(x => (x._1,x._2._1))
val output: RDD[(String,String)] = new PairRDDFunctions[String,String](finalRDD).partitionBy(new HashPartitioner(num))
output.saveAsHadoopFile ... // and using MultipleTextOutputFormat save to multiple file using key as filename
1条答案
按热度按时间ojsjcaue1#
好吧,在阅读了stackoverflow的问题、博客和谷歌的邮件档案之后。我发现了
.union()
以及其他转换的工作原理和分区的管理方式。当我们使用.union()
结果rdd和顺序丢失了分区信息,这就是我的输出序列没有得到维护的原因。为了克服这个问题,我做了如下的工作
页眉=1,正文=2,页脚=3
所以使用
sortBy
在三者的并集rdd上,我用一个分区的顺序号对它进行排序。在那之后,为了使用key作为文件名写入多个文件,我使用了hashpartitioner,以便相同的key数据应该进入不同的文件中。这可能不是最终的或最经济的解决方案,但它起了作用。我还试图找到其他方法来保持输出的顺序
Header->Body->Footer
. 我也试过了.coalesce(1)
在所有三个rdd上,然后执行并集,但这只是在rdd和.sortBy
函数还获取分区信息,我认为这是相同的,但是首先合并RDD也起作用。如果有人有其他的方法,请让我知道,或添加更多的这将是真的很有帮助,因为我是新的Spark参考文献:
通过键spark写入多个输出-一个spark作业
spark RDD上的有序并集
http://apache-spark-user-list.1001560.n3.nabble.com/union-of-2-rdd-s-only-returns-the-first-one-td766.html --这个很有帮助