Converting Kafka JSON messages to a case class and saving them to Cassandra with the RDD approach, using Play JSON

yacmzcpb · asked 2021-05-29 · Spark

I am new to Spark Streaming. My requirement is this: I have JSON messages in the following format:

{"brand":"blowfish","category":"shoes","description":"Blowfish Women's Hiro Whiskey Dcpu Ankle-high Synthetic Sandal - 8m","mode":"visa signature","orditems":"3","productcode":"404e242a-070f-4fbb-a271-489c92049b05","unitprice":"7630"}

Now my project requirement is to store this into a Cassandra table using Play JSON and the RDD approach. Table: transction_table, keyspace: krios. I was able to implement it with a DataFrame, but by design we have to follow the RDD approach with case classes.
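For context, the target table has roughly this shape (a sketch only; the column names simply mirror the JSON fields and may not match the real DDL exactly):

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

val cassandraConf = new SparkConf()
  .set("spark.cassandra.connection.host", "localhost")
  .set("spark.cassandra.connection.port", "9042")

// Assumed layout of krios.transction_table, mirroring the JSON message fields
CassandraConnector(cassandraConf).withSessionDo { session =>
  session.execute(
    """CREATE TABLE IF NOT EXISTS krios.transction_table (
      |  productcode text PRIMARY KEY,
      |  description text,
      |  brand text,
      |  category text,
      |  unitprice text,
      |  orditems text,
      |  mode text
      |)""".stripMargin)
}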
After converting the JSON into the case class, the saveToCassandra method is not available on the resulting RDD.
Here is my sample code:

import TransctionCoreClass.TransctionData
import com.datastax.spark.connector.cql.CassandraConnector
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import play.api.libs.json._
import play.api.libs.functional.syntax._
import com.datastax.spark.connector.streaming._
import play.api.libs.functional.syntax._
import play.api.libs.json.{JsPath, Json, Reads, Writes}

object TransctionCoreRDDFormat {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("org.apache.spark.streaming.kafka010").setLevel(Level.OFF)
 case class TransctionData(productcode:String,description:String,brand:String,category:String,unitprice:String,orditems:String,mode:String)

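  // Schema from the earlier DataFrame implementation; not used in the RDD path below.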
  val transactionSchema = StructType(Array(
    StructField("brand", StringType, true),
    StructField("category", StringType, true),
    StructField("description", StringType, true),
    StructField("mode", StringType, true),
    StructField("ordItems", DoubleType, true),
    StructField("productCode", StringType, true),
    StructField("unitPrice", StringType, true)))

  val spark = SparkSession.builder
    .master("local")
    .appName("TransctionReceiver")
    .getOrCreate();
  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
  import spark.implicits._

  ssc.sparkContext.setLogLevel("ERROR")
  val topics = List("-first_topic")
  val configuration = Configuration.apply(ConfigFactory.load.resolve)
  val productStreanm=
    KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,buildKafkaProperties(configuration)))

  productStreanm.foreachRDD{
    rddRaw=>{
      println("Entrringgg")
      val transrdd = rddRaw.map(x=>convertToCaseClass(x.value().toString))
      // ===== How do I store transrdd into Cassandra here? =====
      // The saveToCassandra method is not available on transrdd at this point.

    }

  }

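  // Play JSON Reads used by convertToCaseClass to build a TransctionData from each message.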
  implicit val transctionReads: Reads[TransctionData] = (
    (JsPath \ "productcode").read[String] and
      (JsPath \ "description").read[String] and
      (JsPath \ "brand").read[String] and
      (JsPath \ "category").read[String] and
      (JsPath \ "unitprice").read[String] and
      (JsPath \ "orditems").read[String] and
      (JsPath \ "mode").read[String]
    )(TransctionData.apply _)

  implicit val transctionFormat:Writes[TransctionData]=new Writes[TransctionData]{
    def writes(trns: TransctionData)=Json.obj(
      "brand" -> JsString(trns.brand),
      "category" -> JsString(trns.category),
      "description" -> JsString(trns.description),
      "mode" -> JsString(trns.mode),
      "ordItems" -> JsString(trns.orditems),
      "productCode" -> JsString(trns.productcode),
      "unitPrice" -> JsString(trns.unitprice)

    )
  }

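  // Cassandra connection settings (localhost:9042, default cassandra/cassandra credentials)
  // layered on top of the streaming context's Spark configuration.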
  val cassnadraConnector=CassandraConnector(
    ssc.sparkContext.getConf
      .set("spark.cassandra.connection.host","localhost")
      .set("spark.cassandra.connection.port","9042")
      .set("spark.cassandra.auth.username","cassandra")
      .set("spark.cassandra.auth.password","cassandra")
  )

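  // Kafka consumer settings; the broker list comes from the application configuration.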
  def buildKafkaProperties(configuration: Configuration)=Map (

    "bootstrap.servers" -> configuration.kafka.brokerList,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "auto.offset.reset" ->"earliest",
    "offset" ->"20",
    //"max.poll.records"->20,
    //  "enable.auto.commit" ->false,
    "group.id" ->"grou1pu774u77i3u8"
  )

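  // Parse one Kafka message with Play JSON; returns None when the record
  // cannot be deserialized into TransctionData.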
  def convertToCaseClass(data:String):Option[TransctionData]= {

    val parsedJsValue = Json.parse(data)
    Json.fromJson[TransctionData](parsedJsValue) match {
      case s @ JsSuccess(_, _) => s.asOpt
      case _: JsError          => None
    }

  }

  ssc.start()
  // println(s"==========Spark context Sarted ]======${ssc.sparkContext.appName}")
  ssc.awaitTermination()

}
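For reference, this is roughly the shape of the call I am trying to end up with inside foreachRDD (a sketch, not tested; the plain-RDD connector import, the flatMap over the Option results, and the column mapping are my assumptions):

import com.datastax.spark.connector._ // saveToCassandra for plain RDDs (streaming._ only covers DStreams)

productStreanm.foreachRDD { rddRaw =>
  // keep only the messages that parsed successfully into the case class
  val transrdd = rddRaw.flatMap(x => convertToCaseClass(x.value()))

  // assumed column names in krios.transction_table, matching the case class fields
  transrdd.saveToCassandra(
    "krios",
    "transction_table",
    SomeColumns("productcode", "description", "brand", "category",
      "unitprice", "orditems", "mode"))
}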
