scala扩展窗函数

20jt8wwn  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(321)

我正在想办法写我自己的 WindowFunction 但是有问题,我不知道为什么。我遇到的问题是apply函数,因为它无法识别 MyWindowFunction 作为一个有效的输入,所以我不能编译。我正在传输的数据包含 (timestamp,x,y) 其中x和y分别为0和1。 extractTupleWithoutTs 只返回一个元组 (x,y) . 我已经用简单的求和函数和reduce函数成功地运行了代码。感谢您的帮助:)使用flink 1.3
进口:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

其余代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TsExtractor)
val tuple = text.map( str => extractTupleWithoutTs(str))
val counts = tuple.keyBy(0).timeWindow(Time.seconds(5)).apply(new MyWindowFunction())
counts.print()
env.execute("Window Stream")

mywindow函数,它基本上是复制粘贴从示例中修改的类型。

class MyWindowFunction extends WindowFunction[(Int, Int), Int, Int, TimeWindow] {
  def apply(key: Int, window: TimeWindow, input: Iterable[(Int, Int)], out: Collector[Int]): () = {
    var count = 0
    for (in <- input) {
      count = count + 1
    }
    out.collect(count)
  }
}
byqmnocz

byqmnocz1#

问题是第三类参数 WindowFunction ,即键的类型。在 keyBy 方法( keyBy(0) ). 因此,不能在编译时确定键的类型。同样的问题也会出现,如果将键声明为字符串,即。, keyBy("f0") .
有两种解决方法:
使用 KeySelector 中的函数 keyBy 取出钥匙(例如 keyBy(_._1) ). 的返回类型 KeySelector 函数在编译时是已知的,这样您就可以使用正确类型的 WindowFunction 带着一个 Int 钥匙。
更改的第三个类型参数的类型 WindowFunctionorg.apache.flink.api.java.tuple.Tuple ,即。, WindowFunction[(Int, Int), Int, org.apache.flink.api.java.tuple.Tuple, TimeWindow] . Tuple 是由提取的密钥的通用持有者 keyBy . 在你的情况下,这将是一个 org.apache.flink.api.java.tuple.Tuple1 . 在 WindowFunction.apply() 你可以投 TupleTuple1 并通过 Tuple1.f0 .

相关问题