我使用spark从事件中心进行流处理,遇到了以下问题。对于每个传入的消息,我需要做一些计算(无状态)。计算算法是用scala编写的,效率很高,但需要预先构造一些数据结构。对象大小约为50mb,但将来可能会更大。为了不每次都给工人发对象,我做广播。然后注册一个自定义项。但这并没有帮助,批处理持续时间的增长明显超过了我们可以停留的延迟时间。我发现批处理持续时间完全取决于对象大小。出于测试的目的,我试图使对象更小,保持计算复杂度不变,并减少批处理持续时间。另外,当对象很大时,sparkui会将gc标记为红色(超过10%的工作是由于垃圾收集)。这与我对广播的理解相矛盾,即当广播一个对象时,应该将该对象下载到worker的内存中并保存在那里,而不会产生额外的开销。
我设法写了一个业务领域不可知的例子。这里,当n很小时,批处理持续时间约为0.3秒,但当n=6000(144mb)时,批处理持续时间变为1.5(x5),当n=10000时,批处理持续时间变为4秒。但是计算复杂度并不取决于对象的大小。所以,这意味着,使用广播对象有巨大的开销。请帮我找到解决办法。
// emulate large precalculated object
val n = 10000
val obj = (1 to n).map(i => (1 to n).toArray).toArray
// broadcast it to the workers (should reduce overhead during execution)
val objBd = sc.broadcast(obj)
// register UDF
val myUdf = spark.udf.register("myUdf", (num: Int) => {
// emulate very efficient algorithm that requires large data structure
var i = (num+1)/(num+1)
objBd.value(i)(i)
})
// do stream processing
spark.readStream
.format("rate")
.option("rowsPerSecond", 300)
.load()
.withColumn("result", myUdf($"value"))
.writeStream
.format("memory")
.queryName("locations")
.start()
暂无答案!
目前还没有任何答案,快来回答吧!