我正在想办法写我自己的 WindowFunction
但是有问题,我不知道为什么。我遇到的问题是apply函数,因为它无法识别 MyWindowFunction
作为一个有效的输入,所以我不能编译。我正在传输的数据包含 (timestamp,x,y)
其中x和y分别为0和1。 extractTupleWithoutTs
只返回一个元组 (x,y)
. 我已经用简单的求和函数和reduce函数成功地运行了代码。感谢您的帮助:)使用flink 1.3
进口:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
其余代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TsExtractor)
val tuple = text.map( str => extractTupleWithoutTs(str))
val counts = tuple.keyBy(0).timeWindow(Time.seconds(5)).apply(new MyWindowFunction())
counts.print()
env.execute("Window Stream")
mywindow函数,它基本上是复制粘贴从示例中修改的类型。
class MyWindowFunction extends WindowFunction[(Int, Int), Int, Int, TimeWindow] {
def apply(key: Int, window: TimeWindow, input: Iterable[(Int, Int)], out: Collector[Int]): () = {
var count = 0
for (in <- input) {
count = count + 1
}
out.collect(count)
}
}
1条答案
按热度按时间byqmnocz1#
问题是第三类参数
WindowFunction
,即键的类型。在keyBy
方法(keyBy(0)
). 因此,不能在编译时确定键的类型。同样的问题也会出现,如果将键声明为字符串,即。,keyBy("f0")
.有两种解决方法:
使用
KeySelector
中的函数keyBy
取出钥匙(例如keyBy(_._1)
). 的返回类型KeySelector
函数在编译时是已知的,这样您就可以使用正确类型的WindowFunction
带着一个Int
钥匙。更改的第三个类型参数的类型
WindowFunction
至org.apache.flink.api.java.tuple.Tuple
,即。,WindowFunction[(Int, Int), Int, org.apache.flink.api.java.tuple.Tuple, TimeWindow]
.Tuple
是由提取的密钥的通用持有者keyBy
. 在你的情况下,这将是一个org.apache.flink.api.java.tuple.Tuple1
. 在WindowFunction.apply()
你可以投Tuple
至Tuple1
并通过Tuple1.f0
.