如何获得字符串匹配的行计数并将其添加为scala中的新列?

bwleehnv  于 2021-05-26  发布在  Spark
关注(0)|答案(2)|浏览(403)

这是我的Dataframe:

val new_df = Seq(("mike","A","B","B","C","A"),
    ("bob","A","A","B","A","C")).toDF("name","math","science","english","history","art")

下面是我试图生成的结果Dataframe:

val new_df2 = Seq(("mike","A","B","B","C","A",2,2,1),
    ("bob","A","A","B","A","C",3,1,1)).toDF("name","math","science","english","history","art","A_count","B_count","C_count")

以下是表格视图中的前后快照:


所以我想添加一个列来获取每行的a的计数(或任何字符串匹配)。我该怎么办?我知道使用withcolumn可以工作,但我不知道如何在一行上进行字符串匹配。
非常感谢,祝你今天愉快!

r7xajy2e

r7xajy2e1#

最好的方法是使用sparksql。虽然你可以用flatmap来解决这个问题,但是它的速度要慢得多,而且最好是试着在线完成它!我写这篇文章是为了让你可以很容易地扩展到更多的等级,而且有一点少锅炉板。

import org.apache.spark.sql.Column

def colIsGrade(col:Column, grade:String) = when(col === lit(grade), lit(1)).otherwise(lit(0))

def countOccurenceOf(grade:String) = (List($"math", $"science", $"english", $"history", $"art").foldLeft(lit(0)) {
  case (count, subject) => colIsGrade(subject, grade) + count
}).as(s"${grade}_count")

val grades = List("A","B","C","D","E","F")
val gradesColumnStatement = grades.map(countOccurenceOf)

new_df.select(col("*") +: gradesColumnStatement :_*)
cwxwcias

cwxwcias2#

这可以在一个过程中通过对数据行然后列的平面Map来动态地解决 Dataframe 然后做一个 .pivot().count() 结果如何。

val grades = Array[String]("A", "B", "C")
val cols = new_df.columns
val gcount = new_df.flatMap( row => {
    val name = row.getAs[String]("name")
    cols.flatMap( c => {
        val grade = row.getAs[String](c)
        if (grades.contains(grade)) {
            Some(name, grade)    
        } else {
            None
        }
    })
}).toDF("name", "grade")
  .groupBy("name")
  .pivot("grade")
  .count()
  .withColumnRenamed("A", "A_count")
  .withColumnRenamed("B", "B_count")
  .withColumnRenamed("C", "C_count")

new_df.join(gcount, "name").show()

这将导致:

+----+----+-------+-------+-------+---+-------+-------+-------+
|name|math|science|english|history|art|A_count|B_count|C_count|
+----+----+-------+-------+-------+---+-------+-------+-------+
|mike|   A|      B|      B|      C|  A|      2|      2|      1|
| bob|   A|      A|      B|      A|  C|      3|      1|      1|
+----+----+-------+-------+-------+---+-------+-------+-------+

---编辑
我可以完成每个主要步骤,并解释它们的作用:

val gcount = new_df.flatMap( row => {
    val name = row.getAs[String]("name")
    cols.flatMap( c => {
        val grade = row.getAs[String](c)
        if (grades.contains(grade)) {
            Some(name, grade)    
        } else {
            None
        }
    })
}).toDF("name", "grade")

.flatmap() 就像一个 .map() ,只不过输入记录和输出记录的比率不是1:1,而是1:n。换句话说,它可以为处理的每条记录发出0条或更多条记录。在这种情况下,我们遍历每一行,然后遍历每一列,为每个人的每个等级生成一个记录(只要单元格的值存在于我们感兴趣的等级的初始数组中)。这个 .toDF() 方法简单地转换我们的结果 Dataset[(String, String)] 回到一个 Dataframe . 这段代码导致 Dataframe 看起来是这样的:

+----+-----+
|name|grade|
+----+-----+
|mike|    A|
|mike|    B|
|mike|    B|
|mike|    C|
|mike|    A|
| bob|    A|
| bob|    A|
| bob|    B|
| bob|    A|
| bob|    C|
+----+-----+

中间产物 Dataframe 我们可以做一个

groupBy("name").pivot("grade").count()

这个 groupBy() 按所提供列中包含的值创建记录组。然后, .pivot() 将在它提供的列中获取不同的值,并为每个值创建一个新列。最后是 .count() 方法确定如何聚合不同列组的值。
这个 .withColumnRenamed() 我们有很多方法来做最后的决定 Dataframe “look”就像您通过实际重命名列请求的那样(因为实际的上一个等级值变成了列名)。如果换个方式解决这个问题会更好 Some(name, grade)Some(name, grade+"_count") 以避免静态列重命名。
关于你得到的csv错误,我必须看到实际的代码和csv头,以了解可能是什么原因造成的。
---替代解决方案
我还制定了一个黑客替代解决方案,需要在您的源代码中的列 Dataframe 是固定的,可能不是最有效的事情。这与上面的更好的解决方案几乎没有关系,但我提供它是为了以防万一:

var df = new_df
val grades = Array[String]("A", "B", "C")

grades.foreach(g => {
    df = df.withColumn(g +"_count", ($"math" === lit(g)).cast("Int") + ($"science" === lit(g)).cast("Int") + ($"english" === lit(g)).cast("Int") + ($"history" === lit(g)).cast("Int") + ($"art" === lit(g)).cast("Int"))
})

再说一次,这是一个老套的解决方案,但可能适用于少量的“等级”,一个固定的列集,如果你不太关心效率的话。

相关问题