基于kafka分区的结构化流式读取

yc0p9oo0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(308)

我使用spark structured streaming来读取来自kafka主题的传入消息,并基于传入消息写入多个Parquet表,因此我创建了一个readstream,因为kafka源是公共的,并且为每个Parquet表在循环中创建了单独的写入流。这可以正常工作,但readstream正在创建一个瓶颈,因为对于每个writestream,它都会创建一个readstream,并且无法缓存已经读取的Dataframe。

val kafkaDf=spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.servers)
      .option("subscribe", conf.topics)
      //  .option("earliestOffset","true")
      .option("failOnDataLoss",false)
      .load()

foreach table   {  
//filter the data from source based on table name
//write to parquet
 parquetDf.writeStream.format("parquet")
        .option("path", outputFolder + File.separator+ tableName)
        .option("checkpointLocation", "checkpoint_"+tableName)
        .outputMode("append")
        .trigger(Trigger.Once())
       .start()
}

现在,每个写入流都在创建一个新的消费群体,从kafka读取整个数据,然后进行过滤并写入parquet。这会造成巨大的开销。为了避免这种开销,我可以将kafka主题划分为与表数相同的分区,然后readstream应该只从给定的分区读取。但我看不到将分区细节指定为kafka读取流的一部分的方法。

vuv7lop3

vuv7lop31#

如果数据量不是那么高,写你自己的接收器,收集每个微批的数据,然后你应该能够缓存Dataframe并写到不同的位置,虽然需要一些调整,但它会工作

相关问题