A customJsonParser is provided that parses only part of a JSON string into a customObject, rather than parsing the entire JSON string. How can this customJsonParser be used to transform JSON strings in Spark Structured Streaming, instead of using the from_json and get_json_object methods?
Sample code:
val jsonDF = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kakfaBrokers)
  .option("subscribe", kafkaConsumeTopicName)
  .option("group.id", kafkaConsumerGroupId)
  .option("startingOffsets", startingOffsets)
  .option("auto.offset.reset", autoOffsetReset)
  .option("key.deserializer", classOf[StringDeserializer].getName)
  .option("value.deserializer", classOf[StringDeserializer].getName)
  .option("enable.auto.commit", "false")
  .load()

val messagesDF = jsonDF.selectExpr("CAST(value AS STRING)")

spark.udf.register("parseJson", (json: String) =>
  customJsonParser.parseJson(json)
)

val objDF = messagesDF.selectExpr("""parseJson(value) AS message""")

val query = objDF.writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .start()
query.awaitTermination()
Running it produces the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type com..Entity is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:755)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:693)
  at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:159)
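The stack trace suggests that Spark cannot derive a Catalyst schema for the UDF's return type: ScalaReflection only understands supported types (primitives, case classes of supported types, collections, etc.), not an arbitrary class like the custom entity returned by customJsonParser.parseJson. A minimal sketch of one possible workaround, assuming the needed fields can be mapped onto a case class (the case class `Message`, its fields, and the getter names below are hypothetical placeholders, not part of the original code):

```scala
// Hypothetical case class standing in for the custom entity's fields.
// Spark can infer a schema for a case class of supported field types.
case class Message(id: String, payload: String)

spark.udf.register("parseJson", (json: String) => {
  // Parse with the custom parser as before (returns the custom entity).
  val obj = customJsonParser.parseJson(json)
  // Copy the fields onto the case class so schemaFor can succeed;
  // getId/getPayload are assumed accessor names for illustration.
  Message(obj.getId, obj.getPayload)
})
```

After this, `parseJson(value) AS message` yields a struct column whose fields can be selected with `message.id`, `message.payload`, etc. An alternative with the same effect is to skip the UDF and call the parser inside a typed `map` on `messagesDF.as[String]`, again returning a case class so an Encoder is available.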