pyspark应用程序仅部分利用dataproc集群资源

w1e3prcc  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(325)

我的pyspark应用程序在一个106,36mb的数据集(817.270条记录)上运行一个udf,使用常规python lambda函数大约需要100个小时。我已经生成了一个googledataproc集群,它有20个工作节点,每个节点有8个vcpu。但是,在执行时,总共只使用3个节点和3个vcpu。显然,我希望集群使用我提供的所有资源。
生成的Dataframe的默认分区数是8。我尝试将它重新分区为100,但是集群一直只使用3个节点和3个vcpu。另外,当我运行一个命令来检查spark看到的执行器数量时,它只有3个。
这是执行的pyspark代码:

from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf

customer_names = spark.createDataFrame(customer_names)

embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))

result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])

result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')

这里是一些Spark输出从我的jupyter笔记本看到

print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
kq4fsx7k

kq4fsx7k1#

此外,您可能希望尝试以下操作,让pyspark通过动态分配动态调整应用程序中的执行器数量:

SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "true")
whitzsjs

whitzsjs2#

对于那些对我如何解决问题感兴趣的人:
默认情况下,我的spark上下文假设有两个工作节点,不管我在googlecloud的dataproc ui中生成了多少额外的节点。因此,我手动更改了spark上下文,如下所示:

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

    sc.stop()
    SparkContext.setSystemProperty('spark.executor.cores', '4')
    SparkContext.setSystemProperty('spark.executor.instances', '5')
    sc = SparkContext("yarn", "embargotest")
    spark = SparkSession.builder.appName('embargotest').getOrCreate()

此外,在将.withcolumn函数应用于此Dataframe之前,我将customer\u names数据集显式划分为20个(4个核心x 5个示例)。

customer_names = spark.createDataFrame(customer_names).repartition(20)

希望这能对有类似问题的人有所帮助!

相关问题