spark:如何用另一个rdd的每个分区压缩rdd

b5buobof  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(405)

假设我有一个 RDD[U] 它总是只包含一个分区。我的任务是用另一个rdd的内容填充这个rdd RDD[T] 驻留在n个分区上的。最终输出应该是n个分区 RDD[U] .
我最初想做的是:

val newRDD = firstRDD.zip(secondRDD).map{ case(a, b)  => a.insert(b)}

但我有个错误: Can't zip RDDs with unequal numbers of partitions 我可以在rddapi文档中看到有一个名为 zipPartitions() . 有没有可能,如果有,如何使用这个方法来压缩每个分区 RDD[T] 只有一个 RDD[U] 在上面画一张Map?

rta7y2nd

rta7y2nd1#

这样的方法应该有用:

val zippedFirstRDD = firstRDD.zipWithIndex.map(_.swap)
val zippedSecondRDD = secondRDD.zipWithIndex.map(_.swap)

zippedFirstRDD.join(zippedSecondRDD)
  .map{case (key, (valueU, valueT)) => {
    valueU.insert(valueT)
  }}

相关问题