spark流:写入从kafka主题读取的行数

ktecyv1j  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(365)

spark流媒体工作是从繁忙的Kafka主题中读取事件。为了了解每个触发器间隔有多少数据进入,我只想输出从主题读取的行数。我尝试了多种方法,但都想不出来。

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
//          .option("failOnDataLoss", false)
//          .option("maxOffsetsPerTrigger", 10000)
          .load();
      stream.selectExpr("topic").agg(count("topic")).as("count");
      //stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
      stream.writeStream()
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();
r1zhe5dt

r1zhe5dt1#

看来你需要

stream = stream.selectExpr("topic").agg(count("topic")).as("count");

然后你可以打印出来

相关问题