Spark and Kafka streaming integration

hgb9j2n6 · posted 2021-05-27 in Spark

I want to integrate Spark with Kafka.
I am using Spark 3.0.0 / kafka_2.12-2.6.0 / spark-streaming-kafka-0-10_2.12-2.4.0.jar.
I started the spark-shell with the following command:

`./bin/spark-shell --jars spark-streaming-kafka-0-10_2.12-2.4.0.jar`

Then I ran the following lines in the spark-shell, using Scala:

val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
val counts = ds.groupBy("value").count()
val query = counts.writeStream.outputMode("complete").format("console")
query.start()

But I got the following error:

20/09/18 13:45:45 ERROR MicroBatchExecution: Query [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c] terminated with error
 java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:322)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 30 more
 Exception in thread "stream execution thread for [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    (the same stack trace as above is printed again)

So I tried again with a plain batch read, as below:

val stream = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").option("startingOffsets", "earliest").load()
stream.show()

But I got another error:

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
 at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
 at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:64) 
 at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:313)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
 at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
 at scala.collection.Iterator.foreach(Iterator.scala:941)
 at scala.collection.Iterator.foreach$(Iterator.scala:941)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
 at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
 at scala.collection.Iterator.foreach(Iterator.scala:941)
 at scala.collection.Iterator.foreach$(Iterator.scala:941)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
 at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:94)
 at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:107)
 at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:199)
 at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
 at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
 at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
 at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
 at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
 at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
 at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
 at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
 ... 47 elided
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
 ... 116 more

I have tested Kafka by itself successfully, and Spark socket streaming with netcat works too, but I don't know how to fix this problem. Could you tell me the solution? Thanks.

ghg1uchk1#

spark.readStream.format("kafka") is part of Structured Streaming, and the jar you included is not the right one: spark-streaming-kafka-0-10 is the old DStream connector, and its version (2.4.0) does not match your Spark 3.0.0 in any case. If you want to pass a jar file as an argument, you need to pass spark-sql-kafka-0-10_2.12-3.0.0.jar along with all of its dependencies.
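For context, the missing class org.apache.spark.kafka010.KafkaConfigUpdater lives in the spark-token-provider-kafka-0-10 module, one of those transitive dependencies, so passing the connector jar alone can never work. A --jars invocation would have to look roughly like this (a sketch only; the exact jar file names and dependency versions shipped with Spark 3.0.0 are assumptions to illustrate the shape of the command):

`./bin/spark-shell --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-token-provider-kafka-0-10_2.12-3.0.0.jar,kafka-clients-2.4.1.jar,commons-pool2-2.6.2.jar`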
The simpler option is to use the --packages form shown below, which also takes care of pulling in all the transitive dependencies:
`bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0`
It takes a few seconds to resolve the dependencies, and then it works:

scala> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
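
With the package on the classpath, the streaming pipeline from the question should then run end to end. A minimal sketch (the broker address and topic tp are taken from the question; the CAST of value to a string is an addition so the console shows readable text instead of raw bytes):

// read the Kafka topic "tp" as a streaming DataFrame
val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tp")
  .load()

// Kafka delivers key/value as binary; cast value to a string before grouping
val counts = ds.selectExpr("CAST(value AS STRING) AS value").groupBy("value").count()

// "complete" output mode re-emits the full aggregate table on every trigger
val query = counts.writeStream.outputMode("complete").format("console").start()

// query.awaitTermination()  // uncomment when running outside an interactive shell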
