如何在spark结构化流媒体中读取特定的kafka分区

dfty9e19  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(255)

我的Kafka主题有三个分区,我想知道我是否可以从三个分区中的一个分区中读取。我的消费者是spark结构化流媒体应用程序。
下面是我现有的Kafka设置在Spark。

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "latest")
  .load()
xytpbqjk

xytpbqjk1#

类似地,如何写入特定分区。我试过了,但没用。

someDF
          .selectExpr("key", "value")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaServers)
          .option("topic", "someTopic")
          .option("partition", partIdx)
          .start()
jslywgbw

jslywgbw2#

下面是如何读取特定分区。

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("assign", """{"topic":[0]}""") 
  .option("startingOffsets", "latest")
  .load()

ps:从多个分区而不是1-->“”“{”topic“:[0,1,2..n]}”读取

相关问题