这是我的Dataframe:
val new_df = Seq(("mike","A","B","B","C","A"),
("bob","A","A","B","A","C")).toDF("name","math","science","english","history","art")
下面是我试图生成的结果Dataframe:
val new_df2 = Seq(("mike","A","B","B","C","A",2,2,1),
("bob","A","A","B","A","C",3,1,1)).toDF("name","math","science","english","history","art","A_count","B_count","C_count")
以下是表格视图中的前后快照:
所以我想添加一个列来获取每行的a的计数(或任何字符串匹配)。我该怎么办?我知道使用withcolumn可以工作,但我不知道如何在一行上进行字符串匹配。
非常感谢,祝你今天愉快!
2条答案
按热度按时间r7xajy2e1#
最好的方法是使用sparksql。虽然你可以用flatmap来解决这个问题,但是它的速度要慢得多,而且最好是试着在线完成它!我写这篇文章是为了让你可以很容易地扩展到更多的等级,而且有一点少锅炉板。
cwxwcias2#
这可以在一个过程中通过对数据行然后列的平面Map来动态地解决
Dataframe
然后做一个.pivot().count()
结果如何。这将导致:
---编辑
我可以完成每个主要步骤,并解释它们的作用:
一
.flatmap()
就像一个.map()
,只不过输入记录和输出记录的比率不是1:1,而是1:n。换句话说,它可以为处理的每条记录发出0条或更多条记录。在这种情况下,我们遍历每一行,然后遍历每一列,为每个人的每个等级生成一个记录(只要单元格的值存在于我们感兴趣的等级的初始数组中)。这个.toDF()
方法简单地转换我们的结果Dataset[(String, String)]
回到一个Dataframe
. 这段代码导致Dataframe
看起来是这样的:中间产物
Dataframe
我们可以做一个这个
groupBy()
按所提供列中包含的值创建记录组。然后,.pivot()
将在它提供的列中获取不同的值,并为每个值创建一个新列。最后是.count()
方法确定如何聚合不同列组的值。这个
.withColumnRenamed()
我们有很多方法来做最后的决定Dataframe
“look”就像您通过实际重命名列请求的那样(因为实际的上一个等级值变成了列名)。如果换个方式解决这个问题会更好Some(name, grade)
至Some(name, grade+"_count")
以避免静态列重命名。关于你得到的csv错误,我必须看到实际的代码和csv头,以了解可能是什么原因造成的。
---替代解决方案
我还制定了一个黑客替代解决方案,需要在您的源代码中的列
Dataframe
是固定的,可能不是最有效的事情。这与上面的更好的解决方案几乎没有关系,但我提供它是为了以防万一:再说一次,这是一个老套的解决方案,但可能适用于少量的“等级”,一个固定的列集,如果你不太关心效率的话。