使用localhost服务器从结构化spark流写入kafka

xt0899hw  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(175)

我正在开发结构化的sparkstream应用程序,经过分析,我最终将输出写入kafka。Dataframe的架构是

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

写给Kafka的代码是

query = testDataFrame \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", output_topic) \
    .option("checkpointLocation", ./tmp") \
    .start()

执行此代码后,我没有收到任何错误,不幸的是,当我从终端运行kafka consumer命令时,我没有收到任何错误:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output_topic --from-beginning

请注意,Kafka和zookeeper都在本地运行。
我用来提交项目的命令:

bin/spark-submit --master local --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.kafka:kafka-clients:1.0.0 pathtoPythonClass

任何我在哪里做的错误或任何类型的帮助的想法将高度赞赏。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题