Scala - cast a column in a Spark DataFrame without creating nulls

hsgswve4 · posted 2021-05-17 in Spark

Is there a way to cast a column in Spark so that it fails on a type mismatch instead of returning null?
As an example, I have a DataFrame of all string columns, one of which I would like to cast to a date:

+----------+------------+------------+
|   service|   eventType|process_date|
+----------+------------+------------+
| myservice| myeventtype|  2020-10-15|
| myservice| myeventtype|  2020-02-15|
|myservice2|myeventtype3|  notADate  |
+----------+------------+------------+

If I try to cast this with the standard cast function, df.withColumn("process_date", df("process_date").cast(targetType)), it replaces the value that fails to parse with a null:
+----------+------------+------------+
|   service|   eventType|process_date|
+----------+------------+------------+
| myservice| myeventtype|  2020-10-15|
| myservice| myeventtype|  2020-02-15|
|myservice2|myeventtype3|        null|
+----------+------------+------------+

Using this function in my current program could cause dangerous data loss that I might not catch until it is too late.

fnx2tebb1#

I found two ways to do what you want.
First, if you really want the process to fail when a date cannot be parsed, you can use a UDF:

import java.time.LocalDate

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType

object Data {
  val tuples = List(
    ("myservice", "myeventtype", "2020-10-15"),
    ("myservice", "myeventtype", "2020-02-15"),
    ("myservice2", "myeventtype3", "notADate")
  )
}

object BadDates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("Simple Application").getOrCreate()

    import spark.implicits._
    val dfBad = Data.tuples.toDF("service", "eventType", "process_date")

    // LocalDate.parse throws DateTimeParseException on malformed input,
    // so the job fails instead of silently producing a null.
    val dateConvertUdf = udf { str: String => java.sql.Date.valueOf(LocalDate.parse(str)) }

    dfBad
      .withColumn("process_date", dateConvertUdf(col("process_date")))
      .show()
  }
}

This fails at runtime, once the show() action forces execution, with the following exception:

Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(BadDates$$$Lambda$1122/934288610: (string) => date)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
...
Caused by: java.time.format.DateTimeParseException: Text 'notADate' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
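
One caveat worth adding: because process_date is a String (a reference type), Spark passes null values straight into the UDF, and LocalDate.parse(null) throws a NullPointerException, so the UDF above also fails on legitimately-null input. Below is a minimal null-tolerant sketch, assuming you want nulls preserved and only non-null garbage to fail (the name dateConvertNullSafeUdf is mine, not from the original answer):

// Hypothetical variant: pass nulls through unchanged, but still fail
// hard on non-null values that do not parse as ISO dates.
val dateConvertNullSafeUdf = udf { str: String =>
  Option(str).map(s => java.sql.Date.valueOf(LocalDate.parse(s))).orNull
}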

Alternatively, you can perform the cast and then check whether any row ended up with a null cast value while its original value was not null:

object BadDates2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("Simple Application").getOrCreate()

    import spark.implicits._
    val dfBad = Data.tuples.toDF("service", "eventType", "process_date")

    // Cast into a separate column so the original value stays available.
    val df = dfBad
      .withColumn("process_date_dat", col("process_date").cast(DateType))

    // A row is bad when the original value is present but the cast produced null.
    val badLines = df
      .filter(col("process_date").isNotNull && col("process_date_dat").isNull)
      .count()

    assert(badLines == 0) // This will fail: badLines is 1 (the "notADate" row)
  }
}
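
If you need this check in more than one place, the cast-then-verify pattern wraps naturally into a small helper. Here is a sketch under the same assumptions (castOrFail is a hypothetical name of mine, not part of the original answer or the Spark API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

// Hypothetical helper: cast a column to targetType and throw if any
// non-null original value failed to cast (i.e. became null).
def castOrFail(df: DataFrame, column: String, targetType: DataType): DataFrame = {
  val tmp = column + "_casted"
  val withCasted = df.withColumn(tmp, col(column).cast(targetType))
  val badLines = withCasted
    .filter(col(column).isNotNull && col(tmp).isNull)
    .count() // note: count() triggers an eager Spark action here
  require(badLines == 0, s"$badLines row(s) in '$column' could not be cast to $targetType")
  withCasted.drop(column).withColumnRenamed(tmp, column)
}

// Usage: val dfGood = castOrFail(dfBad, "process_date", DateType)

As a side note, on Spark 3.x you may also be able to make invalid casts fail natively by enabling ANSI mode (spark.sql.ansi.enabled=true); the exact cast behavior varies by Spark version, so verify it on yours before relying on it.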
