Can you suggest an alternative way to implement distinct on a Spark DataFrame?
I have tried both SQL and Spark's distinct, but it fails during the shuffle because of the dataset size (>2 billion rows).
If I increase the nodes and memory to >250 GB, the job runs for a very long time (more than 7 hours).
val df = spark.read.parquet(out)

// Deduplicate over the selected columns
val df1 = df
  .select($"ID", $"col2", $"suffix", $"date", $"year", $"codes")
  .distinct()

// Pair each code with its position in the array: (c = element, s = index)
val df2 = df1.withColumn("codes",
  expr("transform(codes, (c, s) -> struct(c AS c, s AS s))"))
df2.createOrReplaceTempView("df2")

// Explode the code/index structs into one row per code
val df3 = spark.sql(
  """SELECT
       ID, col2, suffix,
       d.s AS seq,
       d.c AS code,
       year, date
     FROM df2
     LATERAL VIEW explode(codes) exploded_table AS d
  """)

df3
  .repartition(600, col("year"), col("date"))
  .write
  .mode("overwrite")
  .partitionBy("year", "date")
  .save(OutDir)
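One commonly suggested direction for a shuffle that fails at this scale is to give the deduplication more, smaller shuffle tasks, and to avoid hashing the wide `codes` array column during dedup when a narrower key exists. A sketch under those assumptions; the partition count of 2000 is illustrative, and treating (ID, col2, suffix) as a unique key is an assumption not stated in the question:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// More, smaller shuffle tasks: helps when individual tasks of a wide
// distinct() spill or run out of memory. 2000 is illustrative, not tuned.
spark.conf.set("spark.sql.shuffle.partitions", "2000")

val df = spark.read.parquet(out)

// If (ID, col2, suffix) uniquely identifies a logical row -- an
// assumption, not stated in the question -- dropDuplicates on just those
// columns avoids hashing and comparing the large `codes` array column
// that a full-row distinct() must include.
val deduped = df.dropDuplicates("ID", "col2", "suffix")
```

If no column subset is a key, the same `spark.sql.shuffle.partitions` increase still applies to the full `distinct()`, which is semantically identical to `dropDuplicates()` with no arguments.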