我对 Scala、Spark 和 MongoDB 都很陌生。我正在尝试用以下代码通过 Spark 将一些数据加载到 MongoDB。
import com.mongodb.spark.config.WriteConfig
import com.mongodb.spark.toDocumentRDDFunctions
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
object MongoTest {

  /**
   * Entry point: writes ten small BSON documents ({test: 1} .. {test: 10})
   * to the `sampledb.testMongo` collection on a local MongoDB instance.
   *
   * Note on the reported NoSuchMethodError (com.mongodb.Mongo.<init>):
   * that is a classpath conflict — the mongo-spark-connector version must
   * match the mongo-java-driver version on the classpath. Align the two
   * artifacts in the build (e.g. mongo-spark-connector_2.11:2.4.x, which
   * bundles a compatible driver; remove any separately pinned older
   * mongo-java-driver) — the code below cannot work around a bad classpath.
   */
  def main(args: Array[String]): Unit = {
    // Build ONE SparkSession and reuse its SparkContext. The original code
    // created a second SparkContext guarded by
    // spark.driver.allowMultipleContexts=true — that flag exists only as an
    // escape hatch and multiple contexts per JVM are unsupported; a single
    // context is the correct pattern.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    val sc = spark.sparkContext

    // Ten trivial documents; Document.parse turns the JSON fragment into BSON.
    val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))

    // saveToMongoDB comes from the implicit toDocumentRDDFunctions conversion;
    // the output URI names both the database (sampledb) and collection (testMongo).
    documents.saveToMongoDB(
      WriteConfig(Map("spark.mongodb.output.uri" -> "mongodb://127.0.0.1:27017/sampledb.testMongo"))
    )

    // Release the context cleanly instead of leaking it at JVM exit.
    spark.stop()
  }
}
出现错误,我的spark提交失败,错误如下:
java.lang.NoSuchMethodError: com.mongodb.Mongo.<init>(Lcom/mongodb/MongoClientURI;)V
at com.mongodb.MongoClient.<init>(MongoClient.java:328)
at com.mongodb.spark.connection.DefaultMongoClientFactory.create(DefaultMongoClientFactory.scala:43)
at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:55)
at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:152)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:116)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:115)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我使用的 Spark 版本是 2.4.0,Scala 版本是 2.11.12。你知道我错在哪里吗?
暂无答案!
目前还没有任何答案,快来回答吧!