flink 1.10 datastream api中的json数据聚合

lymgl2op  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(367)

我正在尝试使用kafka消息(作为flink1.10api流源)在elasticsearch中聚合数据。数据是以json格式接收的,json格式是动态的,下面给出了示例。我想用唯一的id将多个记录合并到一个文档中。数据是按顺序来的,它是时间序列数据。
源-汇kafka和目标-汇elasticseach 7.6.1 6
我没有找到任何好的例子,可以利用在下面的问题陈述。

Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" : 
{
 "Field1" : "ABC",
 "Field2" : "DEF"
}
}

Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" : 
{
 "Field3" : "GHY"
}
}

Result :

{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
 "Field1" : "ABC",
 "Field2" : "DEF",
 "Field3" : "GHY"
}
}

以下是版本详细信息:
Flink1.10
flink kafka连接器2.11
flink elasticsearch连接器7.x
Kafka2.11
jdk 1.8版

umuewwlo

umuewwlo1#

你所要求的可以说是某种形式的加入,有很多方法可以通过flink来实现。apache flink培训中有一个状态丰富的示例,展示了如何使用 RichFlatMapFunction 这应该能帮助你开始。您需要首先阅读相关的培训材料——至少是关于数据管道和etl的部分。
使用这种方法最终要做的是按id(via)对流进行分区 keyBy ),然后使用键分区状态(可能是 MapState 在本例中,假设您要为每个id存储多个属性/值对)来存储来自记录(如记录1)的信息,直到您准备好发出结果为止。
顺便说一句,如果密钥集是无界的,您需要注意不要永远保持这种状态。或者在不再需要状态时清除它(如本例所示),或者使用statettl安排它的最终删除。
有关flink中其他类型联接的更多信息,请参阅此答案中的链接。

相关问题