spark流在udf中使用大型广播对象时会减慢速度

55ooxyrt  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(370)

我使用spark从事件中心进行流处理,遇到了以下问题。对于每个传入的消息,我需要做一些计算(无状态)。计算算法是用scala编写的,效率很高,但需要预先构造一些数据结构。对象大小约为50mb,但将来可能会更大。为了不每次都给工人发对象,我做广播。然后注册一个自定义项。但这并没有帮助,批处理持续时间的增长明显超过了我们可以停留的延迟时间。我发现批处理持续时间完全取决于对象大小。出于测试的目的,我试图使对象更小,保持计算复杂度不变,并减少批处理持续时间。另外,当对象很大时,sparkui会将gc标记为红色(超过10%的工作是由于垃圾收集)。这与我对广播的理解相矛盾,即当广播一个对象时,应该将该对象下载到worker的内存中并保存在那里,而不会产生额外的开销。
我设法写了一个业务领域不可知的例子。这里,当n很小时,批处理持续时间约为0.3秒,但当n=6000(144mb)时,批处理持续时间变为1.5(x5),当n=10000时,批处理持续时间变为4秒。但是计算复杂度并不取决于对象的大小。所以,这意味着,使用广播对象有巨大的开销。请帮我找到解决办法。

// emulate large precalculated object
val n = 10000
val obj = (1 to n).map(i => (1 to n).toArray).toArray

// broadcast it to the workers (should reduce overhead during execution)
val objBd = sc.broadcast(obj)

// register UDF
val myUdf = spark.udf.register("myUdf", (num: Int) => {
  // emulate very efficient algorithm that requires large data structure
  var i = (num+1)/(num+1)
  objBd.value(i)(i)
})

// do stream processing
spark.readStream
  .format("rate")
  .option("rowsPerSecond", 300)
  .load()
  .withColumn("result", myUdf($"value"))
  .writeStream
  .format("memory")
  .queryName("locations")
  .start()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题