Unable to deserialize with a custom Serde using Kafka Streams

Asked by mpgws1up on 2021-06-04, in Kafka

I am trying to build a simple Kafka Streams topology that uppercases the name of a Person entity.

case class Person(id: Int, name: String, age: Int)

My custom serializer and deserializer look like this:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util
import org.apache.kafka.common.serialization.Serializer

class KafkaBytesSerializer[T] extends Serializer[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, data: T): Array[Byte] = {
    // Java-serialize the payload into a byte array
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(data)
    oos.close()
    stream.toByteArray
  }

  override def close(): Unit = ()
}

import java.io.{ByteArrayInputStream, ObjectInputStream}
import org.apache.kafka.common.serialization.Deserializer

class KafkaBytesDeserializer[T] extends Deserializer[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): T = {
    // Read the Java-serialized payload back and cast it to the target type
    val objIn = new ObjectInputStream(new ByteArrayInputStream(data))
    val obj = objIn.readObject().asInstanceOf[T]
    objIn.close()
    obj
  }

  override def close(): Unit = ()
}
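
Before wiring these into a topology, a quick round trip through the pair can confirm that the Java serialization itself works; a minimal sketch (the topic argument is ignored by both implementations):

val serializer = new KafkaBytesSerializer[Person]
val deserializer = new KafkaBytesDeserializer[Person]

val bytes = serializer.serialize("any-topic", Person(1, "alice", 30))
println(deserializer.deserialize("any-topic", bytes)) // Person(1,alice,30)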

The main driver code of the streams application is as follows:

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}

val personSerde: Serde[Person] =
  Serdes.serdeFrom(new KafkaBytesSerializer[Person], new KafkaBytesDeserializer[Person])

val builder = new StreamsBuilder()
builder
  .stream[String, Person](INPUT_TOPIC)(Consumed.`with`(Serdes.String(), personSerde))
  .map[String, Person]((k, p) => (k, Person(p.id, p.name.toUpperCase(), p.age)))
  .peek((k, p) => println("Key: " + k + " Person: " + p))
  .to(OUTPUT_TOPIC)(Produced.`with`(Serdes.String(), personSerde))
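
For completeness, the topology would typically be started along these lines; a minimal sketch, where the application id is taken from the thread name in the log below and the broker address is an assumed placeholder:

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val streamProps = new Properties()
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "MainApp-consumer-group") // matches the log output
streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")      // assumed broker address

val streams = new KafkaStreams(builder.build(), streamProps)
streams.start()
sys.addShutdownHook(streams.close())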

When I run the application, I get a ClassCastException:

[MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] Encountered the following error during processing:
java.lang.ClassCastException: [B cannot be cast to models.Person
    at org.apache.kafka.streams.scala.FunctionsCompatConversions$ValueMapperFromFunction$$anon$6.apply(FunctionsCompatConversions.scala:66)
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)

I suspect something is going wrong at the deserialization level, but I am not sure of the cause.
Any hints would be helpful.

Answer by j0pj023g:

The problem is in your producer. You configured value.serializer as com.thebigscale.serdes.PersonSerializer, but then sent an already-serialized byte array. You should not serialize your POJO yourself; the Kafka serializer does that for you, so just send the Person instance. As written, the configured serializer Java-serialized the byte array itself, so the Streams deserializer handed back a byte[] (the [B in the exception) rather than a Person, which is exactly what the ClassCastException reports.
Below is your code, fixed, with comments:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "com.thebigscale.serdes.PersonSerializer")

val producer = new KafkaProducer[String, Person](props) // <-- Instead BYTE_ARRAY -> Person

val person = new Person(4, "user4", 27)
//val personSerializer = new KafkaBytesSerializer[Person]() // remove
//val bytePerson: BYTE_ARRAY = personSerializer.serialize("", person) // remove

val record = new ProducerRecord[String, Person](KafkaConf.INPUT_TOPIC, "key1", person) // instead BYTE_ARRAY -> Person, bytePerson -> person
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null ) {
      println("Exception thrown by producer: " + exception)
    } else {
      println("Record sent successfully: " + metadata)
    }
  }
})
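
As a side note, if you would rather wire the serde through Streams configuration instead of passing it explicitly at every call site, the serializer/deserializer pair can be wrapped in a Serde implementation and registered as the default value serde. A sketch; the PersonSerde class name is an assumption, not from the original post:

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import org.apache.kafka.streams.StreamsConfig

// Hypothetical wrapper so the serde can be referenced by class name in config
class PersonSerde extends Serde[Person] {
  override def serializer(): Serializer[Person] = new KafkaBytesSerializer[Person]
  override def deserializer(): Deserializer[Person] = new KafkaBytesDeserializer[Person]
}

// e.g. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[PersonSerde].getName)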
