spark mapinpandas中有多少迭代器?

umuewwlo  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(490)

我试图理解“mapinpandas”在spark中是如何工作的。databricks博客上引用的例子是:

from typing import Iterator
import pandas as pd

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(pandas_filter, schema=df.schema).show()

问题是,迭代器中有多少“pdf”?我猜它们可能和分区的数量一样多,但当我进一步测试代码时,它们似乎太多了(在一个具有~100m条记录的不同数据集上)
那么,有没有办法知道迭代次数是如何确定的,有没有办法使它等于分区的数量呢?

z3yyvxxp

z3yyvxxp1#

您可以在文档中找到:
spark中的数据分区被转换成arrow记录批处理,这可能会暂时导致jvm中的高内存使用率。为了避免可能的内存不足异常,可以通过将conf“spark.sql.execution.arrow.maxrecordsperbatch”设置为一个整数来调整arrow记录批的大小,该整数将确定每个批的最大行数。默认值为每批10000条记录。如果列数较大,则应相应调整该值。使用此限制,每个数据分区将被划分为1个或多个记录批进行处理。
因此,如果您有10m条记录,那么您将有~10000个迭代器

相关问题