streaming-kafka-0-10的消息

7lrncoxx  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(619)

我在用Kafka 1.0.1-kafka-3.1.0-SNAPSHOT 来自cdh(hadoop的cloudera发行版)
在我的batch-1边缘服务器上,我可以生成具有以下内容的消息:

kafka-console-producer --broker-list batch-1:9092 --topic MyTopic

感谢zookeeper在我的第一个节点上提供了以下功能,我可以使用这些信息:

kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning

但我没有得到任何引导服务器选项:

kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning

问题是我把Kafka用在了spark上: libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0" ```
val df = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "batch-1:9092")
.option("subscribe", "MyTopic")
.load()

println("Select :")

val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
.as[(String, String, String)]

println("Show :")

val query = df2.writeStream
.outputMode("append")
.format("console")
.start()

query.awaitTermination()

我做了一个 `export SPARK_KAFKA_VERSION=0.10` 在我的边缘。然后

spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar

这迫使我使用 `kafka.bootstrap.servers` ,似乎有联系,但我没有收到任何信息。
输出与 `kafka-console-consumer` 与 `--bootstrap-server` 选项:

18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.

然后,什么也没有。我要接Zookeeper吗?怎样?
他们在这里说的“结构化流媒体+Kafka集成指南(Kafka代理版本0.10.0或更高版本)”是否存在版本冲突:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html ?
我错过了什么?
aurhwmvo

aurhwmvo1#

解决方案
这个 /var/log/kafka/kafka-broker-batch-1.log 说: 2018-10-31 13:40:08,284 ERROR kafka.server.KafkaApis: [KafkaApi-51] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. 因此,我在集群节点上部署了3个代理,边缘上有一个网关,现在可以使用:
kafka-console-producer --broker-list data1:9092,data2:9092,data3:9092 --topic Test kafka-console-consumer --bootstrap-server data1:9092 --topic Test --from-beginning spark也很好用。

相关问题