Adding extra columns to a Spark DataFrame

vwoqyblh · asked 2021-05-27 · Spark

I build Spark DataFrames from file paths, and now I want to add the path, together with its time, as separate columns to the resulting DataFrame. This is the current solution (`pathToDF` is a helper method):

val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()

if(paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}
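
For reference, a minimal sketch of what such a helper might look like (hypothetical; the real `pathToDF` may read a different format):

import org.apache.spark.sql.DataFrame

// Assumed stand-in for the helper above: loads a single path into a DataFrame.
// The Parquet format here is a guess, not part of the original question.
def pathToDF(path: String): DataFrame = spark.read.parquet(path)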

This is what I am trying, but I can't figure out how to add the time column using `withColumn`:

val orderedPaths = pathsDF
      .orderBy($"time")
      .select($"path")
   //.select($"path", $"time") for both columns

    val paths = orderedPaths
      .as[String]
      .collect()

    if (paths.nonEmpty) {
      paths
        .grouped(groupsNum.getOrElse(paths.length))
        .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
          .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time") something like this
    } else {
      Seq.empty[DataFrame]
    }

What is a better way to implement this?
Input DF:

time Long
path String

Current result:

resultDF schema
field1 Int
field2 String
....
fieldN String

Expected result:

resultDF schema
field1 Int
field2 String
....
path   String
time   Long

bejyjqdl1#

Please check the code below.

1. Change the `grouped` function to `par` for parallel data loading.
2. Change the following:

// The code below adds the same path to content that came from multiple files:
paths.grouped(groupsNum.getOrElse(paths.length))
     .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
     .withColumn("path", orderedPaths("path")))

// The code below adds each file's own path to that file's content:
import org.apache.spark.sql.functions.lit

paths
  .grouped(groupsNum.getOrElse(paths.length))
  .flatMap(group => {
    group.map(path => {
      pathToDataDF(path).withColumn("path", lit(path))
    })
  })
  .reduceLeft(_ union _)
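
The key change is `lit(path)`: `orderedPaths("path")` is a column of a different DataFrame, so Spark cannot resolve it against the freshly loaded data (it typically fails with an `AnalysisException`), whereas `lit` turns the already-collected path string into a literal column.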

For illustration I have used both `par` and `grouped` below. Note: ignore some of the methods, like `pathToDataDF`; I have tried to replicate your approach.

scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]

scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame

// Sample file contents I have used:

scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}

scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}

scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}

Using `par`:
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
  case p if !p.isEmpty => {
    p.par
      .map(path => {
        pathToDataDF(path)
          .withColumn("path", lit(path))
      })
      .reduceLeft(_ union _)
  }
  case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
  case p if !p.isEmpty => {
    p.par
      .map(path => {
        pathToDataDF(path._1)
          .withColumn("path", lit(path._1))
          .withColumn("time", lit(path._2))
      })
      .reduceLeft(_ union _)
  }
  case _ => spark.emptyDataFrame
}

parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
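
One caveat: `union` resolves columns by position, so this assumes every file yields the same schema in the same column order. If the column order may vary across files, `unionByName` (available since Spark 2.3) is a safer drop-in, for example:

p.par
  .map(path => pathToDataDF(path._1)
    .withColumn("path", lit(path._1))
    .withColumn("time", lit(path._2)))
  .reduceLeft(_ unionByName _)  // matches columns by name, not position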

Using `grouped`:
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
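
(Here `groupsNum` is assumed to be in scope as in the question, e.g. `val groupsNum: Option[Int] = Some(2)`.)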

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => { 
                        pathToDataDF(path)
                        .withColumn("path", lit(path))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> groupedDF.show(false)

+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// with time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path._1)
                        .withColumn("path",lit(path._1))
                        .withColumn("time",lit(path._2))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
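
As a side note, if you only need the source path (not the time), Spark's built-in `input_file_name()` can stamp each row with the file it was read from in a single load, without looping over paths. A minimal sketch (the glob pattern is illustrative):

import org.apache.spark.sql.functions.input_file_name

// One pass over all files; each row carries its own source file path.
val df = spark.read.json("/tmp/data/*/*.json")
  .withColumn("path", input_file_name())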
