将cache()和count()应用于databricks中的spark dataframe非常慢[pyspark]

yi0zb3m4  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(678)

我在databricks集群中有一个sparkDataframe,有500万行。我想要的是缓存这个sparkDataframe,然后应用.count(),以便下一个操作运行得非常快。我以前做过两万行,效果不错。然而,在我尝试这样做的过程中,我遇到了以下悖论:
Dataframe创建
步骤1:从azure data lake存储帐户读取800万行

read_avro_data=spark.read.format("avro").load(list_of_paths) #list_of_paths[0]='abfss://storage_container_name@storage_account_name.dfs.core.windows.net/folder_1/folder_2/0/2020/06/02/00/00/27.avro'
avro_decoded=read_avro_data.withColumn('Body_decoded', sql_function.decode(read_avro_data.Body, charset="UTF-8")).select("Body_decoded")
datalake_spark_dataframe=datalake_spark_dataframe.union(avro_decoded.withColumn("Body_decoded", sql_function.from_json("Body_decoded", schema)).select(*['Body_decoded.{}'.format(x) for x in columns_selected]))

datalake_spark_dataframe.printSchema()
"root
 |-- id: string (nullable = true)
 |-- BatteryPercentage: float (nullable = true)
 |-- SensorConnected: integer (nullable = false)
 |-- TemperatureOutside: float (nullable = true)
 |-- ReceivedOn: string (nullable = true)"

datalake_spark_dataframe.rdd.getNumPartitions() # 635 partitions

这个Dataframe有800万行。我的应用程序有800万行,运行得很好,但我想在大数据环境中对我的应用程序进行压力测试。因为800万行不是大数据。因此,我复制了800万行sparkDataframe287次~22亿行。要进行复制,我执行了以下操作:
步骤2:复制800万行Dataframe

datalake_spark_dataframe_new=datalake_spark_dataframe
for i in range(287):
  print(i)
  datalake_spark_dataframe_new=datalake_spark_dataframe_new.union(datalake_spark_dataframe)
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new.rdd.getNumPartitions() #182880

在最后的22亿行Dataframe中,我对数据进行了一个时间窗口groupby,最终得到了数百万行。我已经写了大约500万行的分组数据集在我的问题的顶部。
第3步:将22亿行Dataframe按6小时的时间窗口分组并应用.cache()和.count()

%sql set spark.sql.shuffle.partitions=100
import pyspark.sql.functions as sql_function
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, DoubleType, ArrayType

datalake_spark_dataframe_downsampled=datalake_spark_dataframe_new.withColumn(timestamp_column, sql_function.to_timestamp(timestamp_column, "yyyy-MM-dd HH:mm"))
datalake_spark_dataframe_downsampled=datalake_spark_dataframe_downsampled.groupBy("id", sql_function.window("ReceivedOn","{0} minutes".format(time_interval)))\
                                                                         .agg(
                                                                              sql_function.mean("BatteryPercentage").alias("BatteryPercentage"),
                                                                              sql_function.mean("SensorConnected").alias("OuterSensorConnected"),
                                                                              sql_function.mean("TemperatureOutside").alias("averageTemperatureOutside"))

columns_to_drop=['window']
datalake_spark_dataframe_downsampled=datalake_spark_dataframe_downsampled.drop(*columns_to_drop)

# From 2.2 billion rows down to 5 million rows after the GroupBy...

datalake_spark_dataframe_downsampled.repartition(100)
datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count() # job execution takes for ever

datalake_spark_dataframe_downsampled.rdd.getNumPartitions() #100 after re-partition

在显示.count()之前触发ui

计数执行期间触发ui

当我对sparkDataframe应用以下命令时,完成此任务需要3个多小时,但最终失败。
我想补充一点,在重新分区之前和之后,作业在时间执行中有相同的行为。所以,我做了重新分区,以防默认值使作业运行非常慢。因此,我一直在添加分区,以防作业执行得更快。

%sql set spark.sql.shuffle.partitions=1000000
datalake_spark_dataframe_downsampled.repartition(1000000)

datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count()

以下是spark作业的输出:

我得到的错误是:

我的群集资源:

正如你所看到的,这不是一个ram或cpu核心的问题,因为我有很多。为什么即使我应用了重新分区,作业也只分为5个阶段?基于我的48个vcpu内核,如何分割作业以使.cache()和.count()命令运行得更快?
在8000万行(8m*10次迭代=80m行)上执行每个作业时提供的屏幕截图

k2fxgqgv

k2fxgqgv1#

我认为您使用了非常大的洗牌分区号 1000000 这就是为什么要花更多的时间来完成这项工作。
我将按照下面的逻辑来计算基于数据大小的洗牌分区。例如
假设有500万个数据来自20gb左右的数据。
洗牌阶段输入=20 gb
所以洗牌分区的总数是20000mb/200mb=100,
假设集群中只有50个核心,在这种情况下,shuffle partition值为50,或者集群中有200个核心,在这种情况下,shuffle partition值将为200。
选择高值作为shuffle分区值将有大量的shuffling数据&因此任务将需要更多的时间来完成,有时可能会失败。
spark.sql.shuffle.partitions=50//50或100是更好的选择。

fv2wmkja

fv2wmkja2#

我在过去迭代for循环时遇到了类似的问题,因为我的迭代是动态的,取决于输入组合。
我通过在每次迭代中持久化数据(您可以尝试在adls2中持久化,或者在prem-then-hdfs/hive表中持久化)来解决性能问题。在下一次迭代中再次从该位置读取,并集并再次覆盖相同的位置。网络存在滞后,效率不高。但它还是把执行时间缩短了10倍。
可能的原因可能是spark血统(我相信每次迭代它都会一次又一次地执行之前的所有迭代)。用覆盖保存数据可以避免这种情况。我也尝试了cache()和其他选项,但没有帮到我。
我试着这样做

datalake_spark_dataframe_new=datalake_spark_dataframe
datalake_spark_dataframe.write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
for i in range(287):
  print(i)
  datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")
  datalake_spark_dataframe_new.union(datalake_spark_dataframe).write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

编辑#2这应该比上一版更有效,

for i in range(287):
  print(i)
  datalake_spark_dataframe.write.mode("append").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")

相关问题