I have created a topic named "sampleTopic" and pushed the below message into it:
```
sid,Believer
```
Here the first parameter is the `username` and the second is the `song name` that the user listens to frequently. I have started `zookeeper`, the `Kafka server`, and the `producer` with the topic name mentioned above, all from `CMD`. Now I want to read the topic in Spark, perform some aggregations, and write it back to the stream. Below is my code:
```
package com.sparkKafka

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkKafkaTopic {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    println("hey")
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sampleTopic1")
      .load()
    val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination()
  }
}
```
But when I execute the above code, it gives:
```
+----+--------------------+------------+---------+------+--------------------+-------------+
| key|               value|       topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------------+---------+------+--------------------+-------------+
|null|[73 69 64 64 68 6...|sampleTopic1|        0|     4|2020-05-31 12:12:...|            0|
+----+--------------------+------------+---------+------+--------------------+-------------+
```
along with the following INFO messages repeating in an infinite loop:
```
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
```
I need the output like below:
![](https://i.stack.imgur.com/rJHn0.png)
Following Srinivas's suggestion, I got the below result:
![](https://i.stack.imgur.com/u0cVe.png)
Not sure what exactly is going wrong here. Please guide me through it.
2 Answers

Answer from xesrikrc1:
The spark-sql-kafka jar, which contains the implementation of the "kafka" data source, is missing. You can either add the jar using a config option, or build a fat jar that includes the spark-sql-kafka jar. Make sure to use the jar version that matches your Spark and Scala versions.
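For example, the package can be supplied at submit time (a sketch: the artifact coordinates assume Spark 2.4.5 with Scala 2.11, and the application jar path is hypothetical — adjust both to your project):

```
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 \
  --class com.sparkKafka.SparkKafkaTopic \
  target/spark-kafka-app.jar
```

On Windows `CMD` (which the question uses), replace the `\` line continuations with `^`.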
Answer from rryofs0p2:
Try adding the spark-sql-kafka library to your build file — `build.sbt` for sbt projects, or `pom.xml` for Maven.
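The original dependency snippets were not preserved here; as a sketch, the version below is an assumption and should match your Spark version (2.4.5 assumed) and Scala version (2.11 assumed). For `build.sbt`:

```
// build.sbt: %% appends the Scala binary version suffix automatically
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
```

And the Maven equivalent for `pom.xml`:

```
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
```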
Then change your code as below.
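The answer's original code snippet was not preserved here. Below is a minimal sketch of what such a change could look like, based on the expected output in the question: cast the Kafka `value` bytes to a string, split the `username,songName` pair into columns, count plays per song, and stream the aggregation to the console (the column names and the aggregation are my assumptions, not the answerer's exact code):

```
package com.sparkKafka

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkKafkaTopic {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sampleTopic1")
      .load()

    // Kafka delivers key/value as binary; cast the value to a string first,
    // then split "username,songName" into separate columns.
    val songs = df
      .selectExpr("CAST(value AS STRING) AS value")
      .select(
        split($"value", ",").getItem(0).as("username"),
        split($"value", ",").getItem(1).as("songName")
      )

    // Running count of how many times each song has been played.
    val counts = songs.groupBy("songName").count()

    // Streaming aggregations require "complete" (or "update") output mode,
    // not "append".
    counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

With an aggregation in the query, `outputMode("append")` would fail at runtime, which is why the sketch switches to `complete`.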