我使用的是flink的scalaapi。我在一段时间内有一些变化 reports = DataStream[Tuple15]
(the) Tuple15
是一个scala元组,所有字段 Int
). 问题在这里:
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
错误状态为:
InvalidProgramException: Specifying keys via field positions is only valid for
tuple data types. Type: GenericType<scala.Tuple5>
整个错误跟踪(我标记了指向错误的行,第107行,对应于 apply
方法(按上述代码):
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
但这对我来说毫无意义。我用的是元组类型,不是吗?或者这是怎么回事 GenericType<...>
?
那我该怎么修 map
使 keyBy
工作?
4条答案
按热度按时间k97glaaz1#
aggregateoperator只支持flink元组。如果您面临这个问题,那么首先请检查您的导入是否是scala.tuple2,然后它是错误的。所以应该是org.apache.flink.api.java.tuple.tuple2
ffdz8vbo2#
原因是
TypeInformation
属于javaapi,因此不知道scala元组。因此,它返回一个GenericType
不能用作keyBy
野外作业。如果要手动生成scala元组类型信息,必须使用
createTypeInformation
包含在org.apache.flink.api.scala
/org.apache.flink.streaming.api.scala
包对象。但是如果导入包对象,则不需要手动指定类型信息,因为
TypeInformation
是的上下文绑定map
操作和createTypeInformation
是一个隐函数。下面的代码片段显示了处理
TypeInformations
.utugiqy63#
我也遇到了同样的问题,可以按如下方式解决:
使用FlinkAPI中的tuple2类,即[import org.apache.flink.api.java.tuple.tuple15],而不是scala.tuple15
请查看导入部分并更正。
这里我使用了flinkjavaapi。如果是scala,请导入org.apache.flink.api.scala.\包
[Apache·Flink]
nxowjjhe4#
好吧,在花了很多时间之后,我实际上是通过移除
TypeInformation
. 所以,改变这个:对此:
尽管如此,我认为这个解决方案是一种黑客攻击,因为我仍然收到来自flink的警告(嗯,信息日志):
所以,如果有更一般的答案,我很乐意接受。在那之前,这对我有用。
更新
我以前试过,但没用。我刚刚意识到,现在它的工作感谢@till的回答。所以,正如我所说的,你需要导入
org.apache.flink.streaming.api.scala.createTypeInformation
或者org.apache.flink.api.scala.createTypeInformation
(不是两者都有!)。