Why doesn't my Spark watermark demo work?

vx6bjr1n · published 2021-07-09 · in Spark

I want to run a Structured Streaming query with a watermark, but it doesn't seem to work:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._

val in = spark.readStream
  .schema("id INT, time TIMESTAMP")
  .csv("data/sensor/*.txt")
  .withWatermark("time", "1 seconds")
in.createOrReplaceTempView("in")

val in2 = spark.sql(
  "select id, window, count(1), collect_list(concat(time)) " +
  "from in group by id, window(time, '5 second') order by window, id")

val writer = in2.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "chktmp5")
val stream = writer.start
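As a side note, the tumbling windows produced by `window(time, '5 second')` put each event into the 5-second bucket starting at the largest multiple of 5 seconds at or below its timestamp. A plain-Scala sketch of that bucketing (no Spark involved; `windowStart` is a made-up helper, and times are simplified to seconds-of-day):

```scala
// window(time, '5 second') assigns an event at second t to the
// half-open window [t - t % 5, t - t % 5 + 5).
def windowStart(sec: Long, widthSec: Long = 5): Long =
  sec - (sec % widthSec)

// Seconds-of-day for the demo's timestamps: 08:57:01 = 8*3600 + 57*60 + 1.
val e085701 = 8 * 3600 + 57 * 60 + 1
val e085703 = 8 * 3600 + 57 * 60 + 3
// Both fall in the window starting at 08:57:00, which matches the
// single [08:57:00, 08:57:05] window seen in the console output below.
```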

I use a folder of CSV files as the streaming source, and I generate each CSV at a specific processing time, according to this schedule:

event time    processing time
8:57:01       8:57:11
8:57:03       8:57:13
8:57:01       8:57:25

Afterwards, you can read the processing times off the file modification times:

me@ubuntu:~/Downloads/spark-3.0.1-bin-hadoop2.7/bin/data/sensor$ ls --full-time
total 24
-rw-rw-r-- 1 yz yz  22 2021-03-25 08:57:11.811041590 -0700 1.txt
-rw-rw-r-- 1 yz yz  22 2021-03-25 08:57:13.811052616 -0700 2.txt
-rw-rw-r-- 1 yz yz  22 2021-03-25 08:57:25.815118132 -0700 3.txt

1.txt contains:

3,2021-03-25 08:57:01

2.txt contains:

2,2021-03-25 08:57:03

3.txt contains:

3,2021-03-25 08:57:01

Clearly, the event with event time 08:57:01 and processing time 08:57:25 arrives too late and should be discarded, but in fact it is not. Since I am using the console sink, I can see:

-------------------------------------------
Batch: 2
-------------------------------------------
+---+------------------------------------------+--------+------------------------------------------+
|id |window                                    |count(1)|collect_list(concat(CAST(time AS STRING)))|
+---+------------------------------------------+--------+------------------------------------------+
|2  |[2021-03-25 08:57:00, 2021-03-25 08:57:05]|1       |[2021-03-25 08:57:03]                     |
|3  |[2021-03-25 08:57:00, 2021-03-25 08:57:05]|2       |[2021-03-25 08:57:01,2021-03-25 08:57:01] |
+---+------------------------------------------+--------+------------------------------------------+

So it seems the watermark is not working. Why?
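For reference, the watermark semantics described in the Spark Structured Streaming guide are: the watermark is the maximum event time seen so far minus the specified delay, it advances between micro-batches, and rows whose event time falls below it are considered late. A simplified plain-Scala model of that bookkeeping (no Spark; `Tracker` and its fields are hypothetical names), applied to the three files above arriving as three micro-batches:

```scala
// Simplified model of watermark tracking: after each completed batch,
// watermark = max event time seen so far - delay; a row is late when
// its event time is below the current watermark.
final case class Tracker(delaySec: Long, maxEventSec: Long = Long.MinValue) {
  def watermark: Long =
    if (maxEventSec == Long.MinValue) Long.MinValue else maxEventSec - delaySec
  def isLate(eventSec: Long): Boolean = eventSec < watermark
  def afterBatch(eventSecs: Seq[Long]): Tracker =
    copy(maxEventSec = (maxEventSec +: eventSecs).max)
}

// Event times as seconds-of-day; withWatermark("time", "1 seconds").
val t085701 = 8 * 3600 + 57 * 60 + 1
val t085703 = 8 * 3600 + 57 * 60 + 3

val t0 = Tracker(delaySec = 1)
val t1 = t0.afterBatch(Seq(t085701)) // batch 0: 1.txt, 08:57:01
val t2 = t1.afterBatch(Seq(t085703)) // batch 1: 2.txt, 08:57:03
// Before batch 2 the watermark is 08:57:02, so the replayed 08:57:01
// row from 3.txt is indeed below it.
```

Under this model the 08:57:01 row in the third batch is below the 08:57:02 watermark, so it qualifies as late. Whether a late row is actually dropped, however, depends on the output mode: per the Spark documentation, watermarking bounds and cleans up aggregation state only in `append` and `update` modes, while the query above uses `complete` mode, which retains all aggregate state. That difference may be relevant here.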

No answers yet.
