onehotcoder与流式Dataframe

yxyvkwin  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(168)

我想将onehotencoder应用于流式Dataframe中的多个列,但出现以下错误。有什么建议吗?
非常感谢!

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources 
 must be executed with writeStream.start();;

代码://读取csv

val Stream = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .option("header", "true")
  .schema(DFschema)
  .load("C:/[...]"/

//Kafka

val properties = new Properties()
   //val topic = "mongotest"
   properties.put("bootstrap.servers", "localhost:9092")
   properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
   properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

   Stream.selectExpr("CAST(Col AS STRING) AS KEY", 
   "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .option("topic", "predict")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("checkpointLocation", "C:[...]")
   .start()

订阅主题

val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "predict")
  .load()

  val df = Stream
  .selectExpr("CAST(value AS STRING)")
  val jsons = df.select(from_json($"value", DFschema) as "data").select("data.*")

etl[…]
将函数bucketizer()应用于字段

val Msplits = Array(Double.NegativeInfinity,7, 14, 21, Double.PositiveInfinity)
  val bucketizerM = new Bucketizer()
  .setInputCol("MEASURE")
  .setOutputCol("MEASURE_c")
  .setSplits(Msplits)

  val bucketedData1 = bucketizerD.transform(out)
  val bucketedData2 = bucketizerM.transform(bucketedData1) # Works

使用onehotencoder()时出错

val indexer = new StringIndexer()
  .setInputCol("CODE")
  .setOutputCol("CODE_index")

  val encoder = new OneHotEncoder()
  .setInputCol("CODE")
  .setOutputCol("CODE_encoded")

  val vectorAssembler = new VectorAssembler()
 .setInputCols(Array("A","B", "CODE_encoded"))
 .setOutputCol("features")

  val transformationPipeline = new Pipeline()
 .setStages(Array(indexer, encoder, vectorAssembler))

  val fittedPipeline = transformationPipeline.fit(bucketedData2) # Does't work

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题