我的pyspark应用程序在一个106,36mb的数据集(817.270条记录)上运行一个udf,使用常规python lambda函数大约需要100个小时。我已经生成了一个googledataproc集群,它有20个工作节点,每个节点有8个vcpu。但是,在执行时,总共只使用3个节点和3个vcpu。显然,我希望集群使用我提供的所有资源。
生成的Dataframe的默认分区数是8。我尝试将它重新分区为100,但是集群一直只使用3个节点和3个vcpu。另外,当我运行一个命令来检查spark看到的执行器数量时,它只有3个。
这是执行的pyspark代码:
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf
customer_names = spark.createDataFrame(customer_names)
embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')
这里是一些Spark输出从我的jupyter笔记本看到
print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
2条答案
按热度按时间kq4fsx7k1#
此外,您可能希望尝试以下操作,让pyspark通过动态分配动态调整应用程序中的执行器数量:
whitzsjs2#
对于那些对我如何解决问题感兴趣的人:
默认情况下,我的spark上下文假设有两个工作节点,不管我在googlecloud的dataproc ui中生成了多少额外的节点。因此,我手动更改了spark上下文,如下所示:
此外,在将.withcolumn函数应用于此Dataframe之前,我将customer\u names数据集显式划分为20个(4个核心x 5个示例)。
希望这能对有类似问题的人有所帮助!