flink中的事件重试机制

cyej8jka  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(535)

我正在读Kafka的多个主题,然后做状态操作,然后再次保存到Kafka。这里是流程;

val stream1 = topic1.map(prepareData())
val stream2 = topic2.map(prepareData1())
val delayStream = delayqueue.map(prepareData2()) // these messages comes from delay

val enrichedResults = stream1.union(stream2).union(delayStream).map(stateOperation)
val taggedStream = enrichedResults.process(tagStreamAsRetryOrNot)
val retryMessages = taggedStream.getSideOutput(tag)

retryMessages.addSink(kafkaDelayQueue1)
taggedStream.addSink(targetTopic)

在此流中,如果找不到任何状态,则此消息将发送到 delay queue . 稍后我将处理此消息。
延迟流的另一侧: delayQueue(written by flink) => consumerAPP(check retrycounts and timestamps) => anotherQueue (这将再次被Flink消耗。我不想再向主主题发送消息,因为主主题也被其他团队占用。)
这个方法好吗?这个流程可以用更少的努力来改善吗?或者有什么最佳实践吗?

krugob8w

krugob8w1#

通常处理这种情况的方法是缓冲状态中的早期消息,直到承载扩展所需数据的预期消息到达另一个流。你可以在短时间内完成 KeyedCoProcessFunction ,例如。
flinksql(和表api)的设置使这种流连接更容易。
有关该主题的更多信息,请参阅有关sql连接的文档。

相关问题