我使用spark structured streaming 2.2.0,kafka作为源,redis作为接收器作为缓存。缓存中应该只填充最新的数据。但问题是,每当我得到新的批时,它总是附加上以前的批并被存储,这是错误的。我只需要存储当前批。
下一批只是上一批的更新版本。所以我试着用“更新”作为输出模式。但它并没有像预期的那样起作用。
还有什么我可以试试的吗?提前谢谢。
编辑
我加了一句 Trigger.ProcessingTime("1 second")
-当我使用控制台作为格式时,它似乎可以工作。但在redis中,数据仍在追加。
暂无答案!
目前还没有任何答案,快来回答吧!