writeStream: writing processed data to MongoDB

v1uwarro · asked 2021-05-27 in Spark

I am trying to write stream-processed (Structured Streaming) data to MongoDB. I can get the data from Kafka into Spark and process it, but I cannot write the processed data from Spark to MongoDB. I get this error:

<console>:55: error: value map is not a member of org.apache.spark.sql.Row
        val df3 = rdd.map({ case (word: String, count: Int)
Here is my code:

    import org.apache.spark.sql.{functions => f}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.SparkSession
    import org.apache.log4j.{Logger, Level}
    import org.apache.spark.sql.streaming.Trigger
    import com.mongodb.spark._
    import com.mongodb.spark.sql._
    import org.apache.spark.streaming._
    import com.mongodb.spark.config._
    import com.mongodb.spark.MongoSpark

    val spark = SparkSession.builder().master("localhost").appName("ReadFromKafka").getOrCreate()

    import spark.implicits._

    val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "IP:9092").option("subscribe", "test").load()

    val df2 = df.select($"key".cast(StringType), $"value".cast(StringType))

    val df3 = df2.select($"value").as[String].flatMap(_.split("\\W+")).groupBy("value").count().sort(f.desc("count"))

    case class WordCount(word: String, count: Int)

    df3.foreach({ rdd =>
      import spark.implicits._
      val df3 = rdd.map({ case (word: String, count: Int) =>
        WordCount(word, count)
      }).toDF()
      df3.write.mode("append").mongo()
    })
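
There are two separate problems here, and the compile error points at the first. In Structured Streaming, Dataset.foreach is invoked once per Row, so the rdd parameter above is actually a Row, and Row has no map method; the foreachRDD pattern belongs to the old DStream API. Second, even with that fixed, a streaming DataFrame cannot be written with df.write. The usual way out on Spark 2.4+ is writeStream.foreachBatch, which hands each micro-batch to a function as an ordinary, non-streaming DataFrame that the MongoDB connector's batch writer accepts. The WordCount case class and the map step are also unnecessary, since df3 is already a DataFrame with value and count columns. A minimal sketch, assuming the connector's output URI (spark.mongodb.output.uri) is set on the SparkSession so the .mongo() shortcut knows where to write:

    import org.apache.spark.sql.DataFrame

    // sort() on a streaming aggregation is only allowed in complete mode,
    // where every trigger re-emits the full word-count table.
    val query = df3.writeStream
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Inside foreachBatch the micro-batch is a plain DataFrame,
        // so the MongoDB connector's batch writer can be used as usual.
        batchDF.write.mode("append").mongo()
      }
      .start()

    query.awaitTermination()

Note that because complete mode re-emits the whole aggregate on every trigger, mode("append") stores one full snapshot per batch; depending on the goal, overwriting the collection or upserting on the word key may be a better fit.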
