加入同一个流以过滤某些事件

agyaoht7  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(217)

我有一个数据流,看起来像这样:

impressionId | id | name | eventType | timestamp

我需要过滤(忽略)类型为“click”的事件,它没有匹配类型为“impression”的“impressionid”(因此基本上忽略没有impression的clicks事件),然后计算我总共有多少个impression,以及在特定时间窗口中我有多少次单击(对于id/name对)。
我就是这样找到解决方案的:

[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);

Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");

Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");

Table filteredClickCount = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
 DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();

我要做的只是创建两个表,一个有点击,一个有印象,'内部'连接点击到印象和一个被连接意味着他们的点击有一个匹配的印象。
现在这不管用,我也不知道为什么!?
最后一个联接表生成的计数不正确。它在第一分钟起作用,但在那之后计数几乎减少了一倍。
然后,我尝试修改最后一个表,如下所示:

Table clickWithMatchingImpression2 = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");

DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
    result2.print();

还有。。。。这管用!?但是我不知道为什么,也不知道如何处理这个数据流<tuple3<boolean,test3>>格式。。。当table没有Windows时,Flink拒绝使用toappendstream。我想要一个只有最后数字的简单结构。
1)我的方法正确吗?有没有更简单的方法过滤没有印象的点击?
2)为什么我的解决方案中的计数不正确?

0s7z1bwu

0s7z1bwu1#

我不完全确定我是否正确理解了您的用例,一个带有一些数据点的示例在这里肯定会有所帮助。
让我解释一下你的代码在做什么。首先,这两个表计算了过去24小时内有多少次点击/印象。对于输入

new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)

您将获得窗口(array,window\u start,window\u end,rowtime):

[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...

因此,当你在id和name上分组时,你会得到如下结果:

1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...

如果您在24小时窗口中再次分组,您将多次计算具有相同id的每个事件。
如果我正确地理解了您的用例,并且您正在查找一次单击前后1分钟内发生了多少次印象,那么间隔连接可能就是您要查找的。您可以通过以下查询实现您的案例:

Table clicks = eventsTable
        .where($("eventType").isEqual("click"))
        .select(
                $("impressionId").as("clickImpressionId"),
                concat($("id"), "_", $("name")).as("concatClickId"),
                $("id").as("clickId"),
                $("name").as("clickName"),
                $("eventTime").as("clickEventTime")
        );

Table impressions = eventsTable
        .where($("eventType").isEqual("impression"))
        .select(
                $("impressionId").as("impressionImpressionId"),
                concat($("id"), "_", $("name")).as("concatImpressionId"),
                $("id").as("impressionId"),
                $("name").as("impressionName"),
                $("eventTime").as("impressionEventTime")
        );

Table table = impressions.join(
        clicks,
        $("clickImpressionId").isEqual($("impressionImpressionId"))
                .and(
                        $("clickEventTime").between(
                                $("impressionEventTime").minus(lit(1).minutes()),
                                $("impressionEventTime"))
                ))
        .select($("concatClickId"), $("impressionEventTime"));

table
        .window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
        .groupBy($("concatClickId"), $("minuteWindow"))
        .select($("concatClickId"), $("concatClickId").count())
        .execute()
        .print();

至于为什么flink有时不能产生追加流,而只能产生收回流,请参见。非常简单地说,如果基于时间属性的操作不起作用,当结果为“有效”时,就没有单一的时间点。因此,它必须发出更改流,而不是一个附加值。元组中的第一个字段告诉您记录是插入(true)还是收回/删除(false)。

相关问题