Concatenating multiple Scala DataFrames

j9per5c4  posted on 2021-05-29 in Hadoop

I have multiple DataFrames that I want to combine into one big DataFrame:

+----+----------+----------+
|year|   state  |     count|
+----+----------+----------+
|2014|        CT|    343477|
|2014|        DE|    123431|
|2014|        MD|    558686|
|2014|        NJ|    773321|
|2015|        CT|    343477|
|2015|        DE|    123431|
|2015|        MD|    558686|
|2015|        NJ|    773321|
|2016|        CT|    343477|
|2016|        DE|    123431|
|2016|        MD|    558686|
|2016|        NJ|    773321|
|2017|        CT|    343477|
|2017|        DE|    123431|
|2017|        MD|    558686|
|2017|        NJ|    773321|
+----+----------+----------+
+-----------------+
|count_2          |
+-----------------+
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
|           343477|
|           123431|
|           558686|
|           773321|
+-----------------+

I want to merge them into a single DataFrame:

+----+----------+----------+-------+
|year|   state  |     count|count_2|
+----+----------+----------+-------+
|2014|        CT|    343477| 343477|
|2014|        DE|    123431| 123431|
|2014|        MD|    558686| 558686|
|2014|        NJ|    773321| 773321|
|2015|        CT|    343477| 343477|
|2015|        DE|    123431| 123431|
|2015|        MD|    558686| 558686|
|2015|        NJ|    773321| 773321|
|2016|        CT|    343477| 343477|
and so on...

I tried sql(), but it didn't work. I also tried joining the DataFrames (a left join), but that doesn't work either. What kind of join does this without producing duplicates? Thanks!


ddarikpa1#

I don't think there is a shortcut for your problem. Please find my solution below: the only values the two DataFrames share are the counts, and since each count repeats once per year, a plain join multiplies rows, so a window row_number is applied afterwards to keep a single match per (year, state, count).

//Inputs (in spark-shell; in an application, import spark.implicits._ for toDF):
val df1 = Seq((2014,"CT",343477),(2014,"DE",123431),(2014,"MD",558686),(2014,"NJ",773321),
              (2015,"CT",343477),(2015,"DE",123431),(2015,"MD",558686),(2015,"NJ",773321))
  .toDF("year","state","count")

val df2 = Seq(343477,123431,558686,773321,343477,123431,558686,773321).toDF("count_2")
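
For context, a plain equality join on these inputs already shows the duplication problem the question ran into: each count value occurs twice on both sides (once per year), so every df1 row finds two matches. A quick sanity check, assuming the df1/df2 above:

//Each count value appears twice on both sides, so the equality join
//yields 2 matches per df1 row: 16 rows instead of the desired 8.
df1.join(df2, df1("count") === df2("count_2")).count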

//Solution:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

//Join on matching count values, then keep only the first match per
//(year, state, count) partition to remove the duplicates the join creates.
val winFun = Window.partitionBy("year", "state", "count").orderBy("year")
df1.join(df2, df1("count") === df2("count_2"))
  .withColumn("row_no", row_number over winFun)
  .filter("row_no = 1")
  .drop("row_no")
  .orderBy("year")
  .show

Sample output:

+----+-----+------+-------+
|year|state| count|count_2|
+----+-----+------+-------+
|2014|   DE|123431| 123431|
|2014|   MD|558686| 558686|
|2014|   CT|343477| 343477|
|2014|   NJ|773321| 773321|
|2015|   MD|558686| 558686|
|2015|   DE|123431| 123431|
|2015|   CT|343477| 343477|
|2015|   NJ|773321| 773321|
+----+-----+------+-------+
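
A side note on robustness: joining on values only works here because matching rows happen to carry equal counts; if a count in df2 differed from its df1 counterpart, that row would silently drop out. A more general pattern is to pair rows strictly by position. The sketch below is my own illustration, not part of the original answer: the helper withRowIndex is a hypothetical name, and it assumes both DataFrames arrive in the same row order (which Spark only guarantees if an upstream ordering is enforced).

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

//Append a positional index column via RDD zipWithIndex (hypothetical helper).
def withRowIndex(df: DataFrame): DataFrame = {
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, StructType(df.schema.fields :+ StructField("row_idx", LongType)))
}

//Join strictly by row position, then drop the index again.
withRowIndex(df1)
  .join(withRowIndex(df2), "row_idx")
  .drop("row_idx")
  .orderBy("year")
  .show

Note that zipWithIndex triggers an extra Spark job per DataFrame; for a one-off merge like this the cost is usually acceptable.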
