Error when restarting a Flink application from a checkpoint

4uqofj5v posted on 2021-07-15 in Flink

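I'm trying to restore some Flink applications from a checkpoint, because taking a savepoint always throws an exception (usually a timeout). When I wait a few days before restarting (still less than the Kafka retention period), different problems show up in the logs. As you can see in the logs, RocksDB is the state backend. The application basically reads some Kafka topics, joins them, and removes duplicates.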
Application log 1:

12:18:33.686 [Co-Flat Map -> Map -> Filter (4/4)] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_272]
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_272]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_272]
    at java.io.DataInputStream.read(DataInputStream.java:149) ~[?:1.8.0_272]
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94) ~[zdata-flink-streams.jar:0.1]
    at java.io.InputStream.read(InputStream.java:101) ~[?:1.8.0_272]
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:56) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) ~[zdata-flink-streams.jar:0.1]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_272]
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) ~[zdata-flink-streams.jar:0.1]
    at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654) ~[?:1.8.0_272]
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871) ~[?:1.8.0_272]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) [zdata-flink-streams.jar:0.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) [zdata-flink-streams.jar:0.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [zdata-flink-streams.jar:0.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [zdata-flink-streams.jar:0.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]

Application log 2:

12:13:17.137 [Flink Netty Server (0) Thread 1] ERROR  org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

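The Flink streaming job runs fine, but different problems appear whenever I restart the application from a checkpoint. Is there something in the logs that I'm missing? This is my first time working with Flink.
Extra information:
All applications are deployed on AWS EMR;
Flink 1.11.2;
4 slots.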
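For context, the checkpoint-related settings that matter for a restart like this can be sketched as a Flink 1.11 flink-conf.yaml fragment. This is not the poster's actual configuration: the checkpoint directory is a hypothetical path and the interval, retention count and timeout are illustrative values. Settings made programmatically on the StreamExecutionEnvironment override these keys.

```yaml
# flink-conf.yaml (Flink 1.11): illustrative sketch, not the original job's config
state.backend: rocksdb
# Incremental RocksDB checkpoints; the stack trace above goes through
# RocksDBIncrementalRestoreOperation, so this appears to be enabled already
state.backend.incremental: true
# Hypothetical location; replace with the real HDFS/S3 checkpoint path
state.checkpoints.dir: hdfs:///flink/checkpoints
# Keep more than the default single completed checkpoint
state.checkpoints.num-retained: 3
execution.checkpointing.interval: 1 min
# 10 min is the default; checkpoints that take longer are aborted
execution.checkpointing.timeout: 10 min
# Keep externalized checkpoints when the job is cancelled so they can be restored
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```

A retained checkpoint is then passed to `flink run -s <checkpoint-path> ...` in the same way as a savepoint.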
