如何使用scala在kafka consumer中实现反序列化?

n8ghc7c1  于 2021-06-09  发布在  Kafka
关注(0)|答案(1)|浏览(316)

我的Kafka消费者代码中有以下几行。

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

如何将此流“行”反序列化为原始对象?通过将类扩展为serialisable,在kafka生产者中实现了可序列化性。我用scala在spark中实现了这个。

3z6pesqy

3z6pesqy1#

您需要实现一个自定义解码器,并将预期的类型信息与解码器一起提供给createstream函数。 KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...) 例如,如果您使用 String 作为关键和 CustomContainer 作为价值,您的流创建将如下所示:

val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)

既然你把这些信息重新传达给Kafka new KeyedMessage[String,String] ,右边的解码器是字符串解码器,如下所示:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)

那会给你一个 DStream[String,String] 作为处理的基础。
如果你想发送/接收一个特定的对象类型,你需要为它实现一个Kafka编码器和解码器。你很幸运, PcapPacket 已经实现了执行此操作所需的方法:
pcappacket->byte[]:public int transferstateanddatato(byte[]缓冲区)
byte[]->pcappacket:公共pcappacket(byte[]缓冲区)
其余部分是实现kafka所需的编码器/解码器接口的样板代码。

相关问题