Encoder issue - Spark Structured Streaming - works only in the REPL

wbrvyc0a  posted on 2021-06-06  in  Kafka

I have a working flow that ingests and deserializes Kafka Avro messages using Schema Registry. It works fine in the REPL, but when I try to compile it I get:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       .map(x => {

I'm not sure whether I need to modify my object, but if it works in the REPL, why would I have to?

object AgentDeserializerWrapper {
  val props = new Properties()
  props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps = new kafka.utils.VerifiableProperties(props)
  val deser = new KafkaAvroDecoder(vProps)
  val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueNameAgentRead)
  val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
}

    case class DeserializedFromKafkaRecord( value: String)

    import spark.implicits._

    val agentStringDF = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      .map(x => {
        DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })
58wvjzkj  #1

Add as[DeserializedFromKafkaRecord] so that the Dataset is statically typed:

val agentStringDF = spark
  .readStream
  .format("kafka")
  .option("subscribe", "agent")
  .options(kafkaParams)
  .load()
  .as[DeserializedFromKafkaRecord]
  .map(x => {
    DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
  })
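
For anyone hitting the same compile-time error: one common cause (an assumption here, since the question does not show the surrounding code) is that the case class is declared inside the method that uses it. As far as I understand, the REPL wraps each line in a synthetic wrapper, so the class becomes a member of that wrapper instead of a local class, and the implicit product encoder resolves. In a compiled program a method-local case class has no TypeTag available, so spark.implicits._ cannot supply an encoder. Below is a minimal sketch of a compiled layout with the case class at the top level. The names AgentStream and decode and the kafkaParams values are hypothetical; decode only stands in for the AgentDeserializerWrapper.deser.fromBytes(...) call from the question. Note also that after .as[DeserializedFromKafkaRecord] the lambda argument would be the case class and not a Row, so getAs would not be available there; this sketch maps over the untyped rows instead.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{Dataset, SparkSession}

// Declared at the top level (not inside a method), so the compiler can
// provide a TypeTag and spark.implicits._ can derive an Encoder for it.
case class DeserializedFromKafkaRecord(value: String)

object AgentStream {

  // Hypothetical placeholder for the Avro decoding done in the question
  // (AgentDeserializerWrapper.deser.fromBytes(...).toString).
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("agent-stream").getOrCreate()

    // Must come after `spark` is defined: the implicits are members of this session.
    import spark.implicits._

    // Assumed connection settings; replace with the real kafkaParams.
    val kafkaParams = Map("kafka.bootstrap.servers" -> "localhost:9092")

    val agentStringDF: Dataset[DeserializedFromKafkaRecord] = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      // load() yields Dataset[Row]; the encoder for the result type is
      // resolved implicitly because the case class is top level.
      .map(row => DeserializedFromKafkaRecord(decode(row.getAs[Array[Byte]]("value"))))

    // Print decoded records to the console until the query is stopped.
    val query = agentStringDF.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```

If moving the case class is not an option, passing an encoder explicitly, for example .map(...)(Encoders.product[DeserializedFromKafkaRecord]) with org.apache.spark.sql.Encoders imported, may be worth trying, though it needs a TypeTag as well, so it is unlikely to help for a method-local class.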
