EMR: upgrading Spark from 2.4.0 to 3.1.2 causes write failures to AWS OpenSearch (Elasticsearch 6.7)

moiiocjp · asked 5 months ago · tagged Scala

I am currently on EMR release emr-5.23.0, i.e. Spark 2.4.0, using the Elasticsearch Spark connector (elasticsearch-spark-30_2.12-7.12.0.jar) to write data to AWS OpenSearch (Elasticsearch 6.7):

(
    dataframe.coalesce(6)
    .write.mode("append")
    .format("org.elasticsearch.spark.sql")
    # Recommended when connecting to a managed/cloud endpoint.
    .option("es.nodes.wan.only", "true")
    .option("es.nodes", es_endpoint)
    .option("es.port", es_port)
    # Index name with a per-document date suffix, e.g. "<log_index_name>-{timestamp}/log".
    .option("es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log"))
    .save()
)

This used to work fine. But after upgrading the EMR release from emr-5.23.0 (Spark 2.4.0) to emr-6.4.0 (Spark 3.1.2), I see the following error when writing to AWS OpenSearch (Elasticsearch 6.7) with elasticsearch-spark-30_2.12-7.12.0.jar:

Failure Reason - An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 40 more
Traceback (most recent call last):
  File "usr/copy_data_to_open_search.py", line 141, in process_log_data_to_es
    "es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
    ... (same Java stack trace as above, caused by java.lang.ClassNotFoundException: scala.Product$class)


The error appears to be caused by a missing Scala class while the Spark job writes to Elasticsearch 6.7 (AWS OpenSearch): the class scala.Product$class cannot be found.
I tried updating the job to use the latest Elasticsearch Spark connector jar:

elasticsearch-spark-30_2.12-8.11.2.jar replacing elasticsearch-spark-30_2.12-7.12.0.jar


However, the job still fails with the same error shown above.
How can I fix this? Thanks in advance!


pieyvz9o1#

First, make sure you are actually passing the elasticsearch-spark jar when PySpark starts:

export PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-spark-30_2.12:8.11.2 pyspark-shell"
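If you launch the job from a plain Python script rather than a shell, a minimal sketch (assuming the same package coordinates as above) is to set the variable from Python before the JVM is created:

import os

# Must be set before the SparkSession/JVM is created, or it has no effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.elasticsearch:elasticsearch-spark-30_2.12:8.11.2 pyspark-shell"
)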

(or)

spark = SparkSession.builder \
    .appName("Load Data into AWS OpenSearch") \
    .config("spark.jars", "/spark/jars/elasticsearch-spark-30_2.12-8.11.2.jar") \
    .getOrCreate()
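Alternatively, a sketch that lets Spark resolve the connector from Maven at startup via the spark.jars.packages config, instead of pointing at a local jar path:

from pyspark.sql import SparkSession

# Spark downloads the connector (and its dependencies) from Maven Central.
spark = (
    SparkSession.builder
    .appName("Load Data into AWS OpenSearch")
    .config("spark.jars.packages",
            "org.elasticsearch:elasticsearch-spark-30_2.12:8.11.2")
    .getOrCreate()
)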


These two pages helped me a lot:

  1. https://discuss.elastic.co/t/issue-using-the-connector-from-pyspark-in-7-17-3/304084/2
  2. https://discuss.elastic.co/t/python-elasticsearch-and-apache-spark-simple-data-reading/307715

The actual issue

The error (java.lang.NoClassDefFoundError: scala/Product$class) usually means a library built for an incompatible Scala version is being loaded; it points to a mismatch between the Scala version of your Spark runtime and the Scala version the Elasticsearch Spark connector was compiled against. Specifically, trait implementation classes such as scala.Product$class exist only in Scala 2.11 and earlier (they were removed in Scala 2.12), so this error typically means a jar compiled for Scala 2.11 is on the classpath of a Scala 2.12 runtime like Spark 3.1.2.
Here are a few steps you can take to resolve the issue:

  1. Check the Scala versions: make sure Spark and the Elasticsearch Spark connector agree on the Scala version. Spark 3.x distributions are built with Scala 2.12; confirm the Scala version of your Spark runtime (see the sketch after this list) and use a connector compiled for that same version.
  2. Update the Elasticsearch Spark connector: you already tried a newer connector, but make sure the version you use is compatible with Spark 3.1.2 and built for Scala 2.12 (the "_2.12" suffix in the artifact name), and that no older connector jar compiled for Scala 2.11 is still on the classpath.
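A minimal sketch for the version check in step 1. It reaches into the JVM through the private `_jvm` Py4J gateway attribute, so treat it as a debugging aid only:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("scala-version-check").getOrCreate()

# Ask the JVM which Scala version Spark was built with; on Spark 3.1.2
# this should print something like "version 2.12.10".
print(spark.sparkContext._jvm.scala.util.Properties.versionString())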

My solution

I upgraded the EMR release from emr-6.4.0 to emr-6.14.0, i.e. Spark 3.4.1 and Hadoop 3.3.3, which is compatible with elasticsearch-spark-30_2.12-8.11.2.jar (built for Scala 2.12). That resolved the problem (java.lang.NoClassDefFoundError: scala/Product$class).
You can find the connector jars here: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.12
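Once the cluster and jar versions line up, a quick smoke test (a sketch reusing the es_endpoint, es_port, and log_index_name names from the question) is to read a few documents back through the connector:

df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes.wan.only", "true")
    .option("es.nodes", es_endpoint)
    .option("es.port", es_port)
    .load("%s-*" % log_index_name)  # wildcard across the dated indices
)
df.show(5)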
