Spark Elasticsearch configuration: reading from Elasticsearch with Spark

cclgggtu, published 2021-05-27 in Hadoop

I am trying to read data from Elasticsearch via Spark Scala. I have seen many posts discussing this and tried every option they mention, but nothing seems to work for me.

JAR used - elasticsearch-hadoop-5.6.8.jar (also tried elasticsearch-spark-5.6.8.jar without any success)
Elasticsearch version - 5.6.8
Spark - 2.3.0
Scala - 2.11

Code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._ 

val spark = SparkSession.builder.appName("elasticSpark").master("local[*]").getOrCreate()

val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.index.auto.create", "true")
  .option("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .option("es.port", "9200")
  .option("es.nodes", "xxxxxxxxx")
  .option("es.nodes.wan.only", "true")
  .option("es.net.http.auth.user", "xxxxxx")
  .option("es.net.http.auth.pass", "xxxxxxxx")

val read = reader.load("index/type")

Error:
ERROR rest.NetworkClient: Node [xxxxxxxxx:9200] failed (The server xxxxxxxxxxxxx failed to respond); no other nodes left - aborting...
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
  at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:294)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAndGeoFields(SchemaUtils.scala:98)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:91)
  at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:129)
  at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:129)
  at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:133)
  at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:133)
  at scala.Option.getOrElse(Option.scala:121)
  at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:133)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:432)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
  ... 53 elided
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xxxxxxxxxxx:9200]]
  at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
  at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
  at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:425)
  at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:429)
  at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
  at org.elasticsearch.hadoop.rest.RestClient.remoteEsVersion(RestClient.java:655)
  at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:287)
  ... 65 more

Apart from this, I also tried the following properties, without success:

option("es.net.ssl.cert.allow.self.signed", "true")
option("es.net.ssl.truststore.location", "<path for elasticsearch cert file>")
option("es.net.ssl.truststore.pass", "xxxxxx")

Please note that the Elasticsearch node sits inside a Unix edge node, at http://:9200 (mentioned only in case it makes any difference).
What am I missing? Are there any other properties I should set? Please help.

k2arahey:

Instead of the elasticsearch-hadoop or elasticsearch-spark jar, use the JAR below, which supports Spark 2.x:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11/5.6.8
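With that artifact on the classpath, the question's read could look like the sketch below. This is a minimal, untested outline: the host, credentials, and `index/type` are the same placeholders used in the question, and it assumes the cluster is reachable from the driver.

```scala
// Launch the shell with the Spark-2.x-compatible connector, e.g.:
//   spark-shell --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.6.8

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("elasticSpark")
  .master("local[*]")
  .getOrCreate()

// Connection settings copied from the question; replace the placeholders.
val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "xxxxxxxxx")          // placeholder host
  .option("es.port", "9200")
  .option("es.nodes.wan.only", "true")      // needed when nodes are not directly routable
  .option("es.net.http.auth.user", "xxxxxx")
  .option("es.net.http.auth.pass", "xxxxxxxx")
  .load("index/type")                        // placeholder index/type

df.show()
```

Note that `es.nodes.wan.only=true` makes the connector talk only to the declared node instead of discovering the cluster's internal addresses, which is usually what you want when Elasticsearch is behind an edge node.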
