spark structured streaming 2.3.0中的水印

tag5nh1u  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(436)

我在spark structured streaming 2.3.0中读取了Kafka的数据。数据包含一些教师的信息,有teacherid、teachername和teacherGroupsId。teachergroupsids是一个数组列,其中包含组的ID。在我的任务中,我必须将带有组id的列Map到包含组名信息的列([1,2,3]=>[suns,books,flowers])。名称和ID存储在hbase中,可以每天更改。后来我要把数据发给另一个Kafka主题。
所以,我从两个来源读取数据-Kafka和hbase。我使用shc库从hbase读取数据。
首先,分解数组列(组id),然后使用hbase中的数据进行连接。
在下一步中,我将使用teacherid聚合数据。但是我使用的append模式不支持这个操作。
我尝试过水印技术,但目前它不起作用。我添加了一个带有时间戳的新列,并按此列分组。

Dataset<Row> inputDataset = //reading from Kafka

Dataset<Row> explodedDataset = // explode function applied and join with HBase

Dataset<Row> outputDataset = explodedDataset
.withColumn("eventTime", lit(current_timestamp()))
.withWatermark("eventTime", "2 minutes")
.groupBy(window(col("eventTime"), "5 seconds"), col("teacherId"))
.agg(collect_list(col("groupname")));

实际结果在输出端显示空Dataframe。没有任何争吵。

7lrncoxx

7lrncoxx1#

问题是 current_timestamp() .
current\u timestamp返回该时刻的时间戳,因此,如果使用此列创建Dataframe并打印结果,则打印当前时间戳,但如果处理df并打印相同的列,则打印新的时间戳。
此解决方案在本地工作,但有时在分布式系统中它会失败,因为工人在收到打印数据的命令时,此数据已经超出时间戳范围。

相关问题