flink on yarn意外的块数据错误

pzfprimi  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(332)

我有一个flink应用程序,它使用kafka集群的数据并运行sql数据转换。我在emr上运行这个应用程序,当我使用java-jar选项运行时,应用程序按预期运行。
但是,使用命令flink run-m yarn cluster-yn 2-yjm 1g-ytm 2g jarfilename arg1 arg2运行Yarn时
应用程序因堆栈下跟踪而失败。

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

更新:
这个问题解决了。根本原因是flink的类加载器。我们最初使用的是springbootmaven插件。我们将它改为maven shade plugin,这就解决了这个问题。
参考资料:integration-apache flink+spring boot

ax6ht2ek

ax6ht2ek1#

你用的是哪种flink版本?您确定在群集上使用相同的版本吗?
此类(反)序列化问题通常与用于序列化和反序列化的不同版本相连接。

相关问题