计算Spark框架的大小-SizeEstimate给出意外结果

whitzsjs  于 8个月前  发布在  Apache
关注(0)|答案(5)|浏览(93)

我试图找到一种可靠的方法来计算Spark框架的大小(以字节为单位)。
原因是我想有一个方法来计算一个“最佳”分区数(“最佳”在这里可能意味着不同的东西:它可能意味着having an optimal partition size,或者在写入Parquet表时是resulting in an optimal file size--但两者都可以被假设为框架大小的线性函数)。换句话说,我想在框架上调用coalesce(n)repartition(n),其中n不是一个固定的数字,而是一个矩阵大小的函数。
SO上的其他主题建议使用org.apache.spark.util中的SizeEstimator.estimate来获得以字节为单位的帧大小,但我得到的结果不一致。
首先,我坚持我的框架记忆:

df.cache().count

Spark UI在Storage选项卡中显示大小为4.8GB。然后,我运行以下命令从SizeEstimator获取大小:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)

这给出了115'715'808 bytes =~ 116 MB的结果。然而,将SizeEstimator应用于不同的对象会导致非常不同的结果。例如,我尝试分别计算数组中每一行的大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)

这将导致大小为12'084'698'256字节=~ 12 GB。或者,我可以尝试将SizeEstimator应用于每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)

这再次导致10'792'965'376字节=~ 10.8GB的不同大小。
我知道这涉及到内存优化/内存开销,但是在执行这些测试之后,我不知道如何使用SizeEstimator来获得足够好的框架大小估计(以及分区大小,或产生的Parquet文件大小)。
什么是适当的方式(如果有的话)应用SizeEstimator,以获得一个很好的估计的一个框架的大小或其分区?如果没有任何,什么是建议的方法在这里?

rggaifut

rggaifut1#

不幸的是,我无法从SizeEstimator中获得可靠的估计值,但我可以找到另一种策略-如果缓存了该帧,我们可以从queryExecution中提取其大小,如下所示:

df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
    catalyst_plan).optimizedPlan.stats.sizeInBytes

对于示例的Parquet rame,这正好是4.8GB(这也对应于写入未压缩的Parquet表时的文件大小)。
这有一个缺点,即需要缓存嵌套框架,但在我的情况下,这不是一个问题。
编辑:将df.cache.foreach(_=>_)替换为df.cache.foreach(_ => ()),感谢@DavidBenedeki在评论中指出。

rfbsl7qr

rfbsl7qr2#

SizeEstimator返回一个对象在JVM堆上占用的字节数。这包括对象引用的对象,实际对象大小几乎总是要小得多。
您所观察到的大小差异是因为在JVM上创建新对象时,引用也会占用内存,这正在被计算在内。
点击这里查看文档
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$

krcsximq

krcsximq3#

除了尺寸估计器,你已经尝试过了(好的洞察力)。
下面是另一个选项

RDDInfo[] getRDDStorageInfo()

返回关于哪些RDD被缓存,它们是在缓存中还是在两者上,它们占用多少空间等的信息。
实际上spark存储选项卡使用了这个。Spark文档
下面是implementation from spark

/**
   * :: DeveloperApi ::
   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
   * they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {
    getRDDStorageInfo(_ => true)
  }

  private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
    assertNotStopped()
    val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
    rddInfos.foreach { rddInfo =>
      val rddId = rddInfo.id
      val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
      rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
      rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
      rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
    }
    rddInfos.filter(_.isCached)
  }

来自RDD的yourRDD.toDebugString也使用了这个。代码here

一般说明:

在我看来,要获得每个分区中记录的最佳数量,并检查您的重新分区是否正确,并且它们是否均匀分布,我建议尝试如下.

yourdf.rdd.mapPartitionsWithIndex{case (index,rows) => Iterator((index,rows.size))}
  .toDF("PartitionNumber","NumberOfRecordsPerPartition")
  .show

或现有的spark功能(基于spark版本)

import org.apache.spark.sql.functions._ 

df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
ws51t4hk

ws51t4hk4#

我在这里发表评论是因为我没有足够的代表来评论公认的答案。
从Spark核心的this commit开始,方法executionPlan需要两个参数,logicalPlanmode
添加到hyriu的答案,对于PySpark:

df.cache().foreach(lambda x: x)
spark._jsparkSession.sessionState() \
    .executePlan(
        df._jdf.queryExecution().logical(),
        df._jdf.queryExecution().mode()   
    ).optimizedPlan() \
    .stats() \
    .sizeInBytes()
uqjltbpv

uqjltbpv5#

我的建议是

from sys import getsizeof

def compare_size_two_object(one, two):
    '''compare size of two files in bites'''
    print(getsizeof(one), 'versus', getsizeof(two))

相关问题