使用Scala在spark streaming应用程序中编写优化的UDF的最佳方法是什么?

oyjwcjzk  于 5个月前  发布在  Scala
关注(0)|答案(1)|浏览(79)

我正在使用Spark Streaming应用程序,我需要从一个Kafka主题中消耗数据,并需要推入另一个Kafka主题。

我已经创建了一个UDF函数,它可以执行一些内置Spark SQL/其他函数无法提供的业务逻辑

Object TestingObject Extetnds Serializble{

def userdefined_function(String:row_string):String = {
return "Data After Business Logic"
}

def main(args: Array[String]): Unit = {
kafkaStream.foreachRDD(foreachFunc = rdd => {
      if (!rdd.isEmpty()) {
val df = ss.read.option("mode", "DROPMALFORMED").json(ss.createDataset(newRDD)(Encoders.STRING))
        val Enricheddf = df.toJSON.foreach(row => {
val data = userdefined_function(row);
kafkaproducer.send(topicname,data)
})
}}
}

字符串

  • 我知道在spark应用程序中使用UDF是非常昂贵的。* 但在我的业务逻辑中我没有其他方法,所以我应该与我的应用程序一起使用。

我的问题是如何优化Spark Scala流应用程序中的My UDF函数?

**我可以在main函数中使用UDF吗?或者我可以在foreach函数中使用UDF吗(每行)?或者我可以把UDF放在不同的类中,然后用Spark广播那个类吗?或者我应该怎么做?**有人能对此给予建议吗?提前感谢。

mwyxok5s

mwyxok5s1#

我将试图澄清几点:
在你的代码中,有几个关于Spark的主要概念你应该知道:

  1. main函数,像其他语言一样,是应用程序的入口点,所以你的问题是,如果你可以在main函数中使用UDF,是的,你可以在那里使用任何你想要的东西。
  2. UDF的概念应用于Spark SQL世界,这意味着这个概念与Spark数据框架密切相关。
    1.您使用的是旧的Spark Streaming实现。通常,您应该使用Spark Structured Streaming API。您使用的Spark Streaming规范是基于RDD API构建的。对于每个mini-batch,您可以将传入的消息作为RDD进行操作,这里没有UDF,您将普通的Scala函数应用于每个mini-batch。
    1.不要为每个minibatch创建一个新的框架。你不需要这样做。你的数据已经分布在执行器上了。你可以在foreachRDD中使用普通的Scala代码来应用任何你想要的东西,例如使用RDD的map函数。想象一下,如果你有成千上万的minibatch...
    1.与UDF相关。它们非常有用,你必须考虑到它们是Spark Optimizer的黑盒,因为你可以在它们里面使用任何你想要的东西,Spark将无法检查你的代码来创建执行计划,它认为这是最有效的方式。
    1.当你使用一个UDF时,Spark必须将Spark到Scala类型的数据表示序列化/并行化(能力是有代价的),反之亦然,所以有额外的代价,但这并不意味着你必须避免它们,有时它们是非常有用的。除了额外的GC开销。所以避免在它们内部使用沉重的对象,例如使用普通数组,而不是元组或大case类。

相关问题