我正在使用Spark Streaming应用程序,我需要从一个Kafka主题中消耗数据,并需要推入另一个Kafka主题。
我已经创建了一个UDF函数,它可以执行一些内置Spark SQL/其他函数无法提供的业务逻辑
Object TestingObject Extetnds Serializble{
def userdefined_function(String:row_string):String = {
return "Data After Business Logic"
}
def main(args: Array[String]): Unit = {
kafkaStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.isEmpty()) {
val df = ss.read.option("mode", "DROPMALFORMED").json(ss.createDataset(newRDD)(Encoders.STRING))
val Enricheddf = df.toJSON.foreach(row => {
val data = userdefined_function(row);
kafkaproducer.send(topicname,data)
})
}}
}
字符串
- 我知道在spark应用程序中使用UDF是非常昂贵的。* 但在我的业务逻辑中我没有其他方法,所以我应该与我的应用程序一起使用。
我的问题是如何优化Spark Scala流应用程序中的My UDF函数?
**我可以在main函数中使用UDF吗?或者我可以在foreach函数中使用UDF吗(每行)?或者我可以把UDF放在不同的类中,然后用Spark广播那个类吗?或者我应该怎么做?**有人能对此给予建议吗?提前感谢。
1条答案
按热度按时间mwyxok5s1#
我将试图澄清几点:
在你的代码中,有几个关于Spark的主要概念你应该知道:
1.您使用的是旧的Spark Streaming实现。通常,您应该使用Spark Structured Streaming API。您使用的Spark Streaming规范是基于RDD API构建的。对于每个mini-batch,您可以将传入的消息作为RDD进行操作,这里没有UDF,您将普通的Scala函数应用于每个mini-batch。
1.不要为每个minibatch创建一个新的框架。你不需要这样做。你的数据已经分布在执行器上了。你可以在foreachRDD中使用普通的Scala代码来应用任何你想要的东西,例如使用RDD的map函数。想象一下,如果你有成千上万的minibatch...
1.与UDF相关。它们非常有用,你必须考虑到它们是Spark Optimizer的黑盒,因为你可以在它们里面使用任何你想要的东西,Spark将无法检查你的代码来创建执行计划,它认为这是最有效的方式。
1.当你使用一个UDF时,Spark必须将Spark到Scala类型的数据表示序列化/并行化(能力是有代价的),反之亦然,所以有额外的代价,但这并不意味着你必须避免它们,有时它们是非常有用的。除了额外的GC开销。所以避免在它们内部使用沉重的对象,例如使用普通数组,而不是元组或大case类。