如何通过datastremapi或flinktableapi/sql将给定键和公共窗口上的三个或更多数据流/表连接起来?

rwqw0loc  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(153)

我想在一个给定的键和一个公共窗口上连接三个或更多的数据流或表。但是我不知道如何正确地编写代码。官方文件https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/ 给出下面的例子,但是它只是连接两个数据流,那么如何在一个给定的键和一个公共窗口上连接三个或更多的数据流呢?

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

我试图弄清楚我首先用公共窗口连接两个数据流,然后用结果数据流连接第三个数据流和公共窗口?然而,当我们将timecharacteristic设置为eventtime时,这三个数据流的事件时间的语义似乎会发生变化。

对于flink表api和sql,同样的问题是,如何在给定的键和公共窗口上连接三个或更多的表?官方文件https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html 只需给出下面的例子就可以了。

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

我尝试编写下面的sql来连接给定键和公共窗口上的三个表,但是我认为这不对。

String SQL = "SELECT" +
            " grades.user1  , SUM(salaries.amount)   FROM grades " +
            " INNER JOIN salaries ON   grades.user1 =   salaries.user1 " +
            " INNER JOIN person ON   grades.user1 =   person.user1 "+
             "GROUP BY grades.user1, TUMBLE(grades.proctime,  INTERVAL '5' SECOND) "

那么,通过datastremapi或flinktableapi/sql将给定键和公共窗口上的三个或更多数据流/表连接起来的正确方法是什么呢?
2018年6月16日更新,使问题更清楚。
对于flink sql,我所需要的,就像下面的伪代码一样,是用一个公共tumblingeventtimewindow连接三个表,也就是说,数据流api的替代版本,不管是用flink sql表示的,也意味着连接三个表中的所有事件,这发生在同一个tumblingeventtimewindow中。

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))

在下面的flink设计文档中似乎也提到了join特性:“event time tumbling windowed stream-stream-joins:joins tuple of two streams that are in the same tumbling event time window”,我不知道flink sql是否实现了这种类型的flink sql连接特性。
https://docs.google.com/document/d/1tlayjnotble_-m1rqfga6ouj1oysfqrjpcp1h2tvqdi/edit#

laximzn5

laximzn51#

很难对您的问题给出明确的答案,因为您需要的连接的语义并不清楚。datastreamapi的窗口连接实现的语义不同于表api/sql的窗口连接。
在datastream api上,您只需定义另一个联接,如下所示:

firstStream
  .join(secondStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
  .join(thirdStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

由于flink实现了标准sql,因此可以像往常一样定义三个表的联接:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

窗口范围( A.ts BETWEEN B.ts - X AND B.ts + Y) 可根据需要定义。

相关问题