A real-time streaming job based on Structured Streaming

Posted by drnojrws on 2021-05-27 in Spark

I am using Structured Streaming to read JSON data from Kafka. Each JSON message contains several windowed time series. The JSON format is as follows:

{"id": "fd78sfsdfsd8vs", 
 "item": [{"data_identifier": "algid1_set1_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]},
          {"data_identifier": "algid2_set2_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]}
         ]
}

I then process the JSON data into a DataFrame and run anomaly detection on the time-series data in that DataFrame. The DataFrame looks like this:

+--------------+----------------------+-----------------+---------+
|            id|data_identifier_method|             time|    value|
+--------------+----------------------+-----------------+---------+
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:05|342421.88|
+--------------+----------------------+-----------------+---------+
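To make the JSON-to-rows step concrete, here is a minimal plain-Python sketch of the flattening, one output row per (id, data_identifier, time, value). The function name `flatten_message` is hypothetical and not part of my actual job. In Spark the same shape would presumably come from `from_json` with an explicit schema followed by `explode` on the nested arrays, which this sketch does not show.

```python
import json

def flatten_message(raw):
    """Turn one JSON message into (id, data_identifier, time, value) rows."""
    msg = json.loads(raw)
    rows = []
    for item in msg["item"]:                 # one entry per data_identifier
        for point in item["time_series"]:    # one entry per timestamp
            rows.append((msg["id"], item["data_identifier"],
                         point["time"], point["value"]))
    return rows

# Sample message copied from the JSON format shown in this post.
sample = """
{"id": "fd78sfsdfsd8vs",
 "item": [{"data_identifier": "algid1_set1_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]},
          {"data_identifier": "algid2_set2_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]}
         ]
}
"""

rows = flatten_message(sample)
for row in rows:
    print(row)
```

Each message yields its own list of rows, so nothing in this step mixes data from different messages.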

Given how Structured Streaming works, I would like each JSON message to be processed independently, unaffected by any other message. Is this possible? If so, how can it be achieved?
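To illustrate what I mean by "independent", here is a small sketch in plain Python where the detection is a pure function of a single message. The z-score rule, the `threshold` value, and the name `detect_anomalies` are placeholders I made up for illustration, not my real detection algorithm. My assumption is that a function like this could be applied per record (for example wrapped as a UDF), since stateless row-wise operations do not combine rows; I have not verified that this is the right approach in Structured Streaming.

```python
import json
from statistics import mean, pstdev

def detect_anomalies(raw, threshold=1.5):
    """Flag points of ONE message whose z-score exceeds the threshold.

    Statistics are computed per data_identifier inside this message only,
    so the result never depends on any other message.
    """
    msg = json.loads(raw)
    flagged = []
    for item in msg["item"]:
        values = [p["value"] for p in item["time_series"]]
        mu = mean(values)
        sigma = pstdev(values)           # population standard deviation
        if sigma == 0:                   # constant series: nothing to flag
            continue
        for p in item["time_series"]:
            if abs(p["value"] - mu) / sigma > threshold:
                flagged.append((msg["id"], item["data_identifier"],
                                p["time"], p["value"]))
    return flagged

# Two made-up messages (hypothetical ids and values, for illustration only).
msg_a = json.dumps({"id": "a", "item": [{"data_identifier": "s1", "time_series": [
    {"time": "20200903 00:00:00", "value": 10},
    {"time": "20200903 00:00:05", "value": 10},
    {"time": "20200903 00:00:10", "value": 10},
    {"time": "20200903 00:00:15", "value": 10},
    {"time": "20200903 00:00:20", "value": 100}]}]})
msg_b = json.dumps({"id": "b", "item": [{"data_identifier": "s1", "time_series": [
    {"time": "20200903 00:00:00", "value": 5},
    {"time": "20200903 00:00:05", "value": 5}]}]})

# Each message is handled on its own; order and neighbours do not matter.
results = [detect_anomalies(m) for m in (msg_a, msg_b)]
print(results)
```

Because the function only reads its own argument, processing `msg_a` alone or alongside `msg_b` gives the same answer, which is the behaviour I am hoping to get from the streaming job.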
