ksql-从json数组创建流

rta7y2nd  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(421)

我的Kafka主题是以这种格式推送数据(来自collectd):

[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]

它是数组、int和float的组合。。。整个过程都在一个json数组中。结果,我花了很多时间使用ksql来处理这些数据。
当我创建一个'默认'流作为

create stream cd_temp with (kafka_topic='ctd_test', value_format='json');

我得到这个结果:

ksql> describe cd_temp;

 Field   | Type                      
-------------------------------------
 ROWTIME | BIGINT           (system) 
 ROWKEY  | VARCHAR(STRING)  (system) 
-------------------------------------

任何选择都将返回rowtime和rowkey的8位十六进制值。
我花了一些时间试图提取json字段,但没有效果。我关心的是:

ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

有没有可能这个主题不能用在ksql中?是否有一种技术可以将外部数组解包以获取内部有趣的位?

62lalag4

62lalag41#

在撰写本文时(2018年6月),ksql无法处理一个json消息,其中整个内容都嵌入到顶级数组中。有一个github问题来跟踪这个。我建议在这个问题上增加一个1+1的投票来提高它的优先权。
另外,我注意到create stream语句没有定义json消息的模式。虽然这在这种情况下没有帮助,但对于其他json输入格式,您需要这样做,即您创建的语句应该类似于:

create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');

相关问题