Spark streaming returns no data from a Kafka topic

Asked by bsxbgnwa on 2021-05-27, in Spark

I am trying to read data from Kafka using Spark Structured Streaming on CDH 6.3.2 (a single-node proof-of-concept install) with Spark 2.4.0, Kafka 2.2.1, and PySpark 2.4.0. Basically, I have a topic in Kafka, and I publish some test data to it with Python:

from time import sleep
from numpy.random import choice,randint
from kafka import KafkaProducer
import json

def get_random_value():
    new_dict={}
    branch_list=['MSK','SPB','KSM']
    currency_list=['RUB','USD','EUR']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(0,100)
    return new_dict

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers=['myserver:9092'],
                    value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    my_topic = 'test'
    while True:
        for e in range(10):
            data = get_random_value()
            future = producer.send(topic = my_topic,value=data)
            sleep(5)
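As a sanity check on the producer side, the `value_serializer` above simply JSON-encodes each dict into UTF-8 bytes, which is exactly what Spark will later see in the `value` column. A minimal round trip (no Kafka needed, with a fixed payload standing in for `get_random_value()`) confirms the encoding:

```python
import json

# Fixed sample payload with the same shape as get_random_value() produces
sample = {'branch': 'MSK', 'currency': 'RUB', 'amount': 42}

# The same serializer passed to KafkaProducer above
serialize = lambda x: json.dumps(x).encode('utf-8')

raw = serialize(sample)          # bytes as they land in the Kafka topic
decoded = json.loads(raw.decode('utf-8'))
print(raw)
print(decoded['amount'])
```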

As for Spark, I have the following:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('firstStream').getOrCreate()
df = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers','myserver:9092')\
    .option('subscribe','test')\
    .option('startingOffsets','earliest')\
    .load()\
    .selectExpr('cast(value as string)')\
    .writeStream\
    .format('console')\
    .start()\
    .awaitTermination()

As far as I understand, I should be able to see the output in the console from which I submitted the Spark application, but there is no data at all; the log only shows that the topic was subscribed (screenshot omitted).
The spark-submit command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0  /root/myfolder/testStream.py

No answers yet.
