我正在尝试使用提供的elasticsearch2连接器将twitter流写入elasticsearch2.3索引
在intellij中运行我的作业可以正常工作,但是当我在本地集群上运行jar作业时,会出现以下错误:
05/09/2016 13:26:58 Job execution switched to status RUNNING.
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING
05/09/2016 13:26:59 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
05/09/2016 13:26:59 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
05/09/2016 13:26:59 Job execution switched to status FAILED.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
我在scala中的代码:
val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")
val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getLocalHost(),9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(),9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"),9300))
stream.addSink(new ElasticsearchSink(config, transports, new ElasticSearchSinkTwitter()))
从ide和本地集群运行该程序有什么区别?
1条答案
按热度按时间whlutmcx1#
像这样的问题通常是由ide(intellij,eclipse)和flink的作业提交通过fatjar管理/包含依赖关系的不同方式引起的。
前几天我遇到了同样的问题,任务管理器日志文件显示了以下根本原因:
java.lang.illegalargumentexception:名为“lucene50”的类型为org.apache.lucene.codecs.postingsformat的spi类不存在。您需要将支持此spi的相应jar文件添加到类路径中。当前类路径支持以下名称:[es090,completion090,xbloomfilter]
在搜索错误时,我找到了此答案,从而解决了问题:
https://stackoverflow.com/a/38354027/3609571
通过将以下依赖项添加到
pom.xml
:注意,在这种情况下,依赖关系的顺序很重要。它只在
lucene-core
依赖顶部。把它加到最后对我不起作用。因此,这更多的是一个“黑客”,而不是一个适当的修复。