I'm trying to use Spark Structured Streaming to read data from Kafka on CDH 6.3.2 (a single-node proof-of-concept setup) with Spark 2.4.0, Kafka 2.2.1 and pyspark 2.4.0. Basically, I have a topic in Kafka to which I publish some test data with Python:
from time import sleep
from numpy.random import choice, randint
from kafka import KafkaProducer
import json

def get_random_value():
    # Build a random test record: branch, currency and amount
    new_dict = {}
    branch_list = ['MSK', 'SPB', 'KSM']
    currency_list = ['RUB', 'USD', 'EUR']
    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(0, 100)
    return new_dict

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers=['myserver:9092'],
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    my_topic = 'test'
    while True:
        # Send a batch of 10 random records, then pause
        for e in range(10):
            data = get_random_value()
            future = producer.send(topic=my_topic, value=data)
        sleep(5)
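For reference, the value_serializer above is just a JSON round trip. A minimal sketch with a hand-written record (the values here are made up, not taken from the topic):

```python
import json

# A record shaped like get_random_value()'s output (values are hypothetical)
record = {'branch': 'MSK', 'currency': 'RUB', 'amount': 42}

# What the producer's value_serializer does: dict -> JSON -> UTF-8 bytes
payload = json.dumps(record).encode('utf-8')

# What cast(value as string) plus a JSON parse would recover on the Spark side
decoded = json.loads(payload.decode('utf-8'))
```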
As for Spark, I have the following:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('firstStream').getOrCreate()

df = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'myserver:9092') \
    .option('subscribe', 'test') \
    .option('startingOffsets', 'earliest') \
    .load() \
    .selectExpr('cast(value as string)')

df.writeStream \
    .format('console') \
    .start() \
    .awaitTermination()
As far as I understand, I should be able to see the output in the console from which I submitted the Spark application, but there is no data at all — only a log line showing that the topic was subscribed:
(screenshot: console output after submitting the job)
The spark-submit command:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 /root/myfolder/testStream.py
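To rule out the producer side, the topic can also be checked directly with the Kafka console consumer, bypassing Spark entirely (the tool name below is how CDH parcels expose it on PATH; on other installs it may be kafka-console-consumer.sh):

```shell
# Read the 'test' topic from the beginning to confirm data is actually there
kafka-console-consumer --bootstrap-server myserver:9092 --topic test --from-beginning
```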