pyspark Azure DataBricks中未释放内存

h6my8fg2  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(72)

在Azure DataBricks(12.2 LTS(包括Apache Spark 3.3.2,Scala 2.12)中,我正在计算Standard_DS5_v2上的笔记本中运行以下代码:

df_spark = spark.sql(my_qyery_to_delta_table)
df_pandas = df_spark.toPandas()
df_spark.unpersist()

import gc
del df_pandas
del df_spark
gc.collect()

字符串
数据提取需要几分钟的时间来执行(这是大量的数据,包含例如列中的大型嵌套json)。但是我观察到内存在这个过程(屏幕)之后没有释放。如何解决这个问题,这是什么原因?


的数据
[编辑]使用df.unpersist()也没有帮助。此外,gc.collect()之前的“del df”和“df = None”都没有改变Ganglia集群面板中可见的内存级别中的任何内容。

kx1ctssn

kx1ctssn1#

  • 您遇到的问题是因为toPandas()方法返回了一个Pandas DataFrame,它存储在驱动程序节点的内存中。

当您删除df变量并运行gc.collect()时,您只释放了Pandas DataFrame使用的内存,而不是之前创建的Spark DataFrame使用的内存。
为了释放Spark DataFrame使用的内存,你可以在使用完DataFrame后调用它的unpersist()方法。我创建了一个例子:

delta_df = spark.read.format("delta").load(delta_table_path)
df_pandas = delta_df.toPandas()
df_filtered.unpersist()
gc.collect()
print(df_pandas.head())

字符串

结果:

The count of filtered rows is: 500
    id      name
0  751  Name_751
1  752  Name_752
2  753  Name_753
3  754  Name_754
4  755  Name_755


通过调用df_filtered.unpersist(),我释放了Spark DataFrame使用的内存。这个方法应该可以帮助您减少驱动程序节点上的内存使用,并防止内存不足错误。
在上面的代码中使用Unpersist或drop Spark DataFrame将Spark DataFrame转换为Pandas DataFrame。这一步对于释放Spark端资源很重要您可以使用df_filtered.unpersist()df_filtered = None然后显式调用垃圾收集,然后您可以根据需要继续使用df_pandas

相关问题