spark structured streaming+kafka:当kafka消息与json模式不匹配时,停止查询崩溃

k3bvogb1  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(206)

有点像我一个月前的一篇文章的分支。我有一个spark结构的蒸汽应用程序,我在读Kafka。这是我的代码的基本结构。
我创建了spark会话。

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

然后我在小溪里看书

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

在Kafka记录中,我将“值”转换为字符串。它从二进制转换为字符串。

val df = data_stream
    .select($"value".cast("string") as "json")

基于预定义的模式,我尝试将json结构解析为列。但是,这里的问题是,如果数据是“坏的”或不同的格式,那么它与定义的模式不匹配。我需要筛选出与我的架构不匹配的行。不管它们是空的,数字,一些随机的文本,比如“你好”。如果它不是一个json,那么它就不应该继续到下一个dataframe进程

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

如果我通过console producer传入一个空的kafka消息,spark查询就会崩溃
java.util.nosuchelementexception:scala.collection.immutable.nil$.head(list。scala:420)在scala.collection.immutable.nil$.head(列表。scala:417)位于org.apache.spark.sql.catalyst.expressions.jsontostruct.nullsafeeval(jsonexpressions)。scala:500)在org.apache.spark.sql.catalyst.expressions.unaryexpression.eval(表达式。scala:325)位于org.apache.spark.sql.catalyst.expressions.generatedclass$specificpredicate.eval(未知源代码),位于org.apache.spark.sql.execution.filterexec$$anonfun$17$$anonfun$apply$2.apply(basicphysicaloperators)。scala:219)在org.apache.spark.sql.execution.filterexec$$anonfun$17$$anonfun$apply$2.apply(基本物理运算符)。scala:218)在scala.collection.iterator$$anon$13.hasnext(iterator。scala:463)在scala.collection.iterator$$anon$11.hasnext(iterator。scala:408)位于org.apache.spark.sql.execution.streaming.foreachsink$$anonfun$addbatch$1.apply(foreachsink)。scala:52)在org.apache.spark.sql.execution.streaming.foreachsink$$anonfun$addbatch$1.apply(foreachsink)。scala:49)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1$$anonfun$apply$29.apply(rdd。scala:925)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1$$anonfun$apply$29.apply(rdd。scala:925)在org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext。scala:1944)在org.apache.spark.sparkcontext$$anonfun$runjob$5.apply(sparkcontext。scala:1944)在org.apache.spark.scheduler.resulttask.runtask(resulttask。scala:87)在org.apache.spark.scheduler.task.run(task。scala:99)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题