spark structured streaming json展平(explode+pivot不可能)

eeq64g8w  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(194)

我有一个结构化的流式处理作业,它从kafka队列中获取json消息并将它们存储到parquet/hdfs中。从那里,消息被按照时间表获取和分析,并存储到mysql数据库中。
现在我想实现流式传输,从kafka那里获取json消息,分析它并将其存储到mysql中。
我现在的问题是spark结构化流媒体不能让我使用pivot。
我现在的代码如下所示:

val df = readFromHdfsParquet
val df_exploded= df.select($"device", $"owner", $"timestamp", explode($"channels").as("exploded")
.withColumn("channelName", $"exploded.channelName")
.withColumn($"value", $"exploded.value")
.drop("exploded")

val df_grouped = df_exploded
.groupBy($"device, $"owner", $"timestamp")
.pivot("channelName")
.agg(first($"value", false)

这将导致所需的输出结构包含所有可用通道。
我的json如下所示:

{
  "device": "TypeA",
  "owner": "me",
  "timestamp": "2019-05-12 17:27:59",
  "channels": [
    {
      "channelName": "temperature",
      "state": 0,
      "value": "27"
    },
    {
      "channelName": "humidity",
      "state": 0,
      "value": "10"
    }
  ]
}

通道阵列的长度未设置,可以在不同设备之间更改。
我想要的是具有以下结构的Dataframe,并将其存储到mysql中。

|device|owner|timestamp          |temperature|humidity|
|TypeA |me   |2019-05-12 17:27:59|27         |10      |

结构化流媒体是如何做到这一点的?通过显式选择几个通道而不是所有通道也就足够了(f、 e.温度(不含湿度)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题