Could not find a file system implementation for scheme 'nfs' in Apache Flink

kninwzqo, posted on 2021-06-24 in Flink

I am using Apache Flink (v1.10.0) to process RabbitMQ data as a stream, but when I configure checkpoint data to be stored on an NFS file system, like this:

env.setStateBackend(new FsStateBackend("nfs://28736435.boston.nas.amazonaws.com:/data/k8s/data/flink/checkpoint"));

I get this error:

Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:282)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'nfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
    ... 23 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
    ... 28 more

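The documentation says that NFS file systems are supported for storing checkpoints. What did I get wrong in my NFS file system configuration for Apache Flink?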

tp5buhyn #1

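You need to mount the NFS volume on every instance and use the "file" scheme instead of "nfs".
Since you are running on AWS, it would be natural to use S3 for checkpointing. If you decide to do that, you should use flink-s3-fs-presto (s3p://) rather than flink-s3-fs-hadoop (s3a://), because the Hadoop implementation performs poorly when used with Flink for checkpointing.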
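
As a rough illustration of the "file" scheme advice, the sketch below builds a file:// checkpoint URI from a local mount path using only the JDK. The class name CheckpointUris and the method forMountedPath are hypothetical helpers, and the sketch assumes the NFS export is mounted at the same path, /data/k8s/data/flink/checkpoint, on the JobManager and on every TaskManager. Your mount point may differ.

```java
import java.net.URI;

// Hypothetical helper, not part of Flink: turns a local mount path into a file:// URI.
final class CheckpointUris {

    // Assumes absolutePath is where the NFS volume is mounted on every node
    // and that it contains no characters needing URI escaping (e.g. spaces).
    static URI forMountedPath(String absolutePath) {
        if (absolutePath == null || !absolutePath.startsWith("/")) {
            throw new IllegalArgumentException("Expected an absolute path, got: " + absolutePath);
        }
        // "file://" plus an absolute path yields the file:///... form with an empty authority.
        return URI.create("file://" + absolutePath);
    }

    public static void main(String[] args) {
        // Assumed mount point; replace with the directory where the NFS share is mounted.
        URI checkpointUri = forMountedPath("/data/k8s/data/flink/checkpoint");
        System.out.println(checkpointUri);
        // In the Flink job from the question, this URI would replace the nfs:// one:
        // env.setStateBackend(new FsStateBackend(checkpointUri));
    }
}
```

The resulting URI uses the "file" scheme, which Flink resolves with its built-in local file system, so no Hadoop dependency is needed for it. The path only works if it resolves to the same shared directory on every node.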
