flatten+(~ self join)scala中带有结构数组的sparkDataframe

weylhg0b  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(240)

输入Dataframe:

{
  "F1" : "A",
  "F2" : "B",
  "F3" : [
            {
              "name" : "N1",
              "sf1" : "val_1",
              "sf2" : "val_2"
            },
            {
              "name" : "N2",
              "sf1" : "val_3",
              "sf2" : "val_4"
            }
         ],
  "F4" : {
        "SF1" : "val_5",
        "SF2" : "val_6",
        "SF3" : "val_7"
  }
}

期望输出:

[
  {
    "F1" : "A",
    "F2" : "B",

    "F3_name" : "N1",
    "F3_sf1" : "val_1",
    "F3_sf2" : "val_2",

    "F4_SF1" : "val_7",
    "F4_SF2" : "val_8",
    "F4_SF3" : "val_9",
  },
  {
    "F1" : "A",
    "F2" : "B",

    "F3_name" : "N2",
    "F3_sf1" : "val_3",
    "F3_sf2" : "val_4",

    "F4_SF1" : "val_7",
    "F4_SF2" : "val_8",
    "F4_SF3" : "val_9",
  }
]
``` `F3` 是结构的数组。新的数据框应该是扁平的,并且已经根据数据框中的项目数将这一行转换成一行或多行(本例中为2行) `F3` .
我是spark&scala的新手。任何关于如何实现这一转变的想法都将非常有用。
谢谢!
eulz3vhy

eulz3vhy1#

你也可以先用 explode . 然后,您可以使用一系列别名(例如。, $"F3.name" as "F3_name" ):

scala> case class NameSF(name: String, sf1: String, sf2: String)
defined class NameSF

scala> case class SF(SF1: String, SF2: String, SF3: String)
defined class SF

scala> case class F(F1: String, F2: String, F3: Array[NameSF], F4: SF)
defined class F

scala> val elem = F("A",
     |              "B",
     |              Array(NameSF("N1", "val_1", "val_2"), NameSF("N2", "val_3", "val_4")),
     |              SF("val_5", "val_6", "val_7"))
elem: F = F(A,B,[LNameSF;@2939bfa0,SF(val_5,val_6,val_7))

scala> val df = spark.createDataset(Seq(elem)).toDF
df: org.apache.spark.sql.DataFrame = [F1: string, F2: string ... 2 more fields]

scala> df.withColumn("F3", explode($"F3")).select($"F1",
     |                                            $"F2",
     |                                            $"F3.name" as "F3_name",
     |                                            $"F3.sf1" as "F3_sf1",
     |                                            $"F3.sf2" as "F3_sf2",
     |                                            $"F4.SF1" as "F4_SF1",
     |                                            $"F4.SF2" as "F4_SF2",
     |                                            $"F4.SF3" as "F4_SF3").show
+---+---+-------+------+------+------+------+------+                            
| F1| F2|F3_name|F3_sf1|F3_sf2|F4_SF1|F4_SF2|F4_SF3|
+---+---+-------+------+------+------+------+------+
|  A|  B|     N1| val_1| val_2| val_5| val_6| val_7|
|  A|  B|     N2| val_3| val_4| val_5| val_6| val_7|
+---+---+-------+------+------+------+------+------+
ftf50wuq

ftf50wuq2#

你可以用 inline 展开f3,然后 * 要展开f4:

val df2 = df.selectExpr("F1","F2","inline(F3)","F4.*")

df2.show
+---+---+----+-----+-----+-----+-----+-----+
| F1| F2|name|  sf1|  sf2|  SF1|  SF2|  SF3|
+---+---+----+-----+-----+-----+-----+-----+
|  A|  B|  N1|val_1|val_2|val_5|val_6|val_7|
|  A|  B|  N2|val_3|val_4|val_5|val_6|val_7|
+---+---+----+-----+-----+-----+-----+-----+

相关问题