Error extracting JSON into a case class in Scala Flink

klh5stk1 · asked on 2021-06-25 · in Flink
Follow (0) | Answers (1) | Views (303)

The problem is in the map function, when extracting into the case class: the case class is not serializable. I have already defined the DefaultFormats implicitly.

package org.apache.flink.quickstart
import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

case class CC(key:String)

object WordCount{
  def main(args: Array[String]) {

    implicit val formats = org.json4s.DefaultFormats

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
//      .map(_.extract[CC])

    val l = st.map(_.extract[CC])

    st.print()
    env.execute()
  }
}

Error:
INFO [main] (TypeExtractor.java:1804) - No fields were detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
	at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
	at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
	at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
	at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
	at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:38)
	at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
	at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
	... 6 more

Process finished with exit code 1

htzpubme1#

Solution: move

implicit val formats = org.json4s.DefaultFormats

outside the main function, e.g.

object WordCount{

    implicit val formats = org.json4s.DefaultFormats
    def main(args: Array[String])
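
Why this helps (my reading, not stated in the original answer): when `formats` is a local `val` inside `main`, the lambda passed to `map` captures the `DefaultFormats` instance, whose inner anonymous classes are not `Serializable`, and Flink's ClosureCleaner then fails to Java-serialize the closure. A member of the `WordCount` object is instead reached through the static module reference and never stored in the closure. A minimal stdlib-only sketch of that difference, with a hypothetical non-serializable `Formats` class standing in for json4s:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ObjectMemberDemo {
  // Stand-in for json4s' DefaultFormats: deliberately NOT Serializable,
  // like the anonymous classes inside the real DefaultFormats.
  class Formats

  // Defined on the object, as in the fix: lambdas reach this through the
  // static module reference instead of capturing the instance.
  implicit val formats: Formats = new Formats

  // Builds a closure that uses the implicit formats, like `_.extract[CC]`.
  def extractLike: String => Int =
    s => s.length + implicitly[Formats].hashCode() % 1

  // Rough equivalent of the check Flink's ClosureCleaner performs.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit =
    // The closure captures nothing, so serialization succeeds.
    println(serializes(extractLike))
}
```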

Or initialize the formats lazily, e.g.

implicit lazy val formats = org.json4s.DefaultFormats

inside the main function, as follows:

def main(args: Array[String]) {
    implicit lazy val formats = org.json4s.DefaultFormats
