ModuleNotFoundError in PySpark raised in serializers.py

xsuvu9jc asked on 2021-05-17 in Spark

I am trying to submit a Spark application to the local Kubernetes cluster on my machine (created via the Docker dashboard). The application depends on a Python package, let's call it X.
Here is the application code:

import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
datafolder = "/opt/spark/data" # Folder created in container by spark's docker file
sys.path.append(datafolder) # X is contained inside of datafolder
from X.predictor import * # import functionality from X

def apply_x_functionality_on(item):
    predictor = Predictor() # class from X.predictor
    predictor.predict(item)

def main():
    spark = SparkSession\
            .builder\
            .appName("AppX")\
            .getOrCreate()
    sc = spark.sparkContext
    data = []
    # Read data: [no problems there]
    ...
    data_rdd = sc.parallelize(data) # create RDD
    data_rdd.foreach(lambda item: apply_x_functionality_on(item)) # call function on every item

if __name__ == "__main__":
    main()

Initially I hoped to avoid this kind of problem by placing the X folder into Spark's data folder: when the container is built, everything in the data folder is copied to /opt/spark/data, and my Spark application appends the data folder to the system path, which is how it picks up package X. Well, so I thought.
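One thing worth checking is whether that sys.path change is visible on the executors at all: appending to sys.path on the driver only affects the driver process, while the executors run separate Python workers. A minimal sketch (not part of the original post) that makes the difference visible:

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PathCheck").getOrCreate()
sc = spark.sparkContext

sys.path.append("/opt/spark/data")
print("/opt/spark/data" in sys.path)  # True on the driver

def datafolder_on_path(_):
    import sys
    return "/opt/spark/data" in sys.path

# Typically [False, False] on the executors, which is exactly where the import of X fails.
print(sc.parallelize(range(2), 2).map(datafolder_on_path).collect())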
Everything works fine until the .foreach call. Here is a snippet of the logs with the error description:

20/11/25 16:13:54 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.1.0.60, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'X'

There are plenty of similar questions out there (one, two, three), but so far none of the answers has helped me.
What I have tried:
Submitting the application with X zipped (I zip it inside the container by applying zip to the X folder):

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=kostjaigin/spark-py:v3.0.1-X_0.0.1 \
  --py-files "local:///opt/spark/data/X.zip" \
  local:///opt/spark/data/MyApp.py

Adding the zipped X to the Spark context:

sc.addPyFile("opt/spark/data/X.zip")

Answer 1 (by mqxuamgl)

I have solved the problem:
1. Created a dependencies folder under /opt/spark/data
2. Put X inside dependencies
3. cd /opt/spark/data/dependencies && zip -r ../dependencies.zip . (an equivalent Python sketch follows this list)
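As an alternative to the shell zip step, the same archive can be produced with Python's standard library. A minimal sketch, assuming the directory layout from the steps above:

import shutil

# Build /opt/spark/data/dependencies.zip so that X/ sits at the archive root,
# mirroring "cd /opt/spark/data/dependencies && zip -r ../dependencies.zip ."
shutil.make_archive(
    "/opt/spark/data/dependencies",           # target path without the .zip suffix
    "zip",
    root_dir="/opt/spark/data/dependencies",  # contents of this folder become the archive root
)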
In the application:

...
import os
from X.predictor import * # import functionality from X
...

# zipped package with the dependencies, shipped to every executor
zipped_pkg = os.path.join(datafolder, "dependencies.zip")
assert os.path.exists(zipped_pkg)
sc.addPyFile(zipped_pkg)
...

Then added the --py-files flag to the submit command:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --py-files "local:///opt/spark/data/dependencies.zip" \
  local:///opt/spark/data/MyApp.py

And ran it.
Basically, it all comes down to adding a dependencies.zip archive that contains all the required dependencies.
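If it is unclear whether the archive actually reaches the executors, a quick sanity check along these lines can help (a hypothetical snippet, not part of the original answer; it assumes sc is the active SparkContext):

# Every task imports X and reports where the module was loaded from; after
# sc.addPyFile / --py-files this should point inside dependencies.zip.
def where_is_x(_):
    import X
    return X.__file__

print(sc.parallelize(range(2), 2).map(where_is_x).collect())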
