我是scala/spark的新手,我有案例类rdd
case class Info(key1 : String, key2 : String, key3 : String)
我想将rdd[info]转换成rdd[jsstring]并保存到elasticsearch,我使用play.api.libs和define write converter:
implicit val InfoWrites = new Writes[Info]{
def writes(i : Info): JsObject = Json.obj(
"key1" -> i.key1,
"key2" -> i.key2,
"key3" -> i.key3
)
}
然后我定义隐式类来使用save func:
implicit class Saver(rdd : RDD[Info]) {
def save() : Unit = {
rdd.map{ i => Json.toJson(i).toString }.saveJsonToEs("resource"))
}
}
所以我可以用
infoRDD.save()
但是我在rdd.map()中的json.tojson()中一直出现“task not serializable”错误
我也尝试这样定义可序列化对象
object jsonUtils extends Serializable{
def toJsString(i : Info) : String = {
Json.toJson(i).toString()
}
}
rdd.map{ i => jsonUtils.toJsString(i) }
但不断出现错误“任务不可序列化”
如何更改代码?谢谢您!
1条答案
按热度按时间l3zydbqr1#
我运行了下面的代码,与您的代码类似,它对我有效:
写了相应的
Implicits
:创建了模型
Info
:创建了一个
SparkOperationsDao
要编写和创建spark上下文:运行应用程序: