我在Kafka上有这样的json消息: {"id_post":"p1", "message":"blablabla"}
我想解析消息,并打印(或用于进一步计算) message
元素。使用以下代码打印json
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
rdd.foreach(record => {
println(record)
}
}
但我没办法得到一个元素。我尝试了一些json解析器,但没有成功。你知道吗?
更新:不同的json解析器有一些错误这是circe解析器的代码和输出:
val parsed_record = parse(record)
以及输出:
14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)
以此类推。。在我使用 parse(record)
看起来它无法访问和/或分析字符串 record
.
如果我同时使用lift json也是一样的 parse(record)
错误输出大致相同:
16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
在scala.collection.abstractiterator.foreach(迭代器。scala:1157)
3条答案
按热度按时间omvjsjqw1#
从scala/apachespark中的json字符串中提取数据
下面是专业的礼仪
p1tboqfb2#
我解决了这个问题,所以我写在这里以备将来参考:
依赖,依赖,依赖!
我选择使用liftjson,但这适用于任何json解析器和/或框架。
我使用的spark版本(v1.4.1)与scala 2.10兼容,这里是pom.xml中的依赖项:
还有其他一些图书馆。我使用的是scala 2.11的lift json版本。。。这是错误的。
因此,对于将来的我,如果您正在阅读本主题:与scala版本和依赖项保持一致。在这种情况下:
llycmphe3#
你也有同样的问题。
但是我用
fastjson
.SBT dependency : // http://mvnrepository.com/artifact/com.alibaba/fastjson libraryDependencies += "com.alibaba" % "fastjson" % "1.2.12"
或Maven dependency : <!-- http://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.12</version> </dependency>
你可以试试。希望这会有帮助。