我们有一个标准的Kafka - Spark - Cassandra管道结构。(使用scala)消息由来自Kafka的spark作业消耗,如果spark作业由于某些问题(作业停机,资源问题等)无法处理数据,我们将未处理的消息存储在Cassandra的表中(列-原始消息,时间戳,主题和错误)。作业启动并重新运行后,重新处理此数据的最佳方法是什么?因为到那时,Kafka也会有其他新的信息。
krugob8w1#
Kafka有一种机制,可以跟踪哪些消息已经被消费,并将每个消费者组的状态存储为“偏移量”。您的Spark应用程序使用偏移量,因此它不会返回并再次处理所有消息-它只是返回到流中消耗的最后一条消息并处理下一条消息。干杯!
1条答案
按热度按时间krugob8w1#
Kafka有一种机制,可以跟踪哪些消息已经被消费,并将每个消费者组的状态存储为“偏移量”。
您的Spark应用程序使用偏移量,因此它不会返回并再次处理所有消息-它只是返回到流中消耗的最后一条消息并处理下一条消息。干杯!