I'm running a Spark streaming application with the Spark Operator on Kubernetes.
Spark version 3.0.0, reading from Kafka via spark-sql-kafka-0-10. I have no BigQuery dependency at all, yet the log shows:
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
at ru.neoflex.okb.population.reporting.subject.program.realation.utils.KafkaSourceConfigurable.kafkaSource(KafkaSourceConfigurable.scala:13)
at ru.neoflex.okb.population.reporting.subject.program.realation.SubjectToProgramRelationLoader.run(SubjectToProgramRelationLoader.scala:20)
at ru.neoflex.okb.population.reporting.subject.program.realation.SparkApp$.main(SparkApp.scala:5)
at ru.neoflex.okb.population.reporting.subject.program.realation.SparkApp.main(SparkApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
My jar's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file contains only:
org.apache.spark.sql.delta.sources.DeltaDataSource
org.apache.spark.sql.v2.avro.AvroDataSourceV2
org.apache.spark.sql.kafka010.KafkaSourceProvider
Running locally everything works fine, but on Kubernetes I hit this problem. My submit command:
./spark-submit \
  --name spark-test \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
  --class ru...SparkApp \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=spark3 \
  .. \
  --conf spark.master=k8s://https://master.infra:8443 \
  local:///opt/spark/reporting-loader.jar
My job's dependencies:
val core = "org.apache.spark" %% "spark-core" % Version.spark % Provided
val sql = "org.apache.spark" %% "spark-sql" % Version.spark % Provided
val streaming = "org.apache.spark" %% "spark-streaming" % Version.spark % Provided
val sqlKafka = "org.apache.spark" %% "spark-sql-kafka-0-10" % Version.spark
val avro = "org.apache.spark" %% "spark-avro" % Version.spark
val productModel = "ru.....schema" %% "trigger" % Version.productModels
val deltaLake = "io.delta" %% "delta-core" % Version.deltaLake
val pureConfig = "com.github.pureconfig" %% "pureconfig" % Version.pureConfig
val scalaTest = "org.scalatest" %% "scalatest" % Version.scalaTest % Test
Assembly settings:
val assemblySettings = Seq(
  assembly / assemblyOption := (assemblyOption in assembly).value.copy(includeScala = false),
  assembly / assemblyMergeStrategy := {
    // Merge service registration files instead of discarding them: dropping all
    // of META-INF would also drop META-INF/services/*, which ServiceLoader
    // needs at runtime to discover DataSourceRegister providers.
    case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
)
1 Answer
The job runs locally but fails on the cluster, and the log shows "java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class".
That class only exists in Scala 2.11 builds of Spark (e.g. spark-core_2.11-2.4.3.jar); Spark 3.0.0 is built for Scala 2.12, which no longer generates `$class` trait-implementation classes. So a jar on the cluster's classpath, evidently a BigQuery connector built against Scala 2.11 / Spark 2.x, cannot be instantiated. Check the jars baked into the cluster image and make sure every dependency deployed alongside reporting-loader.jar is built for Scala 2.12; remove or replace the 2.11-built connector jar.