I am trying to write stream-processed (Structured Streaming) data to MongoDB. I can get the data from Kafka into Spark and I can process it, but I cannot write the data from Spark to MongoDB. I get this error:
<console>:55: error: value map is not a member of org.apache.spark.sql.Row
val df3 = rdd.map({ case (word: String, count: Int)
My code:

import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.streaming.Trigger
import com.mongodb.spark._
import com.mongodb.spark.sql._
import org.apache.spark.streaming._
import com.mongodb.spark.config._
import com.mongodb.spark.MongoSpark
val spark = SparkSession.builder().master("localhost").appName("ReadFromKafka").getOrCreate()
import spark.implicits._
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "IP:9092").option("subscribe", "test").load()
val df2 = df.select($"key".cast(StringType), $"value".cast(StringType))
val df3 = df2.select($"value").as[String].flatMap(_.split("\\W+")).groupBy("value").count().sort(f.desc("count"))
case class WordCount(word: String, count: Int)
df3.foreach({ rdd =>
import spark.implicits._
val df3 = rdd.map({ case (word: String, count: Int)
=> WordCount(word, count) }).toDF()
df3.write.mode("append").mongo()
})
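For reference, below is a minimal sketch of one common way around this, assuming Spark 2.4+ (for foreachBatch) and the MongoDB Spark Connector 2.x on the classpath; the master URL and MongoDB output URI are placeholders. The error happens because Dataset.foreach hands the function one org.apache.spark.sql.Row at a time (the name rdd in the code above is misleading), and Row has no map method; a streaming Dataset also cannot use the batch write API directly. writeStream.foreachBatch instead delivers each micro-batch as an ordinary DataFrame, which the connector's batch writer can save:

import org.apache.spark.sql.{DataFrame, SparkSession, functions => f}
import org.apache.spark.sql.types.StringType
import com.mongodb.spark.sql._ // provides the .mongo() implicit on DataFrameWriter

// Sketch only: master URL and MongoDB URI are placeholder values.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ReadFromKafka")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.wordcounts")
  .getOrCreate()

import spark.implicits._

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "IP:9092")
  .option("subscribe", "test")
  .load()

// Same word count as above; sort is allowed on a streaming aggregation
// only in complete output mode, which this query uses.
val counts = df.select($"value".cast(StringType)).as[String]
  .flatMap(_.split("\\W+"))
  .groupBy("value")
  .count()
  .sort(f.desc("count"))

// foreachBatch passes each micro-batch as a plain DataFrame, so the
// connector's batch writer works here. In complete mode every batch
// holds the full result table, so overwrite avoids accumulating
// duplicate counts across triggers.
val query = counts.writeStream
  .outputMode("complete")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write.mode("overwrite").mongo()
  }
  .start()

query.awaitTermination()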