I am using Hadoop 3.3.0 and spark-3.0.1-bin-hadoop3.2. My Python IDE is Eclipse, version 2020-12. I am trying to use the KafkaUtils class from the pyspark module; my reference for configuring pyspark and Eclipse is this site. Simple code like the following runs fine without any exception:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Kafka2RDD").setMaster("local[*]")
sc = SparkContext(conf = conf)
data = [1, 2, 3, 4, 5, 6]
distData = sc.parallelize(data)
print(distData.count())
However, I found that the Spark 3 pyspark module does not contain KafkaUtils at all. The following imports fail:
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import OffsetRange
So I downgraded Spark from 3.0.1-bin-hadoop3.2 to 2.4.7-bin-hadoop2.7. With that version I can import KafkaUtils successfully in the Eclipse IDE. But this time, an exception related to the Spark version is thrown continuously:
Traceback (most recent call last):
File "/home/jhwang/eclipse-workspace/BigData_Etl_Python/com/aaa/etl/kafka_spark_rdd.py", line 36, in <module>
print(distData.count())
File "/usr/local/spark/python/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/spark/python/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/local/spark/python/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/usr/local/spark/python/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
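For context on the traceback above: a JVM class-file major version is the Java release number plus 44, so "major version 55" indicates bytecode compiled for Java 11, which the ASM 6 library bundled with Spark 2.4 cannot parse (Spark 2.4 officially supports Java 8, i.e. major version 52). A tiny sketch of the mapping (the helper name is my own):

```python
# JVM class-file major versions are numbered (Java release + 44):
# Java 8 -> 52, Java 11 -> 55. "Unsupported class file major version 55"
# therefore means Spark 2.4's ASM 6 was handed Java 11 bytecode.
def class_file_major_to_java(major: int) -> int:
    """Map a JVM class-file major version to the Java release that emits it."""
    return major - 44

print(class_file_major_to_java(55))  # version from the traceback -> 11
print(class_file_major_to_java(52))  # Spark 2.4's supported runtime -> 8
```

So the Spark 2.4 downgrade fails here because it is running on a Java 11 JVM, not because of the Kafka code itself.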
How exactly can I import KafkaUtils and related modules on Spark 3.0.1? Where is KafkaUtils in the pyspark module of Spark 3.0.1, or how can I install that Python module? Thanks for any reply. Best regards.
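For reference, the DStream-based `pyspark.streaming.kafka` module (and with it KafkaUtils) was removed in Spark 3.0; the Kafka integration that remains in Spark 3.x is Structured Streaming via the spark-sql-kafka-0-10 package, added at submit time rather than installed with pip. A minimal sketch under Spark 3.0.1, assuming a local broker at localhost:9092 and a topic named test (both placeholders):

```python
# Submit with the Kafka connector on the classpath, e.g.:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 this_script.py
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Kafka2DF")
         .master("local[*]")
         .getOrCreate())

# Structured Streaming replaces KafkaUtils.createStream/createDirectStream in Spark 3.x.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
      .option("subscribe", "test")                          # placeholder topic
      .load())

# Kafka records arrive as binary key/value columns; cast to strings before use.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())
query.awaitTermination()
```

This is a sketch only (it needs a running Kafka broker); the point is that in Spark 3 there is no KafkaUtils to import, and the DataFrame reader above is the supported path.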