我正在尝试移植一个apache flink应用程序(scala)来激发结构化流媒体。应用程序的基本工作是:
读Kafka的留言
进行一些转换/处理
向Kafka输出零条或多条消息
在处理过程中,我想输出日志消息(一般处理信息、解析错误等)。然而-来自Flink-处理将在一个或多个时间内完成 .map
对我的 Dataset[Node]
/ Dataset[MyCaseClass]
物体。不幸的是,在这些操作符中,所有内容都必须是可序列化的,这对于我的记录器(使用 scala-logging
).
因此,在尝试使用记录器时,我得到: org.apache.spark.SparkException: Task not serializable
.
例子:
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", host + ":" + port)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(n =>
{
// processing here
log.warn("bla") // <-- no-go
<root></root>.asInstanceOf[Node]
})
.map(_.toString())
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host.get + ":" + port.get)
.option("topic", topic.get)
.option("checkpointLocation", "myDir")
.start()
.awaitTermination()
对于不可序列化的日志记录之类的工作,建议采用什么方法?在flink中,可以选择子类 RichMapFunction
以及类似的类,在这些类中可以放置所有不可序列化的内容,这些内容将按操作符/并行性示例化。
1条答案
按热度按时间ia2d9nvy1#
如果您想在spark的map操作符(如数据库连接)中引入一个不可变量化的对象,那么您总是可以利用mappartition函数。