spark对每条记录多次调用udf

x33g5p2x  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(720)

我在使用spark 1.6.1时遇到了一个奇怪的行为:我在一个包含一些输入数据的Dataframe上运行一个带有大量计算(物理模拟)的udf,并构建一个包含许多列(~40)的结果Dataframe。
奇怪的是,在这种情况下,我的udf在输入Dataframe的每条记录中被调用了不止一次(频率是1.6倍),我觉得这是不可接受的,因为它非常昂贵。如果我减少列数(例如,减少到20列),那么这种行为就会消失。
我写了一个小脚本来演示:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo {

  case class Result(a: Double)

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val numRuns = sc.accumulator(0) // to count the number of udf calls

    val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")

    // get results of UDF
    var results = data
      .withColumn("tmp", myUdf($"id"))
      .withColumn("result", $"tmp.a")

    // add many columns to dataframe (must depend on the UDF's result)
    for (i <- 1 to 42) {
      results=results.withColumn(s"col_$i",$"result")
    }

    // trigger action
    val res = results.collect()
    println(res.size) // prints 100

    println(numRuns.value) // prints 160

  }
}

现在,有没有办法在不减少列数的情况下解决这个问题?

k2fxgqgv

k2fxgqgv1#

大约一年前,我们也遇到了同样的问题,花了很多时间,直到我们最终弄清楚了问题所在。
我们还有一个非常昂贵的自定义项要计算,我们发现每当我们引用它的列时,它就会被反复计算。这几天前又发生在我们身上,所以我决定在这个上面打开一个bug:spark-18748
我们在这里也提出了一个问题,但现在我看到的标题不太好:试图把一个blob变成spark中的多个列
我同意tzach的观点,即以某种方式“强迫”计划计算udf。我们做得更难看,但我们不得不这样做,因为我们无法缓存()数据-数据太大了:

val df = data.withColumn("tmp", myUdf($"id"))
val results = sqlContext.createDataFrame(df.rdd, df.schema)
             .withColumn("result", $"tmp.a")

更新:
现在我看到我的jira票与另一张相连:spark-17728,它仍然没有以正确的方式处理这个问题,但它提供了一个可选的解决方案:

val results = data.withColumn("tmp", explode(array(myUdf($"id"))))
                  .withColumn("result", $"tmp.a")
wgxvkvu9

wgxvkvu92#

在较新的spark verion(2.3+)中,我们可以将UDF标记为非确定性:https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/expressions/userdefinedfunction.html#asnondeterministic():org.apache.spark.sql.expressions.userdefinedfunction
i、 e.使用

val myUdf = udf(...).asNondeterministic()

这样可以确保只调用一次udf

xt0899hw

xt0899hw3#

我不能真正解释这种行为,但很明显,查询计划以某种方式选择了一条路径,其中一些记录被计算了两次。这意味着如果我们缓存中间结果(就在应用udf之后),我们就可以“强制”spark不重新计算udf。事实上,一旦添加了缓存,它的行为就和预期的一样—udf被精确调用了100次:

// get results of UDF
var results = data
  .withColumn("tmp", myUdf($"id"))
  .withColumn("result", $"tmp.a").cache()

当然,缓存有它自己的成本(内存…),但是如果它节省了许多udf调用,那么它最终可能会对您的情况有所帮助。

相关问题