scala—如何在写入之前从sparkDataframe中删除小分区

jm2pwxwz  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(529)

我根据列中的值对Dataframe进行分区,如下所示:

val dfPartitioned = df.repartition(col("my_col"))

我想从小于n行的Dataframe中删除分区。在写到磁盘之前,我该怎么做?
或者在分区之前我必须过滤掉那些我不想要的?

oxcyiej7

oxcyiej71#

在划分之前,通过获取窗口上的计数,然后按阈值过滤:

val df = Seq(1, 2, 1).toDF("my_col")
val rowCountThreshold = 1

df
  .withColumn("colNumber", count("my_col").over(Window.partitionBy("my_col")))
  .where($"colNumber" > rowCountThreshold)

输出:

+------+---------+
|my_col|colNumber|
+------+---------+
|1     |2        |
|1     |2        |
+------+---------+
jgovgodb

jgovgodb2#

分区和筛选行:

import org.apache.spark.sql.functions._
import sparkSession.implicits._

df.withColumn("cnt", count($"*").over(Window.partitionBy("my_col").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .where($"cnt" > N)
  .drop($"cnt")

相关问题