scala spark,如何将一组列合并到Dataframe上的单个列?

8e2ybdfx  于 2021-07-14  发布在  Java
关注(0)|答案(2)|浏览(402)

我正在寻找一种方法来做到这一点没有自定义项,我想知道它是否可能。假设我有一个df,如下所示:

Buyer_name  Buyer_state  CoBuyer_name  CoBuyers_state  Price  Date
Bob         CA           Joe           CA              20     010119
Stacy       IL           Jamie         IL              50     020419
... about 3 millions more rows...

我想把它变成:

Buyer_name Buyer_state Price Date
Bob        CA          20    010119
Joe        CA          20    010119
Stacy      IL          50    020419
Jamie      IL          50    020419
...

编辑:我也可以,
创建两个Dataframe,从一个Dataframe中删除“buyer”列,从另一个Dataframe中删除“cobuyer”列。
将带有“cobuyer”列的dataframe重命名为“buyer”列。
连接两个Dataframe。

y53ybaqx

y53ybaqx1#

你可以分组 struct(Buyer_name, Buyer_state) 以及 struct(CoBuyer_name, CoBuyer_state) 变成一个 Array 然后使用 explode ,如下图:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("Bob", "CA", "Joe", "CA", 20, "010119"),
  ("Stacy", "IL", "Jamie", "IL", 50, "020419")
).toDF("Buyer_name", "Buyer_state", "CoBuyer_name", "CoBuyer_state", "Price", "Date")

df.
  withColumn("Buyers", array(
    struct($"Buyer_name".as("_1"), $"Buyer_state".as("_2")),
    struct($"CoBuyer_name".as("_1"), $"CoBuyer_state".as("_2"))
  )).
  withColumn("Buyer", explode($"Buyers")).
  select(
    $"Buyer._1".as("Buyer_name"), $"Buyer._2".as("Buyer_state"), $"Price", $"Date"
  ).show
// +----------+-----------+-----+------+
// |Buyer_name|Buyer_state|Price|  Date|
// +----------+-----------+-----+------+
// |       Bob|         CA|   20|010119|
// |       Joe|         CA|   20|010119|
// |     Stacy|         IL|   50|020419|
// |     Jamie|         IL|   50|020419|
// +----------+-----------+-----+------+
qc6wkl3g

qc6wkl3g2#

对我来说,这听起来像是一个可以通过 union scala中的函数:

val df = Seq(
  ("Bob", "CA", "Joe", "CA", 20, "010119"),
  ("Stacy", "IL", "Jamie", "IL", 50, "020419")
).toDF("Buyer_name", "Buyer_state", "CoBuyer_name", "CoBuyer_state", "Price", "Date")

val df_new = df.select("Buyer_name", "Buyer_state", "Price", "Date").union(df.select("CoBuyer_name", "CoBuyer_state", "Price", "Date"))

df_new.show

感谢leo提供了我使用的Dataframe定义。

相关问题