如何使用customjsonparser在spark结构化流中解析json字符串?

vybvopom  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(282)

用户将提供一个customjsonparser来将部分json字符串解析为customobject,而不是解析整个json字符串。如何使用这个customjsonparser在spark structured streaming中转换json字符串,而不是使用from\u json和get\u json\u object方法?
示例代码如下:

val jsonDF = spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kakfaBrokers)
            .option("subscribe", kafkaConsumeTopicName)
            .option("group.id", kafkaConsumerGroupId)
            .option("startingOffsets", startingOffsets)
            .option("auto.offset.reset", autoOffsetReset)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("enable.auto.commit", "false")
            .load()

val messagesDF = jsonDF.selectExpr("CAST(value AS STRING)")

spark.udf.register("parseJson", (json: String) =>
    customJsonParser.parseJson(json)
)

val objDF = messagesDF.selectExpr("""parseJson(value) AS message""")

val query = objDF.writeStream
            .outputMode(OutputMode.Append())
            .format("console")
            .start()

query.awaitTermination()

它运行时出现以下错误:
线程“main”java.lang.unsupportedoperationexception中出现异常:org.apache.spark.sql.catalyst.scalareflection$.schemafor(scalareflection)不支持com..entity类型的架构。scala:755)位于org.apache.spark.sql.catalyst.scalareflection$.schemafor(scalareflection.com)。scala:693)在org.apache.spark.sql.udfregistration.register(udfregistration。scala:159)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题