Unable to read Kafka topic data with Spark

trnvg8h3 · posted 2021-05-27 in Spark
Follow (0) | Answers (2) | Views (391)

I have created a topic named "sampleTopic", to which I send messages such as:

sid,Believer

where the first field is the `username` and the second is the `song name` the user frequently listens to. I have started `zookeeper`, the `Kafka server`, and a `producer` with the topic name mentioned above, all from `CMD`. Now I want to read the topic in Spark, perform some aggregation, and write it back as a stream. Below is my code:

package com.sparkKafka

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkKafkaTopic {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    println("hey")
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sampleTopic1")
      .load()
    val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination()
  }
}

However, when I execute the above code, it gives:

+----+--------------------+------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+------------+---------+------+--------------------+-------------+
|null|[73 69 64 64 68 6...|sampleTopic1| 0| 4|2020-05-31 12:12:...| 0|
+----+--------------------+------------+---------+------+--------------------+-------------+

along with an endless loop of messages like:

20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.
20/05/31 11:56:12 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-0d6807b9-fcc9-4847-abeb-f0b81ab25187--264582860-driver-0] Resetting offset for partition sampleTopic1-0 to offset 4.

I need the output to look like this:
![](https://i.stack.imgur.com/rJHn0.png)
Following Srinivas's suggestion, I got the following result:
![](https://i.stack.imgur.com/u0cVe.png)
Not sure what exactly is going wrong here. Please guide me through it.
xesrikrc

xesrikrc1#

The spark-sql-kafka jar, which provides the implementation of the "kafka" data source, is missing.
You can either add the jar through a config option or build a fat jar that includes the spark-sql-kafka jar. Use the version of the jar that matches your Spark version.

val spark = SparkSession.builder()
  .appName("SparkKafka").master("local[*]")
  .config("spark.jars","/path/to/spark-sql-kafka-xxxxxx.jar")
  .getOrCreate()
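As an alternative to pointing at a jar on disk, Spark can resolve the connector from Maven at startup through the `spark.jars.packages` property. This is only a sketch; the coordinates below assume Scala 2.11 and Spark 2.3.0, so match them to your own build (with `spark-submit`, the `--packages` flag does the same thing):

val spark = SparkSession.builder()
  .appName("SparkKafka").master("local[*]")
  // Maven coordinates instead of a local jar path; adjust the Scala/Spark versions as needed
  .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0")
  .getOrCreate()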
rryofs0p

rryofs0p2#

Try adding the spark-sql-kafka library to your build file. Check below.
build.sbt

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0"  
// Change to Your spark version

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>    <!-- Change to your Spark version -->
</dependency>

Change the code as below:

package com.sparkKafka

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

case class KafkaMessage(key: String, value: String, topic: String, partition: Int, offset: Long, timestamp: String)

object SparkKafkaTopic {

  def main(args: Array[String]) {
    println("hey")
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    import spark.implicits._

    // Schema of the message payload (userName,songName)
    val mySchema = StructType(Array(
      StructField("userName", StringType),
      StructField("songName", StringType)))

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sampleTopic1")
      .load()

    val query = df
      .as[KafkaMessage]
      // value arrives as bytes; split the "user,song" payload into two columns
      .select(split($"value", ",")(0).as("userName"), split($"value", ",")(1).as("songName"))
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()

    /*
      +--------+--------+
      |userName|songName|
      +--------+--------+
      |     sid|Believer|
      +--------+--------+
     */
  }
}
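The question also mentions performing an aggregation and writing the result back as a stream. A minimal sketch of that, reusing `df` and the imports from the snippet above and assuming a hypothetical output topic `aggregatedTopic` plus a local checkpoint directory (neither appears in the original post), could look like the following. A streaming aggregation cannot run in `append` mode without a watermark, so `update` mode is used here, and the Kafka sink expects a string or binary column named `value`:

// Sketch only: count songs per user and publish the running counts back to Kafka.
val parsed = df
  .selectExpr("CAST(value AS STRING) AS raw")
  .select(split($"raw", ",")(0).as("userName"), split($"raw", ",")(1).as("songName"))

val counts = parsed
  .groupBy($"userName")
  .count()

counts
  .select(to_json(struct($"userName", $"count")).as("value")) // Kafka sink requires a "value" column
  .writeStream
  .outputMode("update")                                       // running aggregation without a watermark
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "aggregatedTopic")                         // hypothetical output topic
  .option("checkpointLocation", "/tmp/spark-kafka-agg-checkpoint")
  .start()
  .awaitTermination()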
