带有spark流的kafka问题:无法从包含现有数据的主题中读取数据

o2rvlv0m  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(345)

我试图阅读Kafka经纪人与Spark流,但我面临着一些问题。

def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf) 
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)

    topic = "stable_topic"
    kvs = KafkaUtils.createDirectStream(ssc,
                                    [topic],
                                    {"metadata.broker.list": "my-broker",
                                    "auto.offset.reset": "smallest"},
                                    keyDecoder=lambda x: x,
                                    valueDecoder=lambda x: x
                                    )

    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()

上面的代码只获取空批:

-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------

-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------

主题 stable_topic 包含固定大小的数据。它不会改变。我还有一个主题,它每秒钟接收一次数据。如果我用这个主题代替 stable_topic 然后取下 "auto.offset.reset": "smallest" 然后代码获取数据。
我想这是有问题的 {"auto.offset.reset": "smallest"} 但我想不通。
有人知道我做错了什么吗?

fnx2tebb

fnx2tebb1#

在以后的版本中, smallest 被替换为 earliest . 确保您检查了所使用版本的文档。
此外,还有 auto.offset.reset 如果使用者组已在使用主题中的某些数据,则配置将不会生效 stable_topic . 因此,您可以考虑更改 group.id 在你的流媒体工作中。
如果要分配新的 group.id ,确保设置 auto.offset.resetsmalles (或 earliest 在较新版本中)。

相关问题