scala—spark structured streaming 2.2.0中附加的前批,其中源是kafka

sshcrbum  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(158)

我使用spark structured streaming 2.2.0,kafka作为源,redis作为接收器作为缓存。缓存中应该只填充最新的数据。但问题是,每当我得到新的批时,它总是附加上以前的批并被存储,这是错误的。我只需要存储当前批。
下一批只是上一批的更新版本。所以我试着用“更新”作为输出模式。但它并没有像预期的那样起作用。
还有什么我可以试试的吗?提前谢谢。
编辑
我加了一句 Trigger.ProcessingTime("1 second") -当我使用控制台作为格式时,它似乎可以工作。但在redis中,数据仍在追加。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题