Scala：对 UDF 派生列使用 collect_list 时报错 Task not serializable（任务不可序列化）

20jt8wwn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(384)

我正在尝试对由 UDF 生成的字段使用 collect_list，代码如下。如果不使用 UDF 字段，代码运行正常；但一旦对由 UDF 派生的字段进行聚合，就会产生以下错误：

Task not serializable: java.io.NotSerializableException: scala.runtime.LazyRef

/** Small Spark demo: derives a column with a registered UDF and then
  * aggregates that derived column with `collect_list`.
  */
class SparkEntry extends Serializable {

  /** Runs the demo on a local SparkSession.
    *
    * Builds a six-row DataFrame (`number`, `word`), appends `new_column` via the
    * `customudf` UDF and prints it. It then groups by `word`, collecting
    * `(new_column, number)` structs into a `combined` column, and prints that.
    *
    * The session is closed in a `finally` block so it is released even when
    * one of the steps throws (the original only closed it on success).
    */
  def process(): Unit = {
    val spark = SparkSession.builder().appName("spp").master("local").getOrCreate()
    try {
      spark.udf.register("customudf", SparkEntry.modifyword)

      val someData = Seq(
        Row(8, "bat"),
        Row(9, "bat"),
        Row(64, "mouse"),
        Row(9, "mouse"),
        Row(-27, "horse"),
        Row(9, "horse")
      )
      val someSchema = List(
        StructField("number", IntegerType, nullable = true),
        StructField("word", StringType, nullable = true)
      )

      val someDF = spark.createDataFrame(
        spark.sparkContext.parallelize(someData),
        StructType(someSchema)
      )

      val newDF = someDF.withColumn("new_column", callUDF("customudf", col("word")))
      newDF.show()

      val groupedDF = newDF
        .groupBy("word")
        .agg(collect_list(struct(col("new_column"), col("number"))))
        .toDF("word", "combined")
      groupedDF.show()
    } finally {
      spark.close()
    }
  }
}

object SparkEntry {

  /** UDF body: appends `"_"` to its input.
    *
    * Kept as a single function value on the companion object. The function
    * Spark serializes is then a plain lambda that captures no method-local or
    * instance state.
    *
    * A null input yields null. Plain string concatenation would instead
    * produce the literal text "null_".
    */
  private val modifyword: String => String =
    filePath => Option(filePath).map(_ + "_").orNull
}

bpsygsoo

bpsygsoo1#

这段代码在我这里运行正常：

// UDF body: append "_" to the incoming word.
val modifyword: String => String = filePath => filePath + "_"

    val session = SparkSession.builder().appName("spp").master("local").getOrCreate()
    session.udf.register("customudf", modifyword)

    // Six (number, word) rows; both columns are nullable.
    val rows = Seq(
      Row(8, "bat"),
      Row(9, "bat"),
      Row(64, "mouse"),
      Row(9, "mouse"),
      Row(-27, "horse"),
      Row(9, "horse")
    )
    val fields = List(
      StructField("number", IntegerType, nullable = true),
      StructField("word", StringType, nullable = true)
    )
    val schema = StructType(fields)
    val rowRdd = session.sparkContext.parallelize(rows)
    val baseDF = session.createDataFrame(rowRdd, schema)

    // Derive new_column with the registered UDF, then show it.
    val withUdfDF = baseDF.withColumn("new_column", callUDF("customudf", col("word")))
    withUdfDF.show()

    // Group by word, collecting (new_column, number) pairs into "combined".
    val pairs = struct(col("new_column"), col("number"))
    val combinedDF = withUdfDF
      .groupBy("word")
      .agg(collect_list(pairs))
      .toDF("word", "combined")
    combinedDF.show()

    /**
      * +------+-----+----------+
      * |number| word|new_column|
      * +------+-----+----------+
      * |     8|  bat|      bat_|
      * |     9|  bat|      bat_|
      * |    64|mouse|    mouse_|
      * |     9|mouse|    mouse_|
      * |   -27|horse|    horse_|
      * |     9|horse|    horse_|
      * +------+-----+----------+
      *
      * +-----+--------------------+
      * | word|            combined|
      * +-----+--------------------+
      * |  bat|[[bat_, 8], [bat_...|
      * |horse|[[horse_, -27], [...|
      * |mouse|[[mouse_, 64], [m...|
      * +-----+--------------------+
      */

尝试将 Scala 版本升级到 2.12.4：在该版本中 scala.runtime.LazyRef 已实现 Serializable。

相关问题