Kafka - 02 - Python 简单实现生产者和消费者

x33g5p2x  于2021-09-19 转载在 Kafka  
字(1.5k)|赞(0)|评价(0)|浏览(570)

之前我们学习的时候,说是有四个核心的 API,现在是 5 个,可以一起探究探究。
本文简单的实现了生产者和消费者的 Python 代码,对初学者了解以下这两个接口有所帮助,更多详细的信息:

  • kafka中文教程:是基于 Java 的,但是部分知识点可以共享。
  • kafka-python:基于 Python 的,但是是英文的,英文不好的阅读起来不太方便,google 直接翻译的有点不太专业。

1. Kafka 的 5 个核心 API:

  • Producer API:应用程序发送数据流到 Kafka 集群中的 topic;

  • Consumer API:应用程序从 topic 中读取数据流

  • Streams API:从输入 topic 转换数据流到输出 topic;

  • Connect API:实现连接器(connector),进行系统与 Kafka之间的数据传输

  • Admin API管理和检查 topic,broker 和其他 Kafka 对象;
    因为教程是基于 Java 的,所以,我们这里有所不同;

  • 通过命令安装 kafka 包,pip install kafka

五个接口的详细信息

2. Producer Demo

  • 直接上代码演示:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 默认异步
future = producer.send('my_topic', 'This is a message')

# 阻止“同步”发送
record_metadata = future.get(timeout=10)

print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
  • 对上述代码做一个解释:

  • from kafka import KafkaProducer:导入 KafkaProducer;

  • producer = KafkaProducer(bootstrap_servers='localhost:9092')创建连接到本机的9092端口;

  • future = producer.send('my_topic', 'This is a message'):向当前 my_topic发送一条信息 This is a message

  • record_metadata = future.get(timeout=10):等待十秒再次发送,防止同步。

  • 输出结果: 这是第三次发送消息,就是代码运行了三次,我们的偏移量为 3

my_topic
0
3

3. Consumer Demo

  • 代码:
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
for message in consumer:
    print("topic = %s , partition = %d , offset = %d , value=%s" % (message.topic, message.partition, message.offset,message.value))

代码跟前边的生产者没啥区别,就是多了一个 for 循环来持续输出。
*
这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移。
*
consumer 是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来。
*
每当你运行一次 Producer Demo,会在 Consumer Demo 里边出现一行信息;

请添加图片描述

相关文章

微信公众号

最新文章

更多