Spark来自_json无例外

unhi4e5o  于 2023-01-27  发布在  Spark
关注(0)|答案(3)|浏览(75)

我正在使用Spark 2.1(scala 2.11)。
我想从一个 Dataframe 加载一个定义了模式的json格式的字符串到另一个 Dataframe 。我尝试了一些解决方案,但最便宜的是来自_json的标准列函数。我尝试了一个例子(https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json),这个函数给了我意想不到的结果。

val df = spark.read.text("testFile.txt")

df.show(false)

+----------------+
|value           |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record     |
+----------------+

df.select(from_json(col("value"),
      StructType(List(
                  StructField("a",IntegerType),
                  StructField("b",IntegerType)
                ))
    )).show(false)

+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2]              |
|null               |
+-------------------+

此行为类似于mode:PERMISSIVE,但不是默认值。默认情况下,它设置为FAILFAST模式,这意味着只要输入数据与强制模式不匹配,它就会抛出异常。
我尝试使用DataFrameReader(JSON数据源和FAILFAST模式)加载testFile.txt,并成功捕获了一个异常。

spark.read.option("mode","FAILFAST").json("test.txt").show(false)

---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---

虽然解析模式在两种情况下是相同的,为什么各自的输出如此不同?

sg2wtvxw

sg2wtvxw1#

这是预期的行为。from_json是SQL函数,在此级别没有异常(故意的)的概念。如果操作失败,结果是未定义的NULL
虽然from_json提供了options参数,允许您设置JSON读取器选项,但是由于上面提到的原因,此行为不能被覆盖。
另一方面,DataFrameReader的默认模式是允许的。

wnvonmuf

wnvonmuf2#

请注意,您将文件作为文本文件读取并将其转换为json。默认情况下,换行符将作为文本文件的分隔符,如果您有一个有效的JSON字符串,则它将正确转换为您在from_json()方法中定义的模式。
如果有空行或无效的JSON文本,则会得到NULL。
看看这个:

val df = spark.read.text("in/testFile.txt")
println("Default show()")
df.show(false)

println("Using the from_json method ")
df.select(from_json(col("value"),
  StructType(List(
    StructField("a",IntegerType),
    StructField("b",IntegerType)
  ))
)).show(false)

当in/testFile.txt具有以下内容时,

{"a": 1, "b": 2 }

它打印

Default show()
+-----------------+
|value            |
+-----------------+
|{"a": 1, "b": 2 }|
+-----------------+

Using the from_json method 
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2]               |
+--------------------+

当您的输入带有空行时

{"a": 1, "b": 2 }
// Blank line

结果是

Default show()
+-----------------+
|value            |
+-----------------+
|{"a": 1, "b": 2 }|
|                 |
+-----------------+

Using the from_json method 
+--------------------+
|jsontostructs(value)|
+--------------------+
|[1,2]               |
|null                |
+--------------------+
ljsrvy3e

ljsrvy3e3#

要添加到@user11022201答案-看起来options参数可以实现所需的FAILFAST行为。以下代码在pyspark中,并使用Spark 3.2.2进行了测试

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType

spark_session = pyspark.sql.SparkSession.builder.master("local[*]").appName("test").getOrCreate()

data = [
    {'value': '{"a": 1, "b": 2}'},
    {'value': '{bad-record'},
]

df = spark_session.createDataFrame(data)

schema = StructType([
    StructField("a", IntegerType()),
    StructField("b", IntegerType())
])

# If options are empty then the error does not happen and null values are added to the dataframe
# options = {}
options = {"mode": "FAILFAST"}

parsed_json_df = df.select(F.from_json(F.col("value"), schema, options))
parsed_json_df.show()

上述代码的结果是一个异常,这是所需的行为:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1236)
    at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)

相关问题