spark:foreachrdd,跳过行抛出不可序列化的任务(scala闭包)

jtoj6r0c  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(333)

我有一个流hdfs文本文件的代码。但是每个文本文件包含一个标题和50行的描述。我想忽略这些行,只接收数据。
这是我的代码,但它抛出了一个sparkexception:task not serializable

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      if (partitionIdx == 0) {
        lines.drop(50)
      }
      lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        ...
    }
  }
)
mftmpeh8

mftmpeh81#

我认为您只需要zipwithindex并过滤索引小于50的情况。

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/log")

hdfsDstream.foreachRDD( rdd => {
  val data = rdd.zipWithIndex.filter( _._2 < 50 ).map( _._1 )

  // Now do whatever you want with your data.
} )

也。。。在这里- Row(p(0),p(1),p(2),p(3)) ,你真的需要吗 Row 突然之间?

qnzebej0

qnzebej02#

任务不可序列化错误发生在这种情况下:将函数传递给spark:引用整个对象的风险是什么?或运行apache spark作业时出现任务不可序列化异常
很可能您正在那里创建某种对象,并在rdd方法中调用其函数,从而强制引擎序列化您的对象。
不幸的是,您打印的代码部分工作得非常好,问题出在被点替换的部分。例如,这一个有效:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._

val ssc = new StreamingContext(sc, Seconds(60))
val hdfsDStream = ssc.textFileStream("/sparkdemo/streaming")

hdfsDStream.foreachRDD(
  rdd => {
    val data = rdd.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
    => {
      if (partitionIdx == 0) {
        lines.drop(50)
      }
      lines
    })

    val rowRDD = data.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3)))

    if (data.count() > 0) {
        rowRDD.take(10).foreach(println)
    }
  }
)
ssc.start()
ssc.awaitTermination()

相关问题