I am using Structured Streaming to read JSON data from Kafka. Each JSON message contains several windowed time series. The JSON format is as follows:
{"id": "fd78sfsdfsd8vs",
"item": [{"data_identifier": "algid1_set1_totalcount_lstm",
"time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
{"time": "20200903 00:00:05", "value": 342421.88},
{"time": "20200903 00:00:10", "value": 351232.92}]},
{"data_identifier": "algid2_set2_totalcount_lstm",
"time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
{"time": "20200903 00:00:05", "value": 342421.88},
{"time": "20200903 00:00:10", "value": 351232.92}]}
]
}
I then process the JSON data into a DataFrame and run anomaly detection on the time series in that DataFrame. The DataFrame looks like this:
+--------------+----------------------+-----------------+---------+
| id|data_identifier_method| time| value|
+--------------+----------------------+-----------------+---------+
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs| algid2_set2_total...|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs| algid1_set1_total...|20200903 00:00:05|342421.88|
+--------------+----------------------+-----------------+---------+
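For reference, the flattening from the nested JSON into rows like the ones above can be sketched in plain Python. This is only an illustration of the shape of the transformation, not the actual Spark job; the function name `flatten_message` is made up for this example. In Spark itself the same step would typically be written with `from_json` and `explode`.

```python
import json

def flatten_message(raw):
    """Turn one JSON message into (id, data_identifier, time, value) rows."""
    msg = json.loads(raw)
    rows = []
    for item in msg["item"]:                 # one entry per data_identifier
        for point in item["time_series"]:    # one entry per timestamp
            rows.append((msg["id"], item["data_identifier"],
                         point["time"], point["value"]))
    return rows

# Sample message copied from the JSON format shown in the question.
sample = """
{"id": "fd78sfsdfsd8vs",
 "item": [{"data_identifier": "algid1_set1_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]},
          {"data_identifier": "algid2_set2_totalcount_lstm",
           "time_series": [{"time": "20200903 00:00:00", "value": 342342.12},
                           {"time": "20200903 00:00:05", "value": 342421.88},
                           {"time": "20200903 00:00:10", "value": 351232.92}]}]}
"""

rows = flatten_message(sample)
for row in rows:
    print(row)
print(len(rows))  # 6 rows: 2 identifiers x 3 points
```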
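Because of the way Structured Streaming works, I would like each JSON message to be processed independently, without being affected by any other message. Is this possible? If so, how can I achieve it?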
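To make the goal concrete, here is a minimal plain-Python sketch of what "independent per message" would mean. It assumes every message carries a unique key, and it uses a placeholder median-based detector; `detect_anomalies`, `process_messages`, the 2% threshold, and the `(partition, offset)` values are all hypothetical. The `id` field alone may not be enough as a key, since the same `id` appears in several messages in the table above. In Spark, the Kafka source exposes `partition` and `offset` columns that might serve as such a key, for example when grouping inside `foreachBatch`.

```python
import json
from statistics import median

def detect_anomalies(values, ratio=0.02):
    """Placeholder detector (an assumption, not the real model):
    flag values that deviate from the series median by more than `ratio`."""
    mid = median(values)
    return [abs(v - mid) > ratio * abs(mid) for v in values]

def process_messages(messages):
    """messages: iterable of (message_key, raw_json) pairs.
    Each series is scored using only the data from its own message."""
    results = {}
    for message_key, raw in messages:
        msg = json.loads(raw)
        for item in msg["item"]:
            times = [p["time"] for p in item["time_series"]]
            values = [p["value"] for p in item["time_series"]]
            flags = detect_anomalies(values)
            results[(message_key, item["data_identifier"])] = list(zip(times, flags))
    return results

# Two messages with the same id but different (partition, offset) keys.
series = [{"time": "20200903 00:00:00", "value": 342342.12},
          {"time": "20200903 00:00:05", "value": 342421.88},
          {"time": "20200903 00:00:10", "value": 351232.92}]
raw = json.dumps({"id": "fd78sfsdfsd8vs",
                  "item": [{"data_identifier": "algid1_set1_totalcount_lstm",
                            "time_series": series},
                           {"data_identifier": "algid2_set2_totalcount_lstm",
                            "time_series": series}]})
results = process_messages([((0, 41), raw), ((0, 42), raw)])
for key, flagged in results.items():
    print(key, flagged)
```

Keying the results by message and identifier keeps the series of one message from mixing with another, even when both messages share the same `id`.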