我目前使用EMR发布版本[emr.5.23.0],即Spark 2.4.0和Spark Elastic Search Connector [elasticsearch-spark-30_2.12-7.12.0.jar]将数据写入AWS Open Search(Elasticsearch 6.7):
dataframe.coalesce(6).write.mode("append").format(
"org.elasticsearch.spark.sql"
).option(
"es.nodes.wan.only", "true"
).option(
"es.nodes", es_endpoint
).option(
"es.port", es_port
).option(
"es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
).save()
字符串
这曾经工作得很好。但是当我将EMR发布版本从emr.5.23.0(Spark 2.4.0)升级到emr-6.4.0(Spark 3.1.2)时,我在尝试使用elasticsearch-spark-30_2.12-7.12.0.jar写入AWS Open Search(Elasticsearch 6.7)时看到以下错误:-
Failure Reason - An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 40 more
Traceback (most recent call last):
File "usr/copy_data_to_open_search.py", line 141, in process_log_data_to_es
"es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
self._jwrite.save()
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 40 more
型
看起来这个错误与执行Spark作业期间缺少Scala类有关,该作业写入Elasticsearch 6.7,即AWS Open Search。该错误表明无法找到类scala.Product$class。
我尝试使用最新的Spark Elastic Search Connector jar更新代码:
elasticsearch-spark-30_2.12-8.11.2.jar replacing elasticsearch-spark-30_2.12-7.12.0.jar
型
但是代码仍然失败,出现了上面提到的同样的错误。
我该如何解决这个问题?提前感谢!
1条答案
按热度按时间pieyvz9o1#
首先确保你传递的是elasticsearch-spark jar:
字符串
(或)
型
这些页面对我帮助很大:
实际发行:
误差(java.lang.NoClassDefFoundError:scala/Product$class)通常表示它试图使用为不兼容的Scala版本构建的包。您遇到的错误java.lang.NoClassDefFoundError:scala/Product$class,提示您使用的Spark版本和Elasticsearch Spark Connector版本之间存在兼容性问题。当Scala版本之间不匹配时,通常会发生此错误使用Spark和Elasticsearch Spark连接器。
以下是您可以采取的解决问题的几个步骤:
查看Scala版本:确保Spark和Elasticsearch Spark Connector之间的Scala版本兼容。Spark 3.x通常使用Scala 2.12。检查Spark发行版使用的Scala版本,并确保您使用的Elasticsearch Spark Connector版本使用相同的Scala版本编译。
更新Elasticsearch Spark Connector:您似乎尝试过更新Elasticsearch Spark Connector,但请确保您使用的版本与Spark 3.1.2和Scala 2.12兼容。您可能需要尝试与Spark版本匹配的最新版本的连接器。
我的解决方案:
我将EMR发布版本从emr-6.4.0升级到emr-6.14.0,即Spark 3.4.1和Hadoop 3.3.3,它与elasticsearch-spark-30_2.12-8.11.2.jar兼容,即Scala 2.12,解决了这个问题-(java.lang.NoClassDefFoundError:scala/Product$class)
你可以在这里找到这些jar:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.12