scala—使用spark结构化流处理包含嵌套实体的json

2wnc66cl  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(300)

我想从kafka主题源中使用spark结构化流读取嵌套数据。我的scala代码(case类和spark处理代码):

case class Nested(attr_int: Integer, attr_string: String, attr_float: Float, attr_timestamp: java.sql.Timestamp)

case class Parent(a_str: String, a_long: Long, a_nested: Array[Nested])

import org.apache.spark.sql.Encoders
val jsonSchema = Encoders.product[Parent].schema

val df = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "testnested")
    .option("group.id", "testnested")
    .option("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
    .option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
    .load()
    .select($"value" cast "string" as "json")
    .select(from_json($"json", jsonSchema) as "data")
    .select("data.*")
    .withColumn("nested", explode($"a_nested"))
    .select("nested.*")
    .as[Nested]
    .writeStream
    .format("console")
    .start()
    .awaitTermination()

当我向Kafka发送数据时:

{"a_str":"Str","a_long":100,"a_nested":[{"attr_int":0,"attr_string":"nested_0","attr_float":0.0,"attr_timestamp":"2018-01-01T11:00:00.123321+02:00"},{"attr_int":1,"attr_string":"nested_1","attr_float":1.0,"attr_timestamp":"2018-02-02T12:01:01.023321+02:00"}]}

我得到的结果是:

+--------+-----------+----------+--------------------+
|attr_int|attr_string|attr_float|      attr_timestamp|
+--------+-----------+----------+--------------------+
|       0|   nested_0|       0.0|2018-01-01 13:02:...|
|       1|   nested_1|       1.0|2018-02-02 14:01:...|
+--------+-----------+----------+--------------------+

现在我想让每个嵌套项都连接到父数据,例如:

+--------+-----------+----------+--------------------+-------+--------+
|attr_int|attr_string|attr_float|      attr_timestamp| a_str | a_long |
+--------+-----------+----------+--------------------+-------+--------+
|       0|   nested_0|       0.0|2018-01-01 13:02:...|   Str |    100 |
|       1|   nested_1|       1.0|2018-02-02 14:01:...|   Str |    100 |
+--------+-----------+----------+--------------------+-------+--------+

请注意 "a_str" 以及 "a_long" 是父实体中的列 "Parent" . 既然我不是spark结构化流处理的Maven,我想知道什么是最“惯用”的处理方法?目前我有以下假设:
创建自定义kafka值反序列化程序
在结构化流上编写某种类型的连接(我一直坚持这样做),但我认为这将需要更改json结构(例如,在嵌套中指定指向父数据的某个键值)
编写自定义方法,该方法将返回连接实体的非规范化数据并使用 flatMap 用这种方法
请告知。
谢谢
更新1:为了方便起见,我在github上创建了相应的项目:https://github.com/lospejos/spark-nested-classes-from-json

fd3cxomn

fd3cxomn1#

感谢glennie helles sindholt和其他Google用户:

.select($"nested.*", $"a_str", $"a_long")

github存储库也已更新。

相关问题