登录spark structured streaming/sparkexception:任务不可序列化

vaj7vani  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(312)

我正在尝试移植一个apache flink应用程序(scala)来激发结构化流媒体。应用程序的基本工作是:
读Kafka的留言
进行一些转换/处理
向Kafka输出零条或多条消息
在处理过程中,我想输出日志消息(一般处理信息、解析错误等)。然而-来自Flink-处理将在一个或多个时间内完成 .map 对我的 Dataset[Node] / Dataset[MyCaseClass] 物体。不幸的是,在这些操作符中,所有内容都必须是可序列化的,这对于我的记录器(使用 scala-logging ).
因此,在尝试使用记录器时,我得到: org.apache.spark.SparkException: Task not serializable .
例子:

spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", host + ":" + port)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(n =>
      {
        // processing here

        log.warn("bla")      // <-- no-go

        <root></root>.asInstanceOf[Node]
      })
      .map(_.toString())
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", host.get + ":" + port.get)
      .option("topic", topic.get)
      .option("checkpointLocation", "myDir")
      .start()
      .awaitTermination()

对于不可序列化的日志记录之类的工作,建议采用什么方法?在flink中,可以选择子类 RichMapFunction 以及类似的类,在这些类中可以放置所有不可序列化的内容,这些内容将按操作符/并行性示例化。

ia2d9nvy

ia2d9nvy1#

如果您想在spark的map操作符(如数据库连接)中引入一个不可变量化的对象,那么您总是可以利用mappartition函数。

mapPartition(iter => {
    val log = LoggerFactory.getLogger
    iter.map(row => {
        ....
    })
})

相关问题