如何提高spark distinct()在多列上的性能

vm0i2vca  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(287)

您能否建议在sparkDataframe中实现distinct的替代方法。
我尝试了sql和spark distinct,但是由于数据集大小(>20亿)在shuffle上失败了。
如果我将节点和内存增加到>250gb,进程将运行很长时间(超过7小时)。

val df = spark.read.parquet(out)

  val df1 = df.
   select($"ID", $"col2", $"suffix",
     $"date", $"year", $"codes").distinct()

   val df2 = df1.withColumn("codes", expr("transform(codes, (c,s) -> (d,s) )"))

   df2.createOrReplaceTempView("df2")

   val df3 = spark.sql(
     """SELECT
           ID, col2, suffix
           d.s as seq,
           d.c as code,
           year,date
           FROM
            df2
             LATERAL VIEW explode(codes) exploded_table as d
             """)

   df3.
     repartition(
       600,
       List(col("year"), col("date")): _*).
     write.
     mode("overwrite").
     partitionBy("year", "date").
     save(OutDir)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题