我正在尝试将spark应用程序提交到我机器上的本地kubernetes集群(通过docker dashboard创建)。应用程序依赖于python包,我们称之为x。
以下是申请代码:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
datafolder = "/opt/spark/data" # Folder created in container by spark's docker file
sys.path.append(datafolder) # X is contained inside of datafolder
from X.predictor import * # import functionality from X
def apply_x_functionality_on(item):
predictor = Predictor() # class from X.predictor
predictor.predict(item)
def main():
spark = SparkSession\
.builder\
.appName("AppX")\
.getOrCreate()
sc = spark.sparkContext
data = []
# Read data: [no problems there]
...
data_rdd = sc.parallelize(data) # create RDD
data_rdd.foreach(lambda item: apply_network(item)) # call function
if __name__ == "__main__":
main()
最初我希望通过将x文件夹放到spark的data文件夹来避免此类问题。构建容器时,数据文件夹的所有内容都将复制到/opt/spark/data。我的spark应用程序将data文件夹的内容附加到系统路径,例如使用包x。嗯,我想是的。
在调用.foreach函数之前,一切正常。以下是日志的一个片段,其中包含错误描述:
20/11/25 16:13:54 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.1.0.60, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
command = serializer._read_with_length(file)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'X'
这里有很多类似的问题:一、二、三,但到目前为止,没有一个答案对我有帮助。
我尝试过:
我使用.zip(ed)x提交了申请(我将其压缩到容器中,通过将zip应用到x):
$SPARK_HOME/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=kostjaigin/spark-py:v3.0.1-X_0.0.1 \
--py-files "local:///opt/spark/data/X.zip" \
local:///opt/spark/data/MyApp.py
我在spark上下文中添加了.zip(ed)x:
sc.addPyFile("opt/spark/data/X.zip")
1条答案
按热度按时间mqxuamgl1#
我已经解决了这个问题:
在/opt/spark/data下创建了dependencies文件夹
将x置于依赖项
cd/opt/spark/data/dependencies&&zip-r../dependencies.zip。
应用中:
将--py files标志添加到submit命令:
运行它
基本上,这都是关于添加一个dependencies.zip存档,其中包含所有必需的依赖项。