在Map器输出大部分排序时最小化无序

wwodge7n  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(255)

我有一个map reduce过程,在这个过程中,Map器从按键排序的文件中获取输入。例如:

1 ...
2 ...
2 ...
3 ...
3 ...
3 ...
4 ...

然后它被转换,99.9%的键彼此保持相同的顺序,其余的99%是接近的。因此,以下可能是对上述数据运行map任务的输出:

a ...
c ...
c ...
d ...
e ...
d ...
e ...

因此,如果您可以确保一个reducer接收一系列输入,并将该reducer放在大多数输入已经位于的同一节点中,那么洗牌将只需要很少的数据传输。例如,假设我对数据进行了分区,这样a-d由一个reducer处理,e-g由下一个reducer处理。然后,如果a-d可以在处理1-4Map的同一节点上运行,那么只需要通过网络发送e的两个记录。
如何构造一个系统来利用数据的这个属性?我有hadoop和spark,不介意编写自定义分区器之类的东西。然而,完整的工作负载是mapreduce的一个经典例子,因此我希望使用一个支持该范例的框架。
hadoop邮件存档提到了这样的优化。是否需要修改框架本身来实现它?

6qftjkof

6qftjkof1#

从spark的Angular 来看,这并没有直接的支持:最接近的是mappartitions,preservepartitions=true。然而,这将不会直接帮助你的情况下,因为钥匙可能不会改变。

/**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

如果您能够确切地知道没有一个键会移动到它们原来的分区之外,那么上面的方法就可以工作了。但边界上的价值观可能不会合作。
与迁移密钥相比,数据的规模有多大?您可以考虑添加一个后处理步骤。首先为所有迁移密钥构造一个分区。Map器将为需要迁移的键输出一个特殊的键值。然后对结果进行后处理,以便对标准分区进行某种附加。这是额外的麻烦,所以您需要在额外的步骤和管道复杂性中评估权衡。

相关问题