PySpark / Spark Streaming / Python 3.5: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

Asked by os8fio9y on 2021-06-04 in Kafka

I'm new to Spark Streaming and am trying to read data from a Kafka broker.
Below is my code:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

def __init__(self):
    self.spark = SparkSession \
        .builder \
        .appName("TestApp") \
        .config("k1", "v1") \
        .getOrCreate()
    # Micro-batch interval of 1 second
    self.ssc = StreamingContext(self.spark.sparkContext, 1)

def StreamingObject(self):
    kafkaParams = {'metadata.broker.list': 'localhost:9092'}
    topic = "Topic2"
    topicpartion = TopicAndPartition(topic, 0)
    # Start reading partition 0 of the topic from offset 0
    fromoffset = {topicpartion: 0}
    kvs = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets=fromoffset)
    # Each record arrives as a (key, value) tuple; split the message value
    words = kvs.flatMap(lambda kv: kv[1].split(","))
    words.pprint()

    self.ssc.start()
    self.ssc.awaitTermination()

The last step should print whatever I fetch from the broker, but instead I get the error below.

Traceback (most recent call last):
  File "C:/Users/<user>/PycharmProjects/GCPProject/SStreaming.py", line 72, in <module>
    objss.StreamingObject()
  File "C:/Users/<user>/PycharmProjects/GCPProject/SStreaming.py", line 40, in StreamingObject
    kvs = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets = fromoffset)
  File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 130, in createDirectStream
  File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
  File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
  File "C:\spark\spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
        at scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
        at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
        at scala.collection.AbstractMap.toBuffer(Map.scala:59)
        at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
        at scala.collection.AbstractMap.toSeq(Map.scala:59)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Unknown Source)

19/09/18 23:23:43 INFO SparkContext: Invoking stop() from shutdown hook
19/09/18 23:23:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.6:4040
19/09/18 23:23:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/09/18 23:23:43 INFO MemoryStore: MemoryStore cleared
19/09/18 23:23:43 INFO BlockManager: BlockManager stopped
19/09/18 23:23:43 INFO BlockManagerMaster: BlockManagerMaster stopped
19/09/18 23:23:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/09/18 23:23:43 INFO SparkContext: Successfully stopped SparkContext
19/09/18 23:23:43 INFO ShutdownHookManager: Shutdown hook called
19/09/18 23:23:43 INFO ShutdownHookManager: Deleting directory C:\Users\<user>\AppData\Local\Temp\spark-4ac3750b-cdf3-4d1d-823c-2b60f62db15a
19/09/18 23:23:43 INFO ShutdownHookManager: Deleting directory C:\Users\<user>\AppData\Local\Temp\spark-4ac3750b-cdf3-4d1d-823c-2b60f62db15a\pyspark-e791b26d-bacb-47ab-b7ae-2ae66a811158

The data is in CSV format and is present in the Kafka broker. I'm not sure where the problem is. Please help me fetch the messages from the Kafka broker.
I am working with Spark 2.2.0 and spark-streaming-kafka 0.9.0, and the environment is set up on Windows.

Answer 1 (bvjveswy):

The error java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class usually occurs because your local Scala version does not match the Scala version your Spark dependencies were built against.
Check your Scala version: Spark 2.2.0 uses Scala 2.11.
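
To confirm which Scala version your Spark build actually uses, you can ask the JVM through py4j. Below is a minimal sketch, not part of the original answer; the app name "ScalaVersionCheck" is arbitrary and it assumes a working local Spark install:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ScalaVersionCheck").getOrCreate()

# scala.util.Properties.versionString reports the Scala version on the
# JVM classpath, e.g. "version 2.11.8"
print(spark.sparkContext._jvm.scala.util.Properties.versionString())

spark.stop()

The Kafka integration artifact then needs a matching Scala suffix. For the pyspark.streaming.kafka API used in the question (the 0-8 integration), that would be, for example, org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 passed via spark-submit --packages.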
