获取nullpointerexception:
java.lang.NullPointerException
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:604)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:46)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:59)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:250)
at org.elasticsearch.client.transport.TransportProxyClient.execute(TransportProxyClient.java:59)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:363)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:408)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:80)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:54)
我有一个场景,多个并发任务在spark streaming应用程序中的4个执行器中运行,每个执行器从kafka读取数据,准备批量并接收es索引中的一批记录。我第一次在这些批记录中得到了一个奇怪的nullpointerexception,但是在第二次运行中它们得到了成功的处理。
有人能告诉我为什么会这样吗。
2条答案
按热度按时间8xiog9wr1#
这是我正在使用的代码段,第一行是build.sbt文件中的依赖项
我使用typesafe config从属性文件读取config。我以json的形式将数据发布到kafka,因此我使用了“savejsontoes()”api,您可以在elasticsearch网站上的连接器文档中找到更多信息
mtb9vblg2#
到目前为止,我找到了一个解决方法,可以一次将记录推送到es索引,并删除了这个bulkapi(bulkapi在后台也做同样的事情)。