Cannot integrate PySpark 3.0.0-preview2 with Kafka

btxsgosb · posted 2021-06-04 · in Kafka

I am trying to create a PySpark application that reads streaming data from a Kafka producer. I have also started a Kafka producer on localhost:9092, and I am using Kafka 2.13-2.4.1.
Here is the code for my PySpark streaming application:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import os

# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.5'

topic = "covid"

if __name__ == "__main__":
    print("Starting Structured Streaming")

    #schema = S

    spark = SparkSession \
        .builder \
        .appName("twitterStreamer") \
        .master("local[*]") \
        .config("spark.jars","file:///C://Users//Kaushik Bhartiya//Desktop//Kartik//BI-Dashboard//jars//spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar") \
        .config("spark.driver.extraClassPath", "file:///C://Users//Kaushik Bhartiya//Desktop//Kartik//BI-Dashboard//jars//kafka-clients-2.5.0.jar") \
        .getOrCreate()

    kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","covid").load()

    print("=========================")
    #sc = SparkContext(appName="twitterStreamer")
    #sc.setLogLevel("WARN")
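(For context, the `value` column that `kafka_df` would expose arrives as raw bytes, so downstream code has to decode it before applying any schema. A minimal, Spark-free sketch of that decode step — the payload fields here are purely hypothetical:

```python
import json

# A Kafka record value as the connector would deliver it: raw bytes.
raw_value = b'{"country": "IN", "cases": 42}'

# Decode bytes -> str -> dict before doing any schema work on it.
record = json.loads(raw_value.decode("utf-8"))
print(record["country"], record["cases"])
```

In Spark itself the equivalent step would be a cast of `value` to string followed by `from_json` with a schema, but the byte-decoding concern is the same.)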

When I run it, I get the following error:

20/04/21 15:41:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/21 15:41:24 INFO SparkContext: Running Spark version 3.0.0-preview2
20/04/21 15:41:24 INFO ResourceUtils: ==============================================================
20/04/21 15:41:24 INFO ResourceUtils: Resources for spark.driver:

20/04/21 15:41:24 INFO ResourceUtils: ==============================================================
20/04/21 15:41:24 INFO SparkContext: Submitted application: StructuredKafkaWordCount
20/04/21 15:41:25 INFO SecurityManager: Changing view acls to: Kaushik Bhartiya
20/04/21 15:41:25 INFO SecurityManager: Changing modify acls to: Kaushik Bhartiya
20/04/21 15:41:25 INFO SecurityManager: Changing view acls groups to:
20/04/21 15:41:25 INFO SecurityManager: Changing modify acls groups to:
20/04/21 15:41:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Kaushik Bhartiya); groups with view permissions: Set(); users  with modify permissions: Set(Kaushik Bhartiya); groups with modify permissions: Set()
20/04/21 15:41:28 INFO Utils: Successfully started service 'sparkDriver' on port 63341.
20/04/21 15:41:28 INFO SparkEnv: Registering MapOutputTracker
20/04/21 15:41:28 INFO SparkEnv: Registering BlockManagerMaster
20/04/21 15:41:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/04/21 15:41:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/04/21 15:41:28 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
20/04/21 15:41:28 INFO DiskBlockManager: Created local directory at C:\Users\Kaushik Bhartiya\AppData\Local\Temp\blockmgr-01094264-a7b0-4e00-9fd0-8532dda94a0e
20/04/21 15:41:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
20/04/21 15:41:29 INFO SparkEnv: Registering OutputCommitCoordinator
20/04/21 15:41:30 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/04/21 15:41:30 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-TJ2G2FM:4040
20/04/21 15:41:31 INFO Executor: Starting executor ID driver on host DESKTOP-TJ2G2FM
20/04/21 15:41:31 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63398.
20/04/21 15:41:31 INFO NettyBlockTransferService: Server created on DESKTOP-TJ2G2FM:63398
20/04/21 15:41:31 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/21 15:41:31 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-TJ2G2FM:63398 with 366.3 MiB RAM, BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:31 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-TJ2G2FM, 63398, None)
20/04/21 15:41:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/Users/Kaushik%20Bhartiya/Desktop/Kartik/BI-Dashboard/spark-warehouse').
20/04/21 15:41:33 INFO SharedState: Warehouse path is 'file:/C:/Users/Kaushik%20Bhartiya/Desktop/Kartik/BI-Dashboard/spark-warehouse'.
Traceback (most recent call last):
  File "C:/spark-3.0.0-bin-hadoop2.7/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py", line 64, in <module>
    lines = spark\
  File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\streaming.py", line 406, in load
  File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
  File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 98, in deco
  File "C:\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(Unknown Source)
        at java.security.SecureClassLoader.defineClass(Unknown Source)
        at java.net.URLClassLoader.defineClass(Unknown Source)
        at java.net.URLClassLoader.access$100(Unknown Source)
        at java.net.URLClassLoader$1.run(Unknown Source)
        at java.net.URLClassLoader$1.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Unknown Source)
        at java.util.ServiceLoader$LazyIterator.nextService(Unknown Source)
        at java.util.ServiceLoader$LazyIterator.next(Unknown Source)
        at java.util.ServiceLoader$1.next(Unknown Source)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 44 more

20/04/21 15:41:34 INFO SparkContext: Invoking stop() from shutdown hook
20/04/21 15:41:35 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-TJ2G2FM:4040
20/04/21 15:41:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/21 15:41:35 INFO MemoryStore: MemoryStore cleared
20/04/21 15:41:35 INFO BlockManager: BlockManager stopped
20/04/21 15:41:35 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/21 15:41:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/21 15:41:35 INFO SparkContext: Successfully stopped SparkContext
20/04/21 15:41:35 INFO ShutdownHookManager: Shutdown hook called
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-639a5526-614d-4399-bda7-e113206f3a76
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-385da83d-384e-4e56-af42-af5614985797
20/04/21 15:41:35 INFO ShutdownHookManager: Deleting directory C:\Users\Kaushik Bhartiya\AppData\Local\Temp\spark-385da83d-384e-4e56-af42-af5614985797\pyspark-bc843f59-81b8-431a-b88a-b21e0bcf4b4b

I even tried running the Kafka example from Spark's examples folder, and it gives the same error.
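(For what it's worth: `StreamWriteSupport`, the missing class in the trace, belongs to Spark 2.4's DataSource V2 API and no longer exists in 3.0.0-preview2, which suggests a connector jar built for 2.4.x is ending up on the classpath. A sketch of letting spark-submit resolve a version-matched connector instead of hand-picking jars — the Maven coordinate is an assumption, and this must run before the SparkSession is created:

```python
import os

# Hypothetical fix sketch: ask spark-submit to fetch a connector whose
# version matches the running Spark (3.0.0-preview2) rather than mixing
# hand-downloaded jars. The trailing 'pyspark-shell' token is required
# by PySpark's launcher.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 "
    "pyspark-shell"
)
```

This mirrors the commented-out `PYSPARK_SUBMIT_ARGS` line in the code above, but with the connector version matched to the Spark build.)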
