查找sparkDataframe中每组的最大行数

8ehkhllq  于 2021-07-14  发布在  Spark
关注(0)|答案(2)|浏览(259)

我尝试使用sparkDataframe而不是rdd,因为它们看起来比rdd更高级,并且倾向于生成更可读的代码。
在一个14节点的google dataproc集群中,我有大约600万个名字被两个不同的系统翻译成ids: sa 以及 sb . 每个 Row 包含 name , id_sa 以及 id_sb . 我的目标是从 id_said_sb 这样每个 id_sa ,对应的 id_sb 是附加到的所有名称中最常见的id id_sa .
让我们用一个例子来说明。如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

我的目标是从 a1b2 . 事实上,与 a1n1 , n2 以及 n3 ,分别Map到 b1 , b2 以及 b2 ,所以 b2 是关联到的名称中最频繁的Map a1 . 同样地, a2 将Map到 b2 . 我们可以假设总会有一个赢家:不需要打破僵局。
我希望我能 groupBy(df.id_sa) 但我不知道下一步该怎么办。我希望聚合最终能产生以下行:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

但也许我尝试使用错误的工具,我应该回去使用RDD。

bihw5rsg

bihw5rsg1#

使用 join (如果是平局,将导致多行分组):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

使用窗口函数(将删除关系):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

使用 struct 订购:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

另请参见如何选择每组的第一行?

h43kikqp

h43kikqp2#

我认为您可能需要的是窗口功能:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
下面是scala中的一个示例(我现在没有带配置单元的spark shell,所以我无法测试代码,但我认为它应该可以工作):

case class MyRow(name: String, id_sa: String, id_sb: String)

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")

可能有更有效的方法来实现与窗口函数相同的结果,但我希望这为您指明了正确的方向。

相关问题