我有一个flink应用程序,它使用kafka集群的数据并运行sql数据转换。我在emr上运行这个应用程序,当我使用java-jar选项运行时,应用程序按预期运行。
但是,使用命令flink run-m yarn cluster-yn 2-yjm 1g-ytm 2g jarfilename arg1 arg2运行Yarn时
应用程序因堆栈下跟踪而失败。
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
更新:
这个问题解决了。根本原因是flink的类加载器。我们最初使用的是springbootmaven插件。我们将它改为maven shade plugin,这就解决了这个问题。
参考资料:integration-apache flink+spring boot
1条答案
按热度按时间ax6ht2ek1#
你用的是哪种flink版本?您确定在群集上使用相同的版本吗?
此类(反)序列化问题通常与用于序列化和反序列化的不同版本相连接。