如何创建第三列包含第二个Dataframe的数组并按第一个Dataframe的id过滤的其他Dataframe?

c8ib6hqw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(350)

我有两个Dataframe和人的地址。表包含可靠的地址源和其他不受信任的地址源。我们想知道地址是真人的可证明性。
我用的是spark 1.5
我有两个Dataframe:
df1型

COL1  | COL2 
00001 | Street 1    
00001 | Street 2    
00002 | Street 1    
00002 | Street 2    
00002 | Street 1

df2型

COL1  | COL2   
00001 | Street 1    
00001 | Street 2    
00001 | Street 2    
00001 | Street 2    
00002 | Street 1    
00002 | Street 2    
00002 | Street 1

我的问题是如何转换成这个Dataframe/Map/其他?我的意思是转换成下面的代码。

COL1  | COL2        | COL3 (Array or Vector)   
00001 | Street 1    | [00001 | Street 1, 00001 | Street 2, 00001 | Street 2, 00001 | Street 2]    
00001 | Street 2    | [00001 | Street 1, 00001 | Street 2, 00001 | Street 2, 00001 | Street 2]    
00002 | Street 1    | [00002 | Street 1, 00002 | Street 2, 00002 | Street 1]    
00002 | Street 2    | [00002 | Street 1, 00002 | Street 2, 00002 | Street 1]   
00002 | Street 1    | [00002 | Street 1, 00002 | Street 2, 00002 | Street 1]

最后一个表只是一个例子,我需要连接两个独立于文件格式表的Dataframe。我们需要处理第三个表的数据以获得统计数据。

ukdjmx9f

ukdjmx9f1#

我用spark1.6尝试了下面的代码。
我已经编写了假设col1是字符串的解决方案。

步骤:

转换 DF1 到rdd[(字符串,字符串)]
DF2COL1 加入结果 Step-1Step-2 将结果Map到 Step-3RDD[Row] 定义结构
创建新的Dataframe

代码:

val aggRDD = df1.rdd.map(r=>         // Converting DF1 to RDD[(String,String)]
  (r.getString(0),                   //Mapping COL1
    r.getString(1))                  //Mapping COL2
).join(                              //Joining DF1 with grouped DF2 
  df2.rdd.groupBy(r=>r.getString(0)) // Grouping DF2 by COL1
).map(r=>                            // r -> (String, (String, Iterable[Row]))
  Row.fromSeq(Seq(                   // Creating a Row Object
    r._1,                            // Setting COL1
    r._2._1,                         // Setting COL2
    r._2._2.map(i=>                  // Converting the Row object to String
      i.getString(0)+"|"+i.getString(1) // Extracting COL1 and COL2 Value
    )
  ))
)

// Defining the Struct for the new DataFrame
val struct = new StructType(Array(
    StructField("COL1",StringType,nullable = true),
    StructField("COL2",StringType,nullable = true),
    StructField("Array",
    ArrayType(StringType,containsNull = true),nullable = true)
))    

// Creating a new DataFrame from RDD[Row]
val df = sq.createDataFrame(aggRDD,struct)

df.show(truncate=false)

+-----+--------+----------------------------------------------------------------+
|COL1 |COL2    |Array                                                           |
+-----+--------+----------------------------------------------------------------+
|00002|Street 1|[00002|Street 1, 00002|Street 2, 00002|Street 1]                |
|00002|Street 2|[00002|Street 1, 00002|Street 2, 00002|Street 1]                |
|00002|Street 1|[00002|Street 1, 00002|Street 2, 00002|Street 1]                |
|00001|Street 1|[00001|Street 1, 00001|Street 2, 00001|Street 2, 00001|Street 2]|
|00001|Street 2|[00001|Street 1, 00001|Street 2, 00001|Street 2, 00001|Street 2]|
+-----+--------+----------------------------------------------------------------+

相关问题