scala 在spark中将 Dataframe 的数组列拆分为多列

oymdgrw7  于 2023-03-18  发布在  Scala
关注(0)|答案(1)|浏览(236)

我是spark的新手,我有一个数组列,想把它切成多列,该怎么操作呢?期待你的解答。

Input:
+-------+-----------------------------------------------------------------+
|name   |subjects                                                         |
+-------+-----------------------------------------------------------------+
|Michael|[{"subject": "english", score:100},{"subject": "math", score:80}]|
|John   |[{"subject": "english", score:75},{"subject": "math", score:66}] |
+-------+-----------------------------------------------------------------+

Output:
+-------+-------+----+
|name   |english|math|
+-------+-------+----+
|Michael|100    |80  |
|John   |75     |66  |
+-------+-------+----+
guz6ccqo

guz6ccqo1#

你能保证subjects列中的所有值都有相同的“键”,也就是subjects吗?在一般情况下,你不能把数组值转换成列,因为不同行中的数组可能有不同的(而且很大)数量的元素。如果你确实知道所有行中subjects字段的长度相同,顺序相似,你可以手动完成

df.withColumn("english", element_at(element_at(col("subjects"),1),"score").
   withColumn("math",

等等
如果不确定顺序,可以对数组进行排序,虽然我不确定数组Map是如何排序的,可以使用UDF来提取分数,这样就不依赖于顺序了。
如果事先不知道主题,可以先收集所有主题名称,然后生成添加列所需的代码。
另一种处理数组中条目顺序的方法是将map数组转换为一个以subject为键的map。使用udf很容易,但可以使用spark函数完成,其中包含两个explode,然后是groupBymap_from_entriesmap_from_arrays。您仍然需要使用序列withColumn将map条目转换为列
实际上,看起来你可以用transform把map转换成map,用struct替换map,然后用map_from_entries把它转换成map,不需要udfexplode

val df1 = df.withColumn("subjects_map",map_from_entries(
  transform(col("subjects"), 
     m => struct(element_at(m, "subject"), element_at(m, "score")))))

这将创建下表

+-------+--------------------------------------------------------------------+----------------------------+
|name   |subject                                                             |map                         |
+-------+--------------------------------------------------------------------+----------------------------+
|Michael|[{subject -> english, score -> 100}, {subject -> math, score -> 90}]|{english -> 100, math -> 90}|
|John   |[{subject -> english, score -> 60}, {subject -> math, score -> 50}] |{english -> 60, math -> 50} |
+-------+--------------------------------------------------------------------+----------------------------+

如果数组是JSON编码的字符串数组而不是Map数组,则代码更改如下

val df1 = df.withColumn("subjects_map",map_from_entries(
  transform(col("subjects"), 
     m => struct(get_json_object(m, "$.subject"), get_json_object(m, "$.score")))))

最后,创建列的代码

val subjs = df1.select(explode(map_keys(col("subjects_map"))) as "subj").
              distinct.as[String].collect // gather all the subj values

subjs.foldLeft(df1){ case (df, c) => 
    df.withColumn(c, element_at(col("subjects_map"), c)) } // add column for each one

相关问题