Spark / PySpark: write JSON keyed by the values of one column

aij0ehis  posted on 2021-05-27  in  Spark

Suppose I have a simple table in a DataFrame with the schema:

a string, b string, c string

For example:

a     b      c
cat   3-3    78-b
cat   3-3    89-0
cat   4-4    78-n 
dog   4-4    89-b

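and so on. I want to partition this table by column a and save each partition as a separate JSON file.
In addition, each partition should be a single JSON file in which the values of column b are the keys. For example: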

File cat.json:
     {
       "3-3": {"b": "3-3", "c": "78-b"},
       "3-3": {"b": "3-3", "c": "89-0"},
       "4-4": {"b": "4-4", "c": "78-n"}
     }
File dog.json:
     {
       "4-4": {"b": "4-4", "c": "89-b"}
     }

Is there a way to do this in PySpark? Thanks.
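
One caveat about the desired cat.json: it repeats the key "3-3", and a typical JSON parser keeps only one value per key. Here is a minimal sketch using Python's standard json module that shows what a reader of such a file would get back. It only illustrates the parsing behaviour; it is not part of the original question.

```python
import json

# The desired cat.json from the question, with the key "3-3" appearing twice.
cat_json = """
{
  "3-3": {"b": "3-3", "c": "78-b"},
  "3-3": {"b": "3-3", "c": "89-0"},
  "4-4": {"b": "4-4", "c": "78-n"}
}
"""

# json.loads builds a dict, so the later "3-3" entry overwrites the earlier one.
parsed = json.loads(cat_json)

print(len(parsed))         # number of keys that survive parsing
print(parsed["3-3"]["c"])  # the value from the last "3-3" entry
```

If every row has to survive a round trip, one option is to store a list of records under each key instead of repeating the key.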

xwbd5t1u  #1

Just add row-mapping logic to the DataFrame and you are done; the explanation is in the inline comments.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

object CatDog {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    val df = List(("cat",   "3-3"    ,"78-b"),
    ("cat"   ,"3-3",    "89-0"),
    ("cat"   ,"4-4"    ,"78-n"),
      ("dog"   ,"4-4",    "89-b")).toDF("a","b","c")

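    // df.show()

    // A DataFrame cannot be referenced inside another Dataset's map/foreach
    // closure, so collect the distinct values of "a" to the driver first.
    val aValues = df.select("a").distinct().as[String].collect()

    // Write the rows for each value of "a" as JSON. Spark creates a directory
    // of part files at each path, with one JSON object per line.
    aValues.foreach { aVal =>
      df.filter(col("a") === aVal)
        .map(row => parseDF(row))
        .write.json(s"src/main/resources/$aVal.json")
    }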
  }

  //Row mapping logic
  def parseDF(row: Row): Map[String, Map[String, String]] = {

    val b = row.getString(1)
    val c = row.getString(2)
    Map(b -> Map("b" -> b, "c" -> c))

  }

}
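
Since the question asks for PySpark rather than Scala, here is a minimal sketch of the same "one output per value of a" idea in plain Python. It assumes the rows are small enough to bring to the driver (for example with df.collect()), and it stores a list under each b key so that a repeated key such as "3-3" does not lose rows. The name write_keyed_json and the temporary output directory are hypothetical, not from the answer.

```python
import json
import os
import tempfile
from collections import defaultdict


def write_keyed_json(rows, out_dir):
    """Write one <a>.json file per distinct `a`, keyed by the values of `b`."""
    # grouped[a][b] is the list of records that share both values.
    grouped = defaultdict(lambda: defaultdict(list))
    for a, b, c in rows:
        grouped[a][b].append({"b": b, "c": c})

    paths = {}
    for a, by_b in grouped.items():
        path = os.path.join(out_dir, f"{a}.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(by_b, fh, indent=2)
        paths[a] = path
    return paths


# Sample rows from the question. With PySpark they could come from
# [tuple(r) for r in df.collect()] (assumption: the data fits on the driver).
rows = [
    ("cat", "3-3", "78-b"),
    ("cat", "3-3", "89-0"),
    ("cat", "4-4", "78-n"),
    ("dog", "4-4", "89-b"),
]

out_dir = tempfile.mkdtemp()
paths = write_keyed_json(rows, out_dir)

with open(paths["cat"], encoding="utf-8") as fh:
    cat = json.load(fh)
print(cat["3-3"])  # both "3-3" rows, kept as a list
```

This gives up Spark's parallel writes in exchange for real single files named cat.json and dog.json, so it is only reasonable for small tables.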

gmol1639  #2

Try the following solution:

1. Load the data

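// Assumes an existing SparkSession named `spark`
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import spark.implicits._

val data =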
      """
        |a    | b   |   c
        |cat  | 3-3 |   78-b
        |cat  | 3-3 |   89-0
        |cat  | 4-4 |   78-n
        |dog  | 4-4 |   89-b
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .csv(stringDS)
    df.show(false)
    df.printSchema()
    /**
      * +---+---+----+
      * |a  |b  |c   |
      * +---+---+----+
      * |cat|3-3|78-b|
      * |cat|3-3|89-0|
      * |cat|4-4|78-n|
      * |dog|4-4|89-b|
      * +---+---+----+
      *
      * root
      * |-- a: string (nullable = true)
      * |-- b: string (nullable = true)
      * |-- c: string (nullable = true)
      */

2. Build the map in the required shape

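val processedDF = df
      .groupBy("a")
      .agg(
        collect_list(struct(col("b"), col("c"))).as("value"),
        collect_list(col("b")).as("key")
      )
      // On Spark 3.0+, duplicate keys (here "3-3") make map_from_arrays fail unless
      // spark.sql.mapKeyDedupPolicy is set to LAST_WIN, which keeps only the last value.
      .withColumn("map", map_from_arrays(col("key"), col("value")))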

    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +---+---------------------------------------+---------------+------------------------------------------------------------+
      * |a  |value                                  |key            |map                                                         |
      * +---+---------------------------------------+---------------+------------------------------------------------------------+
      * |cat|[[3-3, 78-b], [3-3, 89-0], [4-4, 78-n]]|[3-3, 3-3, 4-4]|[3-3 -> [3-3, 78-b], 3-3 -> [3-3, 89-0], 4-4 -> [4-4, 78-n]]|
      * |dog|[[4-4, 89-b]]                          |[4-4]          |[4-4 -> [4-4, 89-b]]                                        |
      * +---+---------------------------------------+---------------+------------------------------------------------------------+
      *
      * root
      * |-- a: string (nullable = true)
      * |-- value: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- b: string (nullable = true)
      * |    |    |-- c: string (nullable = true)
      * |-- key: array (nullable = true)
      * |    |-- element: string (containsNull = true)
      * |-- map: map (nullable = true)
      * |    |-- key: string
      * |    |-- value: struct (valueContainsNull = true)
      * |    |    |-- b: string (nullable = true)
      * |    |    |-- c: string (nullable = true)
      */

3. Save the DataFrame

processedDF.select(col("a"), to_json(col("map"))).write
      .mode(SaveMode.Overwrite)
      .partitionBy("a")
      .text("/Users/sokale/models/run_2")

    /**
      * File directory and content of file
      * a=cat
      * |-  {"3-3":{"b":"3-3","c":"78-b"},"3-3":{"b":"3-3","c":"89-0"},"4-4":{"b":"4-4","c":"78-n"}}
      * a=dog
      * |-  {"4-4":{"b":"4-4","c":"89-b"}}
      */
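
The save step above produces directories named a=cat and a=dog containing Spark part files, not files named cat.json and dog.json. Here is a minimal post-processing sketch in plain Python. It assumes the output sits on a local filesystem, that each partition directory holds exactly one part file (each value of a has a single row after the groupBy), and that the values of a are simple enough to appear unescaped in the directory names. The helper name collect_partition_files is hypothetical, and the demo runs against a simulated directory layout rather than real Spark output.

```python
import glob
import os
import shutil
import tempfile


def collect_partition_files(spark_out_dir, target_dir, column="a"):
    """Copy each <column>=<value>/part-* file to <target_dir>/<value>.json."""
    os.makedirs(target_dir, exist_ok=True)
    prefix = column + "="
    written = []
    for entry in sorted(os.listdir(spark_out_dir)):
        part_dir = os.path.join(spark_out_dir, entry)
        if not (entry.startswith(prefix) and os.path.isdir(part_dir)):
            continue  # skips _SUCCESS and other non-partition entries
        parts = sorted(glob.glob(os.path.join(part_dir, "part-*")))
        if len(parts) != 1:
            raise ValueError(
                f"expected one part file in {part_dir}, found {len(parts)}"
            )
        target = os.path.join(target_dir, entry[len(prefix):] + ".json")
        shutil.copyfile(parts[0], target)
        written.append(target)
    return written


# Simulate the layout written by partitionBy("a") so the sketch runs without Spark.
spark_out = tempfile.mkdtemp()
samples = [
    ("cat", '{"4-4":{"b":"4-4","c":"78-n"}}'),
    ("dog", '{"4-4":{"b":"4-4","c":"89-b"}}'),
]
for value, line in samples:
    part_dir = os.path.join(spark_out, f"a={value}")
    os.makedirs(part_dir)
    with open(os.path.join(part_dir, "part-00000.txt"), "w", encoding="utf-8") as fh:
        fh.write(line + "\n")
open(os.path.join(spark_out, "_SUCCESS"), "w").close()

written = collect_partition_files(spark_out, tempfile.mkdtemp())
print([os.path.basename(p) for p in written])
```

Copying rather than moving leaves the Spark output untouched, which makes the step safe to rerun.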
