spark streaming为每个触发器进程间隔的每条记录选取最新事件

7gcisfzg  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(339)

我们有一个spark streaming(spark版本2.4.0)作业,它使用一个kafka主题(4个分区),其中包含作为json的id的业务更改。这些kafka值还包括recordtime字段和json对象中的其他字段。这个流作业根据id字段向上插入一个kudu表。
过了一段时间,我们注意到,有些更新实际上并没有反映某些id字段值的最新状态。我们假设每个分区有4个不同的执行器处理,当其中一个比另一个更早完成时,它会更新目标kudu表。如果我们有如下值:

(Id=1, val=A, RecordTime: 10:00:05 ) partition1
(Id=2, val=A, RecordTime: 10:00:04 ) partition1
(Id=1, val=B, RecordTime: 10:00:07 ) partition2
(Id=1, val=C, RecordTime: 10:00:06 ) partition3
(Id=2, val=D, RecordTime: 10:00:05 ) partition1
(Id=2, val=C, RecordTime: 10:00:06 ) partition4
(Id=1, val=E, RecordTime: 10:00:03 ) partition4

那么kudu表应该是这样的:
idvaluerecordtime1b10:00:072c10:00:06
但是,有时我们看到Kudu的table是这样的:
idvaluerecordtime1a10:00:052c10:00:06
触发间隔为1分钟。
那么,如何实现目标kudu表的有序更新呢。
我们应该使用单个分区进行排序,但如果我们这样做的利弊?
对于spark streaming,我们如何在每个触发间隔挑选最新的记录和值
根据id和recordtime更新kudu表,但是如何?
我们还有别的办法可以考虑吗?
希望我能充分解释我的问题。简单地说,我们如何在spark streaming中实现每个微批间隔的事件排序?
特别感谢任何能帮助我的人。

0lvr5msh

0lvr5msh1#

当您从kafka获取数据时,回想一下kafka只在主题分区中提供排序保证是很有用的。
因此,如果让kafka生产者将相同id的所有消息生成到同一分区中,就可以解决问题。这可以通过kafkaproducer中的自定义paritioner来实现,也可以简单地将id的值用作kafka消息的“key”部分。
如果你没有控制Kafka生产者你将需要使你的Spark流作业状态。在这里,具有挑战性的部分是定义一个时间框架,即您的作业应该等待具有相同id的其他消息到达的时间。就几秒钟?也许几个小时?我的经验是,这个问题很难回答,有时答案是“几个小时”,这意味着你需要保持状态几个小时,这可能会让你的工作失去记忆。

相关问题