spark onqueryprogress复制

dzjeubhm  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(293)

当我使用streamingquerylistener监视结构化流时,在onqueryprogress上发现了重复的

override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {

        if(queryProgress.progress.numInputRows!=0) {

          println("Query made progress: " + queryProgress.progress)

        }

结果是

Query made progress: {
  "id" : "e76a8789-738c-49f6-b7f4-d85356c28600",
  "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.077Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.3050640634533252,
  "durationMs" : {
    "addBatch" : 2452,
    "getBatch" : 461,
    "queryPlanning" : 276,
    "triggerExecution" : 3278
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.3050640634533252
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@3ec8a100"
  }
}
Query made progress: {
  "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7",
  "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.075Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.272108843537415,
  "durationMs" : {
    "addBatch" : 2844,
    "getBatch" : 445,
    "queryPlanning" : 293,
    "triggerExecution" : 3672
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.272108843537415
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@6953f971"
  }
}

为什么我发一条消息,那么它有两个不同的结果。
我的主要程序问题是我应该使用spark每5分钟校准一次数据,比如00:00-00:05,00:05-00:10等等。一天有288点到卡尔。
所以我的想法是用结构化流来过滤特定的数据,而不是过滤存储数据库,下次再把数据库和结构化流一起读取
所以我应该听每一批更新我的时间来阅读数据库。

cld4siwp

cld4siwp1#

这两个事件来自不同的查询。你可以看到 id 以及 runId 它们是不同的。

相关问题