PySpark: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$

Asked by cdmah0mi on 2021-05-29 in Spark

I am trying to fetch messages from a Kafka topic and print them to the console. The reader fetches the messages successfully, but when the writer tries to print them to the console, the job fails with:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$

```
from pyspark.sql import SparkSession

# Put the Kafka connector jar on the driver and executor class paths.
spark = SparkSession.builder\
  .appName("Kafka Spark")\
  .config("spark.jars", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.executor.extraLibrary", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .config("spark.driver.extraClassPath", "/C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar")\
  .getOrCreate()

# Subscribe to the topic, cast key/value to strings, and stream the rows to
# the console with the experimental continuous trigger; start() returns a
# StreamingQuery handle.
dataFrameRead = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "Jim_Topic")\
    .load()\
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream\
    .format("console")\
    .trigger(continuous="1 second")\
    .start()

dataFrameRead.awaitTermination()
```

Complete error:

```
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/06/13 21:23:03 ERROR Utils: Aborting task
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$
    at org.apache.spark.sql.kafka010.KafkaContinuousPartitionReader.<init>(KafkaContinuousStream.scala:195)
    at org.apache.spark.sql.kafka010.KafkaContinuousReaderFactory$.createReader(KafkaContinuousStream.scala:174)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:83)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD$$Lambda$2098/1825151985.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2019/307923369.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
20/06/13 21:23:03 ERROR Utils: Aborting task

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:/Users/Macaulay/PycharmProjects/Spark/KafkaSpark/KafkaTopic2CSV.py", line 39, in <module>
    dataFrameRead.awaitTermination()
  File "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()
  File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
  File "C:/Hadoop/Spark/spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 7f1ca9c7-6345-46a2-9c94-cf22c31c30ff, runId = f540fad6-8797-489e-8fd3-00581282689a]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToContinuousDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS value#22]
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6d216413, KafkaSource[Subscribe[Jim_Topic]]

Process finished with exit code 1
```
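The stack trace narrows things down: `Could not initialize class org.apache.spark.sql.kafka010.KafkaDataConsumer$` means the class was found but its static initializer failed, which is what typically happens when the connector jar itself is on the class path while its transitive dependencies (notably `kafka-clients` and `commons-pool2`, which the consumer pool needs) are not. A minimal sketch of the usual remedy, assuming the machine can resolve Maven coordinates at startup; the coordinate version is chosen to match the 3.0.0-preview2 / Scala 2.12 build in the question:

```
from pyspark.sql import SparkSession

# Sketch: let Spark resolve the Kafka connector *and* its transitive
# dependencies (kafka-clients, commons-pool2, the token provider) from
# Maven, instead of pointing spark.jars at a single connector jar.
spark = SparkSession.builder \
    .appName("Kafka Spark") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2") \
    .getOrCreate()
```

The same coordinate can equivalently be passed on the command line via `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2`.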

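Separately, the failing frames (`KafkaContinuousPartitionReader`, `ContinuousWriteRDD`) belong to the experimental continuous-processing engine selected by `trigger(continuous="1 second")`. Once the class path is sorted out, the same query can also run on the default, better-tested micro-batch engine; a sketch, reusing the `spark` session built above:

```
# Same pipeline on the micro-batch engine: swap the continuous trigger for a
# processing-time trigger (or drop .trigger() to use the default behaviour).
query = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .trigger(processingTime="1 second") \
    .start()

query.awaitTermination()
```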