pyspark-structuredstreamingintoElasticSearch

noj0wjuj  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(307)

我正在编写一个代码,在这个代码中,我试图使用pyspark的结构化流将数据流传输到ElasticSearch中。
spark版本:3.0.0安装模式:pip

query = inpJoinDF.writeStream \
.outputMode("append") \
.queryName("writing_to_es") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "es_checkpoint/") \
.option("es.resource", "spark_test/doc") \
.option("es.nodes", "localhost") \
.start()

还尝试了添加包和格式

pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.7.1
format("es")

以下是错误行:
第328行,在get\ return\ value py4j.protocol.py4jjavaerror中:调用o57.start时出错:java.lang.classnotfoundexception:找不到数据源:org.elasticsearch.spark.sql。请在以下地址查找包裹http://spark.apache.org/third-party-projects.html 在org.apache.spark.sql.execution.datasources.datasource$.lookupdateasource(datasource。scala:674)位于org.apache.spark.sql.streaming.datastreamwriter.start(datastreamwriter。scala:342)位于sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)在py4j.reflection.methodinvoker.invoke(methodinvoker。java:244)在py4j.reflection.reflectionengine.invoke(reflectionengine。java:357)在py4j.gateway.invoke(gateway。java:282)在py4j.commands.abstractcommand.invokemethod(abstractcommand。java:132)在py4j.commands.callcommand.execute(callcommand。java:79)在py4j.gatewayconnection.run(网关连接。java:238)在java.lang.thread.run(线程。java:748)原因:java.lang.classnotfoundexception:org.elasticsearch.spark.sql.defaultsource位于java.net.urlclassloader.findclass(urlclassloader)。java:382)在java.lang.classloader.loadclass(classloader。java:424)在java.lang.classloader.loadclass(classloader。java:357)在org.apache.spark.sql.execution.datasources.datasource$.$anonfun$lookupdateasource$5(数据源。scala:648)在scala.util.try$.apply(try。scala:213)位于org.apache.spark.sql.execution.datasources.datasource$.$anonfun$lookupdateasource$4(数据源)。scala:648)在scala.util.failure.orelse(try。scala:224)在org.apache.spark.sql.execution.datasources.datasource$.lookupdateasource(数据源。scala:648) ... 12个以上
谢谢?感谢你的帮助!!

t9eec4r0

t9eec4r01#

非常感谢,我使用的是基于scala 2.12构建的spark 3,不幸的是,elasticsearch hadoop jar在scala 2.11版本之前都是受支持的。我已经把我的spark版本降到了2.4.6,它是基于Scala2.11构建的。

相关问题