Cannot call a function from Spark Streaming's rdd.foreachPartition, but copying in all of the function's lines works

c0vxltue · posted 2021-06-08 · in Kafka

I am trying to produce a stream of Spark RDDs from the worker nodes rather than first collecting them at the driver, so I wrote the following code:

def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
    rdd.foreachPartition { partitionOfRecords =>
      val producer = new KafkaProducer[K, V](getProducerProps(keySerializerClass, valueSerializerClass, brokers))
      partitionOfRecords.foreach { message =>
        producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
      }
      producer.close()
    }
  }

  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = {
    val producerProps: Properties = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
    producerProps
  }

Running this code results in the following exception:

15/09/01 15:13:00 ERROR JobScheduler: Error running job streaming job 1441120380000 ms.3
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
at com.company.opt.detector.StreamingDetector.writeToKafka(StreamingDetector.scala:84)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork$3.apply(MyClass.scala:47)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork$3.apply(MyClass.scala:47)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.company.opt.MyClass.MyClass$
Serialization stack:
- object not serializable (class: com.company.opt.MyClass.MyClass$, value: com.company.opt.MyClass.MyClass$@7e2bb5e0)
- field (class: com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka$1, name: $outer, type: class com.company.opt.detector.StreamingDetector)
- object (class com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 21 more

But when I copy the body of getProducerProps directly into my writeToKafka function, as below, everything works fine.

def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
    rdd.foreachPartition { partitionOfRecords =>
      val producerProps: Properties = new Properties
      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
      producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
      val producer = new KafkaProducer[K, V](producerProps)
      partitionOfRecords.foreach { message =>
        producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
      }
      producer.close()
    }
  }

Can anyone explain why this happens? Thanks.

ftf50wuq #1

I agree with maasg's answer. You may also find this article interesting; it explores how to control exactly which data in a closure gets serialized by Spark.
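To make that point concrete, here is a minimal sketch (my own illustration, not taken from the answer or the article) of keeping the closure's captured data explicit: evaluate everything the partition code needs into a local val on the driver, so only that value is serialized instead of the enclosing instance.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String,
                                           keySerializerClass: String,
                                           valueSerializerClass: String,
                                           brokers: String): Unit = {
  // Built here on the driver; java.util.Properties is Serializable, so the
  // closure below captures only this local val, not `this`.
  val props: Properties = getProducerProps(keySerializerClass, valueSerializerClass, brokers)
  rdd.foreachPartition { partitionOfRecords =>
    val producer = new KafkaProducer[K, V](props)
    partitionOfRecords.foreach { message =>
      producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
    }
    producer.close()
  }
}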

rdlzhqv9 #2

Since getProducerProps is a method of the enclosing class, calling it from the closure is equivalent to calling this.getProducerProps(...). The problem then becomes clear: this gets pulled into the closure and has to be serialized along with all of its other fields. Some members of that class are not serializable, which produces this exception.
A good practice is to move such methods into a standalone object:

object ProducerUtils extends Serializable {
  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = ???
}
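The original writeToKafka can then call the helper from inside the partition without dragging the enclosing class into the closure. A sketch, assuming ProducerUtils.getProducerProps keeps the same body as the original method:

rdd.foreachPartition { partitionOfRecords =>
  // ProducerUtils is a standalone object, so this call captures no outer instance.
  val producer = new KafkaProducer[K, V](
    ProducerUtils.getProducerProps(keySerializerClass, valueSerializerClass, brokers))
  partitionOfRecords.foreach { message =>
    producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
  }
  producer.close()
}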

Another option is to turn the method into a function and assign it to a val. The function value held by the val is then what gets serialized, so the whole instance is not pulled into the serializable closure:

val producerProps: (String,String,String) => Properties = ???
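Filling in the ??? with the body of the original getProducerProps, that could look like the sketch below (my own illustration; producerPropsFn is a made-up name, held in a local val so the closure captures only the function value):

// A function value built from the original getProducerProps body. As a local
// val it is serialized on its own, without the enclosing class instance.
val producerPropsFn: (String, String, String) => Properties =
  (keySer, valueSer, brokerList) => {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer)
    props
  }

// Inside rdd.foreachPartition the producer is then created as before:
//   val producer = new KafkaProducer[K, V](producerPropsFn(keySerializerClass, valueSerializerClass, brokers))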
