如何处理spark窗口函数中的数据倾斜?

baubqpgj  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(1229)

我有一个数据集,我正试图在pyspark中处理。数据(在磁盘上作为parquet)包含用户id、会话id和与每个会话相关的元数据。我正在向我的Dataframe添加一些列,这些列是通过窗口聚合的结果。我遇到的问题是,除了4-6个执行者之外,所有的执行者都会很快完成,其余的执行者永远不会完成。我的代码如下所示:

import pyspark.sql.functions as f
from pyspark.sql.window import Window

empty_col_a_cond = ((f.col("col_A").isNull()) |
                         (f.col("col_A") == ""))

session_window = Window.partitionBy("user_id", "session_id") \
                       .orderBy(f.col("step_id").asc())

output_df = (
    input_df 
    .withColumn("col_A_val", f
                .when(empty_col_a_cond, f.lit("NA"))
                .otherwise(f.col("col_A"))) 
    # ... 10 more added columns replacing nulls/empty strings
    .repartition("user_id", "session_id")
    .withColumn("s_user_id", f.first("user_id", True).over(session_window)) 
    .withColumn("s_col_B", f.collect_list("col_B").over(session_window)) 
    .withColumn("s_col_C", f.min("col_C").over(session_window)) 
    .withColumn("s_col_D", f.max("col_D").over(session_window)) 
    # ... 16 more added columns aggregating over session_window
    .where(f.col("session_flag") == 1) 
    .where(f.array_contains(f.col("s_col_B"), "some_val"))
)

在我的日志中,我一遍又一遍地看到:

INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk (2  times so far)
INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to disk (0  time so far)

这表明spark不能在内存中保存所有的窗口数据。我试着增加内部设置 spark.sql.windowExec.buffer.in.memory.threshold 以及 spark.sql.windowExec.buffer.spill.threshold ,这有点帮助,但仍然有执行人没有完成。
我相信这都是由于数据中的一些偏差造成的。按两者分组 user_id 以及 session_id ,有5个条目的计数>=10000,100条记录的计数介于1000和10000之间,150000条记录的计数小于1000(通常计数=1)。

input_df \
    .groupBy(f.col("user_id"), f.col("session_id")) \
    .count() \
    .filter("count < 1000") \
    .count()

# >= 10k, 6

# < 10k and >= 1k, 108

# < 1k, 150k

这是生成的作业dag:

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题