如何创建一个Map列来计算没有udaf的出现次数

rqdpfwrv  于 2021-05-22  发布在  Spark
关注(0)|答案(4)|浏览(391)

我想创建一个 Map 计算出现次数的列。
例如:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

会导致

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

目前,在spark2.4.6中,我能够使用udaf实现它。
当我碰到spark3的时候,我想知道我是否能摆脱这个udaf(我试着使用新方法) aggregate 没有成功)
有没有有效的方法(对于效率部分,我可以很容易地测试)

rqenqsqc

rqenqsqc1#

你可以随时使用 collect_list 使用自定义项,但前提是您的分组不太大:

val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

给予:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这应该比udaf:spark custom aggregation:collect\u list+udf vs udaf快

w46czmvw

w46czmvw2#

以下是spark 3解决方案:

import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

给予:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这里的解决方案使用 Aggregator :

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String,Int], Map[String,Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,Int], a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,Int], b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

给予:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+
dxpyg8gm

dxpyg8gm3#

这里有一个单一的解决方案 groupBy 以及一个稍微复杂的sql表达式。此解决方案适用于spark 2.4+

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),
       expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",
       expr("map_from_arrays(set,transform(set, x -> size(filter(list, y -> y=x))))"))
  .show()

输出:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a, b]|[a, a, b]|[a -> 2, b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

这个想法是从专栏文章中收集数据 a 两次:一次进入一个集合,一次进入一个列表。然后借助于集合中每个元素的变换,计算列表中特定元素的出现次数。最后,将集合和元素数与来自\数组的map\相结合。
但是,我不能说这种方法是否真的比udaf快。

dddzy1tm

dddzy1tm4#

我们能做到的就是spark 2.4

//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",  when($"a".isNotNull, map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))

相关问题