Passing a Scala case class to JsValue inside rdd.map, but Task not serializable

pgvzfuti · posted 2021-05-29 in Spark

I'm new to Scala/Spark and I have an RDD of a case class:

case class Info(key1 : String, key2 : String, key3 : String)

I want to convert the RDD[Info] into RDD[JsString] and save it to Elasticsearch. I use play.api.libs.json and define a Writes converter:

implicit val InfoWrites = new Writes[Info]{
   def writes(i : Info): JsObject = Json.obj( 
     "key1" -> i.key1,
     "key2" -> i.key2,
     "key3" -> i.key3
   )
}

Then I define an implicit class so I can call a save function:

implicit class Saver(rdd : RDD[Info]) {
    def save() : Unit = {
       rdd.map{ i => Json.toJson(i).toString }.saveJsonToEs("resource")
    }
}

so that I can just write

infoRDD.save()

But I keep getting a "Task not serializable" error from Json.toJson() inside rdd.map().
I also tried defining a serializable object like this:

object jsonUtils extends Serializable{
   def toJsString(i : Info) : String = {
       Json.toJson(i).toString()
   }
}
rdd.map{ i => jsonUtils.toJsString(i) }

but the "Task not serializable" error keeps coming back.
How should I change the code? Thank you!
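One sketch that stays close to the jsonUtils attempt above (assuming Info and the object below are top-level rather than nested inside another class): move the Writes itself into the serializable object, so the map closure only references that object and nothing from a non-serializable outer class gets captured:

import play.api.libs.json.{JsObject, Json, Writes}

object JsonUtils extends Serializable {
  // Keeping the Writes here means Json.toJson does not pull in an
  // implicit defined inside a non-serializable enclosing class.
  implicit val infoWrites: Writes[Info] = new Writes[Info] {
    def writes(i: Info): JsObject = Json.obj(
      "key1" -> i.key1,
      "key2" -> i.key2,
      "key3" -> i.key3
    )
  }

  def toJsString(i: Info): String = Json.toJson(i).toString()
}

// then inside save():
// rdd.map(JsonUtils.toJsString).saveJsonToEs("resource")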

Answer #1, from l3zydbqr:

I ran the code below, which is similar to yours, and it worked for me:

package domain.utils

import models.Info
import org.apache.spark.rdd.RDD
import play.api.libs.json.Json
import domain.utils.Implicits._

class CustomFunctions(rdd : RDD[Info]) {
  def save() = {
    rdd.map(i => Json.toJson(i).toString ).saveAsTextFile("/home/training/so-123")
  }

}
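Note that this version swaps the Elasticsearch call for saveAsTextFile, which is enough to demonstrate the serialization fix. Assuming the elasticsearch-hadoop connector is on the classpath (its import org.elasticsearch.spark._ adds saveJsonToEs to an RDD[String]), the same map could feed your original sink, and you can get the infoRDD.save() syntax back by making the wrapper an implicit class in a top-level object, for example:

package domain.utils

import models.Info
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._   // elasticsearch-hadoop: adds saveJsonToEs to RDD[String]
import play.api.libs.json.Json
import domain.utils.Implicits._

object EsSyntax {
  implicit class Saver(rdd: RDD[Info]) {
    def save(): Unit =
      rdd.map(i => Json.toJson(i).toString).saveJsonToEs("resource")   // "resource" as in the question
  }
}

// at the call site:
// import domain.utils.EsSyntax._
// infoRDD.save()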

And wrote the corresponding Implicits:

package domain.utils

import play.api.libs.json.{JsObject, Json, Writes}
import models.Info

class Implicits {
  implicit val InfoWrites = new Writes[Info]{
    def writes(i : Info): JsObject = Json.obj(
      "key1" -> i.key1,
      "key2" -> i.key2,
      "key3" -> i.key3
    )
  }

}

object Implicits extends Implicits
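Why this arrangement helps (my reading; the answer does not spell it out): the Writes now lives in a statically reachable top-level object, so the lambda inside CustomFunctions.save resolves it as Implicits.InfoWrites when the task runs on the executor, instead of capturing a non-serializable enclosing instance on the driver. A quick way to check the implicit resolves (values are made up):

import domain.utils.Implicits._
import models.Info
import play.api.libs.json.Json

// prints something like {"key1":"name1","key2":"city1","key3":"123"}
println(Json.toJson(Info("name1", "city1", "123")).toString)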

Created the Info model:

package models

case class Info(key1 : String, key2 : String, key3 : String)

Created a SparkOperationsDao to create the Spark context and do the write:

package dao

import domain.utils.CustomFunctions
import models.Info
import org.apache.spark.{SparkConf, SparkContext}

class SparkOperationsDao {
  val conf:SparkConf = new SparkConf().setAppName("driverTrack").setMaster("local")
  val sc = new SparkContext(conf)

  def writeToElastic() = {
    val sample = List(Info("name1", "city1", "123"), Info("name2", "city2", "234"))
    val rdd = sc.parallelize(sample)
    val converter = new CustomFunctions(rdd)
    converter.save()
  }

}

object SparkOperationsDao extends SparkOperationsDao
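One more assumption to call out: to do the Elasticsearch write from the question instead of saveAsTextFile, elasticsearch-hadoop reads its connection settings from the SparkConf. A minimal sketch, with placeholder host and port:

val conf: SparkConf = new SparkConf()
  .setAppName("driverTrack")
  .setMaster("local")
  .set("es.nodes", "localhost")          // placeholder Elasticsearch host
  .set("es.port", "9200")                // placeholder port
  .set("es.index.auto.create", "true")   // create the target index if missing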

Ran the application:

import dao.SparkOperationsDao

object RapidTests extends App {
  SparkOperationsDao.writeToElastic()
    //.collect.foreach(println)

}
