尝试使用spark结构化流来消费kafka流

6kkfgxo0  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(321)

我是新来Kafka的。我使用python设置了一个twitter侦听器,它正在localhost:9092 kafka 服务器。我可以使用kafka客户机工具(conduktor)和命令“bin/kafka-console-consumer.sh——bootstrap server”来使用侦听器生成的流localhost:9092 --topic twitter——从一开始,“但是当我尝试使用spark结构化流媒体来使用同一个流时,它没有捕获并抛出错误-找不到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;找到下面的截图
命令输出-消耗数据
spark consumer的jupyter输出-不使用数据
我的生产者或听众代码:

auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")

# session.set('request_token', auth.request_token)

api = tweepy.API(auth)
class KafkaPushListener(StreamListener):          
    def __init__(self):
        #localhost:9092 = Default Zookeeper Producer Host and Port Adresses
        self.client = pykafka.KafkaClient("0.0.0.0:9092")

    #Get Producer that has topic name is Twitter
        self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()

    def on_data(self, data):
        #Producer produces data for consumer
        #Data comes from Twitter
        self.producer.produce(bytes(data, "ascii"))
        return True

    def on_error(self, status):
        print(status)
        return True
twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])

spark结构化流媒体的用户访问

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
xv8emn3q

xv8emn3q1#

当我提交spark作业时,发现缺少了什么,我必须包含正确的依赖包版本。我有spark 3.0.0,因此,我包括了-org.apache。spark:spark-sql-kafka-0-10_2.12:3.0.0包

vdzxcuhz

vdzxcuhz2#

添加 sink 它将启动Kafka的消费数据。
检查以下代码。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .outputMode("append") \
    .format("console") \ # here I am using console format .. you may change as per your requirement.
    .start()

query.awaitTermination()

相关问题