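I build Spark DataFrames by parsing files from their paths, but now I want to add the path and the time as separate columns to the resulting DataFrame. Here is the current solution (`pathToDF` is a helper method):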
```scala
val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()
if (paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}
```
I'm trying to do it like this, but I don't know how to add the time column with `withColumn`:
```scala
val orderedPaths = pathsDF
  .orderBy($"time")
  .select($"path")
  //.select($"path", $"time") for both columns
val paths = orderedPaths
  .as[String]
  .collect()
if (paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
      .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time") something like this
} else {
  Seq.empty[DataFrame]
}
```
What is a better way to implement this?
Input DF:
time Long
path String
Current result:
resultDF schema
field1 Int
field2 String
....
fieldN String
Expected result:
resultDF schema
field1 Int
field2 String
....
path String
time Long
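Please check the code below.

1. Changed `grouped` to `par` for parallel data loading.
2. Changed … to ….

For example, I have used both `par` and `grouped` to show you.

Note: ignore some methods such as `pathToDataDF`; I tried to replicate your methods.

Using `par`: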
```scala
scala> import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.lit

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
  case p if !p.isEmpty => {
    p.par
      .map(path => {
        pathToDataDF(path)
          .withColumn("path", lit(path))
      }).reduceLeft(_ union _)
  }
  case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path", $"time").as[(String, String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
  case p if !p.isEmpty => {
    p.par
      .map(path => {
        pathToDataDF(path._1)
          .withColumn("path", lit(path._1))
          .withColumn("time", lit(path._2))
      }).reduceLeft(_ union _)
  }
  case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
```
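Only the `par` version is shown above. For the `grouped` variant, a minimal sketch that keeps the question's `groupsNum` batching and adds both columns with `lit` could look like the following. It assumes `time` is a `Long` column (as in the input schema) and replaces `pathToDataDF` with a hypothetical stub that builds a one-row DataFrame instead of reading the file; the names `PathTimeColumns` and `loadGrouped` are illustrative, not from the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object PathTimeColumns {
  // Local session for the sketch; in the question's code `spark` already exists.
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("path-time-columns-sketch")
    .getOrCreate()
  import spark.implicits._

  // Hypothetical stand-in for pathToDataDF: builds a one-row frame
  // (field1 Int, field2 String) instead of actually reading `path`.
  def pathToDataDF(path: String): DataFrame =
    Seq((path.length, path.takeRight(6))).toDF("field1", "field2")

  // Collect (path, time) pairs ordered by time, load each file, tag its rows
  // with constant path/time columns, and union them in batches of groupsNum
  // (one batch containing every path when groupsNum is None).
  def loadGrouped(pathsDF: DataFrame, groupsNum: Option[Int]): Seq[DataFrame] = {
    val pathTimes: Array[(String, Long)] = pathsDF
      .orderBy($"time")
      .select($"path", $"time")
      .as[(String, Long)]
      .collect()

    if (pathTimes.isEmpty) Seq.empty[DataFrame]
    else
      pathTimes
        .grouped(groupsNum.getOrElse(pathTimes.length))
        .map { group =>
          group
            .map { case (path, time) =>
              pathToDataDF(path)
                .withColumn("path", lit(path)) // constant column: this file's path
                .withColumn("time", lit(time)) // constant Long column: this file's time
            }
            .reduceLeft(_ union _)
        }
        .toList // materialize the iterator so every batch is built eagerly
  }
}
```

Because each file's `path` and `time` are already known on the driver, `lit` turns them into constant columns. Referencing `orderedPaths("path")` instead points at a column of a different DataFrame, which Spark generally rejects with an analysis error when resolving it against the loaded data.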