结构化流-foreach接收器

anauzrmj  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(245)

我基本上是在阅读Kafka的资料,把每一条信息都传递给我的朋友 foreach 处理器(感谢jacek的页面提供了简单的示例)。
如果这真的有效,我将在 process 方法,但是,这不起作用。我相信 println 因为它在执行器上运行,所以无法将这些日志恢复到驱动程序。然而,这 insert into 一个temp表至少应该可以工作,并向我显示消息实际上已经被消费并处理到了接收器。
我错过了什么?
真的在寻找第二双眼睛来检查我的努力:

val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker")) 
      .option("subscribe", src_topic) 
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }

    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()
wz3gfoph

wz3gfoph1#

感谢harald和其他人的评论,我发现了一些事情,这使我实现了正常的处理行为-
用本地模式测试代码,对调试没有最大帮助
由于某些原因,foreach sink的process方法不允许调用其他方法。当我把我的业务逻辑直接放进去的时候,它就工作了。
希望它能帮助别人。

相关问题