使用烫伤读取多个文件并输出单个文件

qmelpv7a  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(395)

最近我遇到了一个问题,我试图使用烫伤从多个文件中读取数据,并使用单个文件创建输出。我的代码是:

def getFilesSource (paths: Seq[String]) = {
    new MultipleTextLineFiles(paths: _*) {
      override protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] =  {
        val taps = goodHdfsPaths(hdfsMode).toList.map {
          path => CastHfsTap (new Hfs (hdfsScheme, path, sinkMode))
        }

        taps.size match {
          case 0 => {
            CastHfsTap (new Hfs(hdfsScheme, hdfsPaths.head, sinkMode))
          }
          case 1 => taps.head
          case _ => new ScaldingMultiSourceTap(taps)
        }
      }
    }
  }

但是当我运行这个代码时,它将我的输出分割成许多文件,但是里面的数据非常少:只有几个k。相反,我希望能够将所有输出文件聚合到一个文件中。
我的密码是:

val source = getFilesSource(mapped) // where mapped is a Sequence of valid HDFS paths (Seq [String])

TypedPipe.from(source).map(a => Try{
  val json = JSON.parseObject(a)
  (json.getInteger("prop1"), json.getInteger("prop2"), json.getBoolean("prop3"))
}.toOption).filter(a => a.nonEmpty)
  .map(a => a.get)
  .filter(a => !a._3)
  .map (that => MyScaldingType (that._1, that._2))
  .write(MyScaldingType.typedSink(typedArgs))

我想我必须重写类型为boldingmultisourcetap的“sourceconfinit”方法,但是我不知道在里面写什么。。。

wecizke3

wecizke31#

考虑到数据较小,可以使用groupall将所有Map输出(作业是仅Map的作业)发送到单个reducer,然后执行写操作。输出将写入单个文件。

.
.
.
.filter(a => !a._3)
.map (that => MyScaldingType (that._1, that._2))
.groupAll
.write(MyScaldingType.typedSink(typedArgs))

相关问题