我根据列中的值对Dataframe进行分区,如下所示:
val dfPartitioned = df.repartition(col("my_col"))
我想从小于n行的Dataframe中删除分区。在写到磁盘之前,我该怎么做?或者在分区之前我必须过滤掉那些我不想要的?
oxcyiej71#
在划分之前,通过获取窗口上的计数,然后按阈值过滤:
val df = Seq(1, 2, 1).toDF("my_col") val rowCountThreshold = 1 df .withColumn("colNumber", count("my_col").over(Window.partitionBy("my_col"))) .where($"colNumber" > rowCountThreshold)
输出:
+------+---------+ |my_col|colNumber| +------+---------+ |1 |2 | |1 |2 | +------+---------+
jgovgodb2#
分区和筛选行:
import org.apache.spark.sql.functions._ import sparkSession.implicits._ df.withColumn("cnt", count($"*").over(Window.partitionBy("my_col").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))) .where($"cnt" > N) .drop($"cnt")
2条答案
按热度按时间oxcyiej71#
在划分之前,通过获取窗口上的计数,然后按阈值过滤:
输出:
jgovgodb2#
分区和筛选行: