Java client cannot connect to local Flink cluster

gzszwxb4 posted on 2021-06-21 in Flink

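I am trying out a small program against a local Flink cluster, which I set up by following the instructions here. The example WordCount program runs fine, but when I try to run my own program, it hangs while connecting to the job manager and then fails. This is with Flink 1.5 and JDK 1.8.
The relevant part of the code is: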

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setFlinkMaster("localhost:6123");
options.setRunner(FlinkRunner.class);

I start the cluster with start-cluster.sh and can see two processes (the job manager and the task manager) running. There is not much in the logs on the Flink side. On the client side, with debug logging turned on, I can see the following:

18:43:20.507 [flink-akka.actor.default-dispatcher-4] INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@talonx:38183]
18:43:20.511 [main] INFO org.apache.flink.client.program.StandaloneClusterClient - Actor system started at akka.tcp://flink@talonx:38183
18:43:20.511 [main] INFO org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: dbf63281771465550fd3598b2b67b91f. Waiting for job completion.
Submitting job with JobID: dbf63281771465550fd3598b2b67b91f. Waiting for job completion.
18:43:20.521 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: dbf63281771465550fd3598b2b67b91f)) but there is no connection to a JobManager yet.
18:43:20.522 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job test-talonx-0618131319-b721a69a (dbf63281771465550fd3598b2b67b91f).
18:43:20.523 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.

After a while (about 20 minutes, judging by the timestamps), I get the following exception on the client:

19:03:19.396 [main] ERROR org.apache.beam.runners.flink.FlinkRunner - Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:449)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:126)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at Test.main(Test.java:106)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
    ... 10 common frames omitted
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What might be missing here?
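One way to narrow this down might be to check, from the client machine, whether anything accepts TCP connections at the address passed to setFlinkMaster. The following is only a rough diagnostic sketch, not a confirmed fix: the class FlinkMasterCheck and its helper methods are hypothetical names introduced for illustration, and it uses only the JDK. It also probes port 8081, which is the default value of Flink's rest.port setting; whether the Beam Flink runner version in use talks to the RPC port or the REST port is an assumption the question does not settle.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: not part of Flink or Beam.
final class FlinkMasterCheck {

    // Extracts the host from a "host:port" string such as "localhost:6123".
    static String parseHost(String master) {
        int idx = master.lastIndexOf(':');
        if (idx <= 0) {
            throw new IllegalArgumentException("Expected host:port, got: " + master);
        }
        return master.substring(0, idx);
    }

    // Extracts the port from a "host:port" string such as "localhost:6123".
    static int parsePort(String master) {
        int idx = master.lastIndexOf(':');
        if (idx <= 0 || idx == master.length() - 1) {
            throw new IllegalArgumentException("Expected host:port, got: " + master);
        }
        return Integer.parseInt(master.substring(idx + 1));
    }

    // Returns true if a plain TCP connection can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address used in the question.
        String master = args.length > 0 ? args[0] : "localhost:6123";
        String host = parseHost(master);
        int rpcPort = parsePort(master);
        int restPort = 8081; // assumption: Flink's default rest.port

        System.out.println(host + ":" + rpcPort + " reachable: "
                + canConnect(host, rpcPort, 2000));
        System.out.println(host + ":" + restPort + " reachable: "
                + canConnect(host, restPort, 2000));
    }
}
```

A successful TCP connection only shows that some process is listening on the port. It does not show that the client and the cluster speak the same protocol version, so a version mismatch between the Flink client libraries on the classpath and the running cluster would still need to be ruled out separately.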
