我正在使用kafka中的一个主题,我使用from\ujson将json消息转换为Dataframe,并使用结构化流。
val agentStringDF = spark
.readStream
.format("kafka")
.option("subscribe", "testTopic")
.option("startingOffsets" , "latest")
.load()
val df = agentStringDF.select(from_json(col("value").cast("string"), testTopicSchema).alias("testTopic"))
有没有什么方法可以用fromëjson执行某种类型的操作,如果失败,记录错误并转到下一个微批处理?
我见过在模式转换后检查它的案例。。但没有这样的检查。
暂无答案!
目前还没有任何答案,快来回答吧!