如何在sparkDataframe中进行采样

ntjbwcob  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(472)

我有一个Dataframe,它有两列状态,投票人ID。它有超过100万条记录。我需要在根据状态采样记录后生成一个Dataframe。(即,我需要收集5k到10k记录对应于最终数据框中的每个状态)。是否有spark功能来实现这一点

72qzrwbm

72qzrwbm1#

可能这很有用(用scala编写)

val df = Seq(
      ("state1", 1), ("state1", 2), ("state1", 3), ("state1", 4), ("state1", 5),
      ("state2", 1), ("state2", 2), ("state2", 3), ("state2", 4), ("state2", 5),
      ("state3", 1), ("state3", 2), ("state3", 3), ("state3", 4), ("state3", 5),
      ("state4", 1), ("state4", 2), ("state4", 3), ("state4", 4), ("state4", 5),
      ("state5", 1), ("state5", 2), ("state5", 3), ("state5", 4), ("state5", 5)
    ).toDF("state", "voter_id")

    // sample 3 voters for each state

    val voterIdsToSample: Double = 3 // put the records to sample for each stat
    // find distinct state
   val stateMap = df.groupBy("state").count().collect()
      .map(r => (r.getAs[String]("state"), r.getAs[Long]("count"))).toMap

    val fractions = collection.mutable.Map(stateMap.mapValues(voterIdsToSample/_).toSeq: _*)

    val sampleDF = df.rdd.map(r => (r.getAs[String]("state"), r.getAs[Int]("voter_id")))
      .sampleByKeyExact(withReplacement = false, fractions = fractions)
      .toDF("state", "voter_id")

    sampleDF.show(100, false)
    sampleDF.printSchema()
/**
      * +------+--------+
      * |state |voter_id|
      * +------+--------+
      * |state1|3       |
      * |state1|4       |
      * |state1|5       |
      * |state2|1       |
      * |state2|2       |
      * |state2|4       |
      * |state3|1       |
      * |state3|3       |
      * |state3|5       |
      * |state4|2       |
      * |state4|4       |
      * |state4|5       |
      * |state5|3       |
      * |state5|4       |
      * |state5|5       |
      * +------+--------+
      *
      * root
      * |-- state: string (nullable = true)
      * |-- voter_id: integer (nullable = false)
      */

参考-spark文件

相关问题