Apache Spark integration with Kafka

yeotifhr  posted on 2021-05-24  in Spark

I'm taking a Udemy course on Kafka and Spark, and I'm currently learning how to integrate Apache Spark with Kafka.
Below is the Apache Spark code:

SparkSession session = SparkSession.builder()
  .appName("KafkaConsumer")
  .master("local[*]")
  .getOrCreate();
session.sparkContext().setLogLevel("ERROR");
Dataset<Row> df = session
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "second_topic")
  .load();

df.show();

Below is the content of the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka.spark</groupId>
  <artifactId>Kafka-Spark-Integration-Code</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<!--    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> -->

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
   </dependency>

 </dependencies>
</project>

However, when I run the code I get the error below, which I have not been able to resolve. I'm using OpenJDK 8 and Spark 3 on MX Linux. Thanks.

Exception in thread "main" java.lang.ClassFormatError: Invalid code attribute name index 24977 in class file org/apache/spark/sql/execution/columnar/InMemoryRelation
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:83)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:132)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:132)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:131)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:323)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.streaming.DataStreamReader.<init>(DataStreamReader.scala:519)
    at org.apache.spark.sql.SparkSession.readStream(SparkSession.scala:657)
    at example.code.spark.kafka.KafkaSparkConsumer.main(KafkaSparkConsumer.java:19)
moiiocjp  #1

You can follow the example given in the Structured Streaming + Kafka Integration Guide:

SparkSession session = SparkSession.builder()
  .appName("KafkaConsumer")
  .master("local[*]")
  .getOrCreate();

// Read from the session created above. The Kafka source delivers key and
// value as binary columns, so cast both to strings.
Dataset<Row> df = session
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "second_topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
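The cast in the last line matters because the Kafka source delivers key and value as binary columns. As a rough illustration of what CAST(value AS STRING) amounts to for text payloads, the plain-Java sketch below decodes a byte array as UTF-8. It assumes the producer wrote UTF-8 text; the class and method names are made up for this example and are not part of Spark or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class KafkaValueDecoding {

    /** Decodes raw record bytes as UTF-8; a null payload stays null. */
    static String decodeUtf8(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes a producer would have written to the topic.
        byte[] value = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(value));
    }
}
```

If the topic carries something other than text (Avro or Protobuf, say), the string cast is the wrong tool and the bytes need a matching deserializer instead.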

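To consume the data, note that a streaming Dataset cannot be displayed with df.show(); the query has to be started through writeStream().start(). The Structured Streaming Programming Guide shows how to print the data to the console: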

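// Write each micro-batch of new rows to the console sink.
StreamingQuery query = df
  .writeStream()
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start();

// Block the main thread until the query stops or fails.
query.awaitTermination();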
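Separately, the ClassFormatError in the question is thrown while the JVM loads a class from the Spark SQL jar, before any Kafka code runs. One possible cause, which nothing in the question confirms, is a damaged jar in the local Maven repository. A minimal sketch for checking that, using only the JDK: it re-reads every entry of a jar and compares the bytes against the CRC-32 stored in the archive. The class name, method name, and the idea of pointing it at the Spark jars are assumptions made for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical diagnostic helper, for illustration only.
final class JarIntegrityCheck {

    /**
     * Returns the names of entries whose content cannot be read or does not
     * match the CRC-32 recorded in the archive. Throws IOException if the
     * archive itself cannot be opened (for example, a truncated download).
     */
    static List<String> findCorruptEntries(Path jar) throws IOException {
        List<String> corrupt = new ArrayList<>();
        byte[] buffer = new byte[8192];
        try (ZipFile zip = new ZipFile(jar.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                if (entry.isDirectory()) {
                    continue;
                }
                CRC32 crc = new CRC32();
                try (InputStream in = zip.getInputStream(entry)) {
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        crc.update(buffer, 0, n);
                    }
                } catch (IOException e) {
                    // The compressed stream could not be decoded at all.
                    corrupt.add(entry.getName());
                    continue;
                }
                // getCrc() returns -1 when no checksum is recorded.
                if (entry.getCrc() != -1 && entry.getCrc() != crc.getValue()) {
                    corrupt.add(entry.getName());
                }
            }
        }
        return corrupt;
    }

    public static void main(String[] args) throws IOException {
        // Pass one or more jar paths on the command line.
        for (String arg : args) {
            List<String> bad = findCorruptEntries(Paths.get(arg));
            System.out.println(arg + ": " + (bad.isEmpty() ? "OK" : "corrupt entries " + bad));
        }
    }
}
```

If a jar is reported as corrupt, deleting that artifact's directory from the local repository and rebuilding makes Maven download it again. If every jar checks out, the cause lies elsewhere and this sketch does not help.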
