When I use a StreamingQueryListener to monitor Structured Streaming, I see duplicate events in onQueryProgress:
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
  if (queryProgress.progress.numInputRows != 0) {
    println("Query made progress: " + queryProgress.progress)
  }
}
The output is:
Query made progress: {
  "id" : "e76a8789-738c-49f6-b7f4-d85356c28600",
  "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.077Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.3050640634533252,
  "durationMs" : {
    "addBatch" : 2452,
    "getBatch" : 461,
    "queryPlanning" : 276,
    "triggerExecution" : 3278
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.3050640634533252
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@3ec8a100"
  }
}
Query made progress: {
  "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7",
  "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77",
  "name" : null,
  "timestamp" : "2017-08-15T07:28:27.075Z",
  "numInputRows" : 1,
  "processedRowsPerSecond" : 0.272108843537415,
  "durationMs" : {
    "addBatch" : 2844,
    "getBatch" : 445,
    "queryPlanning" : 293,
    "triggerExecution" : 3672
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 19
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 20
      }
    },
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.272108843537415
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@6953f971"
  }
}
Why does a single message that I send produce two different progress results?
The underlying problem in my main program is that Spark should calculate the data every 5 minutes, e.g. 00:00-00:05, 00:05-00:10, and so on, which makes 288 intervals per day to calculate.
So my idea is to use Structured Streaming to filter out the specific data and store it in a database, then next time read the database together with the stream.
So I should listen to each batch in order to update the time at which I read the database.
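One way to sketch the 5-minute interval idea is with a processing-time trigger, so each micro-batch covers one of the 288 daily intervals. This is only a sketch under assumptions: the Kafka bootstrap servers, checkpoint location, and the console sink are placeholders, not from the original program (only the topic name `test1` appears in the logs above).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object FiveMinuteBatches {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FiveMinuteBatches").getOrCreate()

    // Read from the same Kafka topic seen in the progress logs.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumption
      .option("subscribe", "test1")
      .load()

    // Fire one micro-batch every 5 minutes (00:00-00:05, 00:05-00:10, ...).
    val query = df.writeStream
      .format("console") // placeholder sink; the original used a ForeachSink
      .option("checkpointLocation", "/tmp/checkpoint") // assumption
      .trigger(Trigger.ProcessingTime("5 minutes"))
      .start()

    query.awaitTermination()
  }
}
```

With this trigger, onQueryProgress fires once per 5-minute batch, which could serve as the hook for updating the database read time.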
1 Answer
These two events come from two different queries. You can see that their id and runId values are different, so you have two streaming queries running in the same session, and the listener receives progress events from both.
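Since a StreamingQueryListener is registered on the whole SparkSession and receives events from every active query, one option is to filter on the query's id. A minimal sketch (the class name and registration call are illustrative; `id` is stable across restarts while `runId` changes on every run):

```scala
import java.util.UUID
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Only log progress for one specific query, identified by its id.
class SingleQueryListener(watchedId: UUID) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // The listener sees events from every query in the session,
    // so ignore all queries except the one we care about.
    if (event.progress.id == watchedId && event.progress.numInputRows != 0) {
      println("Query made progress: " + event.progress)
    }
  }
}

// Registration, assuming `query` is the StreamingQuery you started:
//   spark.streams.addListener(new SingleQueryListener(query.id))
```

Alternatively, stop the extra query if it was started unintentionally; each call to `writeStream.start()` creates a separate query with its own id.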