我需要处理每个分区的分区(长话短说)。
使用 mapPartitions
使用RDD时工作正常。在本例中,使用 rdd.mapPartitions(mapper).collect()
一切正常。
但是,当转换为Dataframe时,一个分区会被处理两次。
为什么会发生这种情况以及如何避免?
下面是下一个简单示例的输出。当只有两个分区时,我们可以读取函数是如何执行3次的。其中一个分区 [Row(id=1), Row(id=2)]
处理两次。很有可能其中一个执行被忽略了,正如我们在datadrame中看到的那样。
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=3), Row(id=4)]
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
> Mapper executions: 3
使用的简单示例:
from typing import Iterator
from pyspark import Row
from pyspark.sql import SparkSession
def gen_random_row(id: str):
return Row(id=id)
if __name__ == '__main__':
spark = SparkSession.builder.master("local[1]").appName("looking for the error").getOrCreate()
executions_counter = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize([
gen_random_row(1),
gen_random_row(2),
gen_random_row(3),
gen_random_row(4),
], 2)
def mapper(iterator: Iterator[Row]) -> Iterator[Row]:
executions_counter.add(1)
lst = list(iterator)
print(f"size: {len(lst)} > values: {lst}")
for r in lst:
yield r
# rdd.mapPartitions(mapper).collect()
rdd.mapPartitions(mapper).toDF().show()
print(f"> Mapper executions: {executions_counter.value}")
spark.stop()
1条答案
按热度按时间e7arh2l61#
解决方案是将模式传递给
toDF
看起来spark正在处理一个分区来推断模式。要解决这个问题:
使用这段代码,每个分区处理一次。