如何加载Kafka已经发布的所有记录?

pxq42qpu  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(270)

我有一个pyspark结构的流式python应用程序设置如下

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

所有这些都显示了这一点

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
  "runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
  "name" : null,
  "timestamp" : "2019-03-04T22:00:49.389Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 852,
    "getBatch" : 180,
    "getOffset" : 135,
    "queryPlanning" : 107,
    "triggerExecution" : 1321,
    "walCommit" : 27
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[my_topic]]",
    "startOffset" : null,
    "endOffset" : {
      "my_topic" : {
        "0" : 303
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
  }
}

如你所见, my_topic 有303条信息,但我不能让它显示。其他信息包括,我正在使用合流kafka jdbc连接器来查询oracle数据库并将行存储到kafka主题中。我有一个avro模式注册表设置。如果需要,我也将共享这些属性文件。
有人知道发生了什么事吗?

zqry0prt

zqry0prt1#

作为一个流应用程序,这个spark结构流只在消息发布后读取消息。出于测试目的,我想做的是阅读主题中的所有内容。为了做到这一点,你所要做的是一个额外的选择 readStream ,即。 option("startingOffsets", "earliest") .

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")
    .load()

相关问题