我有一个案例课:
case class clickStream(userid:String, adId :String, timestamp:String)
我希望以Kafka制作人的身份发送的示例:
val record = new ProducerRecord[String,clickStream](
"clicktream",
"data",
clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)
在主题队列中,它以字符串形式完美地发送记录:
clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)
然而,问题出在消费端:
val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
clickStreamDF
.select($"value".as("string"))
.as[clickStream] //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()
显然,使用.as[clickstream]api无法正常工作,例外情况是:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];
这是[value]列包含的内容:
Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+
我尝试将自定义序列化程序用作value.serializer和value.deserializer
但是在我的目录结构中面临一个不同的classnotfoundexception问题。
我有三个问题:
kafka如何在这里使用自定义反序列化器类来解析对象?
我不完全理解编码器的概念,以及如何在这种情况下使用它?
使用kafka发送/接收自定义case类对象的最佳方法是什么?
1条答案
按热度按时间bwleehnv1#
当你经过的时候
clickStream
对象数据为string
到kafka&spark将读取相同的字符串,在spark中您必须解析和提取所需字段clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)
检查以下代码。示例如何解析
clickStream
字符串数据。使用kafka发送/接收自定义case类对象的最佳方法是什么?
尝试将您的case类转换为
json
或者avro
或者csv
然后给Kafka发信息,用spark读同样的信息。