apacheflink:如何使用表api对每n行进行分组?

nvbavucw  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(287)

最近我尝试使用apacheflink进行快速批处理。我有一张table和一张tablecolumn:value and 不相关的索引列
基本上我想计算每5行值的平均值和范围。然后我将根据我刚刚计算的平均值来计算平均值和标准差。所以我想最好的办法就是 Tumble Windows。
看起来像这样

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w")
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev}

但我不知道该写什么 .on() . 我试过了 "proctime" 但它表示,目前还没有这样的投入。我只希望它在从源代码读取时按顺序分组。但它必须是一个时间属性,所以我不能使用 "f2" -索引列也按顺序排列。
我必须添加时间戳才能这样做吗?在批处理中有必要吗?它会减慢计算速度吗?解决这个问题的最好办法是什么?
更新:我试着在表api中使用滑动窗口,结果得到了一个异常。

// Calculate mean value in each group
    Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

//Calculate moving range of group Mean using sliding window
    Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

例外情况是:
线程“main”java.lang.unsupportedoperationexception中出现异常:当前不支持事件时间上的计数滑动组窗口。
在org.apache.flink.table.plan.nodes.dataset.datasetwindowaggegate.createeventtimeslidingwindowdataset(datasetwindowaggegate。scala:456)
在org.apache.flink.table.plan.nodes.datasetwindowaggregate.translatetoplan(datasetwindowaggregate。scala:139)
...
这是否意味着表api中不支持滑动窗口?如果我没记错的话,dataset api中没有窗口函数。那么如何计算间歇过程中的移动范围呢?

drnojrws

drnojrws1#

这个 window 子句用于定义基于窗口函数的分组,例如 Tumble 或者 Session . 表api(或sql)中没有很好地定义每5行分组一次,除非指定行的顺序。这是在 on 合同条款 Tumble 功能。由于此功能源自流处理,因此 on 子句需要timestamp属性。
您可以使用 currentTimestamp() 功能。但是,我应该指出,flink将对数据进行排序,因为它不知道函数的单调性。此外,所有这些都将使用1的并行性,因为没有允许分区的子句。
或者,还可以实现一个用户定义的标量函数,将索引属性转换为时间戳(实际上是一个长值)。不过,Flink会做一个完整的数据排序。

相关问题