我的Kafka消费者代码中有以下几行。
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
如何将此流“行”反序列化为原始对象?通过将类扩展为serialisable,在kafka生产者中实现了可序列化性。我用scala在spark中实现了这个。
3z6pesqy1#
您需要实现一个自定义解码器,并将预期的类型信息与解码器一起提供给createstream函数。 KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...) 例如,如果您使用 String 作为关键和 CustomContainer 作为价值,您的流创建将如下所示:
KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)
String
CustomContainer
val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)
既然你把这些信息重新传达给Kafka new KeyedMessage[String,String] ,右边的解码器是字符串解码器,如下所示:
new KeyedMessage[String,String]
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)
那会给你一个 DStream[String,String] 作为处理的基础。如果你想发送/接收一个特定的对象类型,你需要为它实现一个Kafka编码器和解码器。你很幸运, PcapPacket 已经实现了执行此操作所需的方法:pcappacket->byte[]:public int transferstateanddatato(byte[]缓冲区)byte[]->pcappacket:公共pcappacket(byte[]缓冲区)其余部分是实现kafka所需的编码器/解码器接口的样板代码。
DStream[String,String]
PcapPacket
1条答案
按热度按时间3z6pesqy1#
您需要实现一个自定义解码器,并将预期的类型信息与解码器一起提供给createstream函数。
KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)
例如,如果您使用String
作为关键和CustomContainer
作为价值,您的流创建将如下所示:既然你把这些信息重新传达给Kafka
new KeyedMessage[String,String]
,右边的解码器是字符串解码器,如下所示:那会给你一个
DStream[String,String]
作为处理的基础。如果你想发送/接收一个特定的对象类型,你需要为它实现一个Kafka编码器和解码器。你很幸运,
PcapPacket
已经实现了执行此操作所需的方法:pcappacket->byte[]:public int transferstateanddatato(byte[]缓冲区)
byte[]->pcappacket:公共pcappacket(byte[]缓冲区)
其余部分是实现kafka所需的编码器/解码器接口的样板代码。