spark结构化流媒体中如何选择case类对象作为Dataframe

luaexgnf  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(324)

我有一个案例课:

case class clickStream(userid:String, adId :String, timestamp:String)

我希望以Kafka制作人的身份发送的示例:

val record = new ProducerRecord[String,clickStream](
  "clicktream",
  "data",
  clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)

在主题队列中,它以字符串形式完美地发送记录:

clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)

然而,问题出在消费端:

val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()

clickStreamDF 
.select($"value".as("string"))
.as[clickStream]       //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

显然,使用.as[clickstream]api无法正常工作,例外情况是:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];

这是[value]列包含的内容:

Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+

我尝试将自定义序列化程序用作value.serializer和value.deserializer
但是在我的目录结构中面临一个不同的classnotfoundexception问题。
我有三个问题:
kafka如何在这里使用自定义反序列化器类来解析对象?
我不完全理解编码器的概念,以及如何在这种情况下使用它?
使用kafka发送/接收自定义case类对象的最佳方法是什么?

bwleehnv

bwleehnv1#

当你经过的时候 clickStream 对象数据为 string 到kafka&spark将读取相同的字符串,在spark中您必须解析和提取所需字段 clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020) 检查以下代码。

clickStreamDF 
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream] # Extract all fields from the value string & then use .as[clickStream] option. I think this line is not required as data already parsed to required format. 
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

示例如何解析 clickStream 字符串数据。

scala> df.show(false)
+---------------------------------------------------+
|value                                              |
+---------------------------------------------------+
|clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)|
+---------------------------------------------------+
scala> df
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream]
.show(false)

+------+----+----------------------------+
|userid|adId|timestamp                   |
+------+----+----------------------------+
|user5 |ad2 |Sat Jul 18 20:48:53 IST 2020|
+------+----+----------------------------+

使用kafka发送/接收自定义case类对象的最佳方法是什么?
尝试将您的case类转换为 json 或者 avro 或者 csv 然后给Kafka发信息,用spark读同样的信息。

相关问题