如果mappartitions与todf()一起使用,为什么很少的分区会被处理两次

lmyy7pcs  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(448)

我需要处理每个分区的分区(长话短说)。
使用 mapPartitions 使用RDD时工作正常。在本例中,使用 rdd.mapPartitions(mapper).collect() 一切正常。
但是,当转换为Dataframe时,一个分区会被处理两次。
为什么会发生这种情况以及如何避免?
下面是下一个简单示例的输出。当只有两个分区时,我们可以读取函数是如何执行3次的。其中一个分区 [Row(id=1), Row(id=2)] 处理两次。很有可能其中一个执行被忽略了,正如我们在datadrame中看到的那样。

size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=3), Row(id=4)]
                                                                                +---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
> Mapper executions: 3

使用的简单示例:

from typing import Iterator

from pyspark import Row
from pyspark.sql import SparkSession

def gen_random_row(id: str):
    return Row(id=id)

if __name__ == '__main__':
    spark = SparkSession.builder.master("local[1]").appName("looking for the error").getOrCreate()
    executions_counter = spark.sparkContext.accumulator(0)

    rdd = spark.sparkContext.parallelize([
        gen_random_row(1),
        gen_random_row(2),
        gen_random_row(3),
        gen_random_row(4),
    ], 2)

    def mapper(iterator: Iterator[Row]) -> Iterator[Row]:
        executions_counter.add(1)
        lst = list(iterator)
        print(f"size: {len(lst)} > values: {lst}")
        for r in lst:
            yield r

    # rdd.mapPartitions(mapper).collect()
    rdd.mapPartitions(mapper).toDF().show()

    print(f"> Mapper executions: {executions_counter.value}")

    spark.stop()
e7arh2l6

e7arh2l61#

解决方案是将模式传递给 toDF 看起来spark正在处理一个分区来推断模式。
要解决这个问题:

schema = StructType([StructField("id", IntegerType(), True)])
    rdd.mapPartitions(mapper).toDF(schema).show()

使用这段代码,每个分区处理一次。

相关问题