Pypark结构化流kafka配置错误

8zzbczxx  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(472)

我之前已经成功地将pyspark用于spark流(spark 2.0.2)和kafka(0.10.1.0)一起使用,但是我的目的更适合结构化流。我尝试在网上使用这个例子:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:

ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
query = ds1
  .writeStream
  .outputMode('append')
  .format('console')
  .start()
query.awaitTermination()

但是,我总是会出现以下错误:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

在创建ds1时,我还尝试将此添加到我的选项集中:

.option("partition.assignment.strategy", "range")

但是,即使显式地给它赋值也不能阻止错误,我在网上或Kafka文档中找到的任何其他值(如“roundrobin”)也不能阻止错误。
我还使用“assign”选项尝试了这个方法,并得到了相同的错误(我们的kafka主机是为assign设置的——每个使用者只分配了一个分区,我们没有任何重新平衡)。
你知道这是怎么回事吗?文档并没有什么帮助(可能因为它还处于实验阶段)。另外,是否有使用kafkautils的结构化流媒体?或者这是唯一的入口?

9fkzdhlc

9fkzdhlc1#

如何从pyspark启动的笔记本运行这个?
我给os.environ['pyspark\u submit\u args']='--jars kafka-clients-0.10.0.1.jar pyspark shell'
即使在这之后,我也会
“:org.apache.spark.sql.streaming.streamingqueryexception:缺少没有默认值的必需配置”partition.assignment.strategy“。”

eivgtgni

eivgtgni2#

kafka 0.10.1.*客户端中存在一个已知问题,您不应将其与spark一起使用,因为它可能会由于https://issues.apache.org/jira/browse/kafka-4547 . 您可以使用0.10.0.1客户端,它应该与0.10.1.*kafka集群一起工作。
要在结构化流媒体中将kafka配置发送到kafka consumer客户端,需要添加 kafka. 前缀,例如 .option("kafka.partition.assignment.strategy", "range") . 但是,您不需要设置 kafka.partition.assignment.strategy 因为它有一个默认值。我的直觉是你可能把kafka0.8.*和0.10.*jar都放在类路径上,然后加载错误的类。
您想使用kafkautils中的哪个api,但在结构化流媒体中缺少了它?spark 2.2.0刚刚推出,您可以在结构化流媒体中使用带有kafka的批处理或流式查询。阅读http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html 例如。

a6b3iqyw

a6b3iqyw3#

我在spark 2.3.2中使用结构化流媒体时遇到了这个问题。就像@bruce.liu在他的回答中暗示的那样,当spark的jvm的类路径中没有kafka客户机…jar文件时,就会发生这种情况。
我通过下载kafka客户端jar修复了它(https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.10.0.1)然后提供给spark提交使用 --jars 以及 --driver-class-path 选项。
像这样:

spark-submit --class MainClass --master local[*] --jars local:///root/sources/jars/kafka-clients-0.10.0.1.jar --driver-class-path local:///root/sources/jars/kafka-clients-0.10.0.1.jar app.jar
2guxujil

2guxujil4#

添加 kafka-clients-*.jar 到您的spark jar文件夹,然后重新启动spark主程序和从程序。那你就不用再加了 .option("partition.assignment.strategy", "range")

相关问题