目前我正在与kafka/zookeeper和pyspark(1.6.0)合作。我已经成功地创建了一个kafka消费者,它使用 KafkaUtils.createDirectStream()
.
所有的流媒体都没有问题,但我认识到,在我使用了一些消息之后,我的Kafka主题没有更新到当前偏移量。
因为我们需要更新主题,在这里有一个监控,这是有点奇怪。
在spark的文档中,我发现了以下评论:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
如果您希望基于zookeeper的kafka监视工具显示流应用程序的进度,可以使用该工具来更新zookeeper。
以下是文档:http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2个直接接近无接收机
我在scala中找到了一个解决方案,但在python中找不到等效的解决方案。下面是scala示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
问题
但问题是,从那时起,我如何才能更新Zookeeper?
2条答案
按热度按时间fcipmucu1#
我也遇到了类似的问题。您是对的,使用directstream意味着直接使用kafka低级api,它没有更新reader offset。scala/java有几个例子,但python没有。但是你自己做很容易,你需要做的是:
从开头的偏移量读取
在末尾保存偏移
例如,我在redis中保存每个分区的偏移量,方法是:
首先,您可以使用:
对于一些使用zk跟踪偏移的工具,最好将偏移保存在zookeeper中。本页:https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html 描述如何设置偏移量,基本上,zk节点是:/consumers/[consumer\u name]/offsets/[topic name]/[partition id],因为我们使用的是directstream,所以您必须组成一个consumer name。
h7wcgrx32#
我用python-kazoo库编写了一些函数来保存和读取kafka偏移量。
获取kazoo客户端单例的第一个函数:
然后读取和写入偏移量的函数:
然后,在开始流之前,您可以从zookeeper读取偏移量,并将它们传递给createdirectstream
fromOffsets
参数: