scala—遍历Dataframe的行

hrirmatl  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(1579)

因为我对spark scala有点陌生,所以我发现很难遍历Dataframe。我的数据框包含两列,一列是 path 另一个是 ingestiontime . 示例-

现在我想遍历这个Dataframe并使用 Path 以及 ingestiontime 列以准备 Hive 查询并运行它,这样运行的查询-

ALTER TABLE <hiveTableName> ADD PARTITON (ingestiontime=<Ingestiontime_From_the_DataFrame_ingestiontime_column>) LOCATION (<Path_From_the_dataFrames_path_column>)

为了达到这个目的,我用-

allOtherIngestionTime.collect().foreach {
  row =>
     var prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = "+row.mkString("<SomeCustomDelimiter>").split("<SomeCustomDelimiter>")(1)+" LOCATION ( " + row.mkString("<SomeCustomDelimiter>").split("<SomeCustomDelimiter>")(0) + ")"
      spark.sql(prepareHiveQuery)

}

但我觉得这可能非常危险,即当我的数据由类似的分隔符组成时。我非常有兴趣找出遍历Dataframe的行/列的其他方法。

izj3ouym

izj3ouym1#

检查以下代码。

df
.withColumn("query",concat_ws("",lit("ALTER TABLE myhiveTable ADD PARTITON (ingestiontime="),col("ingestiontime"),lit(") LOCATION (\""),col("path"),lit("\"))")))
.select("query")
.as[String]
.collect
.foreach(q => spark.sql(q))
lo8azlld

lo8azlld2#

以便访问您的列 path 以及 ingestiontime 你可以你 row.getString(0) 以及 row.getString(1) .

Dataframe

val allOtherIngestionTime: DataFrame = ???
    allOtherIngestionTime.foreach {
      row =>
        val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = "+row.getString(1)+" LOCATION ( " + row.getString(0) + ")"
        spark.sql(prepareHiveQuery)
    }

数据集

如果使用数据集而不是Dataframe,则可以使用 row.path 以及 row.ingestiontime 以更简单的方式。

case class myCaseClass(path: String, ingestionTime: String)

val ds: Dataset[myCaseClass] = ???

ds.foreach({ row =>
  val prepareHiveQuery = "ALTER TABLE myhiveTable ADD PARTITION (ingestiontime = " + row.ingestionTime + " LOCATION ( " + row.path + ")"
  spark.sql(prepareHiveQuery)
})

在任何情况下,都可以使用 foreach ,或 map 如果要将内容转换为其他内容。
另外,使用 collect() 您正在将所有数据带到驱动程序,但不建议这样做,您可以使用 foreach 或者 map 没有 collect() 如果你想迭代 row 菲尔兹,你可以做一个 Seq 并迭代:

row.toSeq.foreach{column => ...}

相关问题