使用spark多次写入hadoop分布式文件系统

vqlkdk9b  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(424)

我创建了一个spark作业,每天从hdfs读取一个文本文件,并从文本文件的每一行提取唯一的键。每个文本文件中大约有50000个键。相同的数据然后被提取的密钥过滤并保存到hdfs。
我想在hdfs中创建一个目录,其结构如下:hdfs://.../date/key 包含筛选数据的。问题是写入hdfs需要很长时间,因为有太多的键。
现在的写法是:

val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
    val filteredData = cleanedData.filter(line => line.contains(key))
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})

有没有办法让这更快?我曾考虑过将数据重新划分为提取的键的数量,但之后我无法以这种格式保存hdfs://.../date/key. 我也尝试过groupbykey,但是我无法保存这些值,因为它们不是rdd。
感谢您的帮助:)

yftpprvb

yftpprvb1#

您只为输入指定了2个分区,为输出指定了1个分区。这样做的一个影响是严重限制了这些操作的并行性。为什么需要这些?
与其计算50000个过滤过的RDD(速度也很慢),不如直接按键分组?我知道你想把它们输出到不同的目录,但这确实造成了这里的瓶颈。有没有另一种方法可以简单地让您读取(键、值)结果?

voj3qocg

voj3qocg2#

我认为这种方法应该类似于通过键spark-one-spark作业写入多个输出。分区号与目录号无关。要实现它,您可能需要用自定义版本覆盖generatefilenameforkeyvalue以保存到不同的目录。
关于可伸缩性,这不是spark的问题,而是hdfs的问题。但不管你怎么实现,只要要求不改变,都是不可避免的。但我认为hdfs可以处理50000个文件处理程序

nfeuvbwi

nfeuvbwi3#

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) map
  try {
  while (iterator.hasNext) {
    val item = iterator.next()
    val key = item._1
    val line = item._2
    val writer = writers.get(key) match {
      case Some(writer) => writer
      case None =>
        val path = arg(1) + key
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        writer = new BufferedWriter(outputStream)
    }
    writer.writeLine(line)
    } finally {
    writers.values.foreach(._close())
    }
}

val inputData = sc.textFile()    
val keyValue = inputData.map(line => (key, line))
val partitions = keyValue.partitionBy(new MyPartition(10))    
partitions.foreachPartition(writeLines)

class MyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        // make sure lines with the same key in the same partition 
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions 
    }
}

相关问题