在spark中将多列合并为单列

x9ybnkn6  于 2021-05-24  发布在  Spark
关注(0)|答案(3)|浏览(698)

我的Parquet文件中有以下格式的扁平传入数据:

我想把它转换成下面的格式,在这里我不展平我的结构:

我尝试了以下方法:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
                explode(array("fullname_1", "fullname_2")).as("fullname"),
                explode(array("firstname_1", "firstname_2")).as("firstname"));

但它给出了以下错误:
线程“main”org.apache.spark.sql.analysisexception中出现异常:每个select子句只允许一个生成器,但找到2:explode(array(fullname_1,fullname_2)),explode(array(firstname_1,firstname_2));
我理解这是因为在一个查询中不能使用超过1个explode。我正在寻找在spark java中执行上述操作的选项。

x759pob2

x759pob21#

这类问题最容易用简单的方法解决 .flatMap() . 一 .flatMap() 就像一个 .map() 但它允许您为每个输入记录输出n个记录,而不是1:1的比率。

val df = Seq(
    (1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
    (2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
    ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")

df.flatMap(row => {
    val id = row.getAs[Int]("id")
    val cc = row.getAs[String]("country_code")
    Seq(
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1"))
    )
}).toDF("id", "country_code", "fullname", "firstname").show()

结果如下:

+---+------------+-----------+---------+
| id|country_code|   fullname|firstname|
+---+------------+-----------+---------+
|  1|         USA|      Lee M|      Lee|
|  1|         USA|      Lee M|      Lee|
|  2|         CAN|Pate Poland|     Pate|
|  2|         CAN|Pate Poland|     Pate|
+---+------------+-----------+---------+
a9wyjsp7

a9wyjsp72#

作为一个数据库人员,我喜欢对这样的事情使用基于集合的操作,例如 union ```
val df = Seq(
("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")

val df_new = df
.select("id", "country_code", "fullname_1", "firstname_1").union(df.select("id", "country_code", "fullname_2", "firstname_2"))
.orderBy("id")

df_new.show
df.createOrReplaceTempView("tmp")

或等效的sql:

%sql
SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
FROM tmp
UNION
SELECT id, country_code, fullname_2, firstname_2
FROM tmp

我的结果:
![](https://i.stack.imgur.com/TL0qJ.png)
我想与flatmap技术相比的一个优点是您不必指定数据类型,而且表面上看起来更简单。当然由你决定。
xriantvc

xriantvc3#

您需要将名字和姓氏 Package 到一个结构数组中,然后将其分解:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
  explode(
    array(
      struct(
        col("firstname_1").as("firstname"), col("fullname_1").as("fullname")),
      struct(
        col("firstname_2").as("firstname"), col("fullname_2").as("fullname"))
    )
  )
)

这样,您将获得快速的窄范围转换,具有scala/python/r可移植性,并且它应该比 df.flatMap 解决方案,它将把dataframe转换为rdd,而查询优化器无法改进rdd。由于从不安全的字节数组复制到java对象,java垃圾收集器可能会带来额外的压力。

相关问题