为什么spark submit失败并显示“analysisexception:kafka不是有效的spark sql数据源”?

svmlkihl  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(315)

我使用spark 2.1.0和kafka 0.10.2.1。
我编写了一个spark应用程序,从kafka主题中读取数据集。
代码如下:

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MLP {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("MLP")
            .getOrCreate();

        Dataset<Row> df = spark
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
            .option("subscribe", "resultsTopic")
            .load();
        df.show();
        spark.stop();
    }
}

我的部署脚本如下:

spark-submit \
  --verbose \
  --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
  --class com.****\
  --master (Spark Master URL) /path/to/jar

但是我得到了一个错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;

我已经尝试使用同一个应用程序和非jafka数据源,并且正确地创建了Dataframe。我也尝试过在客户机模式下使用yarn,但我得到了相同的错误。

67up9zun

67up9zun1#

kafka作为非流Dataframe的数据源-数据集可从spark 2.2获得,本期spark jira上的参考
正如@jaceklaskowski提到的,将包更改为(将jacek的版本修改为使用2.2):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

更重要的是,使用 readStream 读取数据流。
你不能使用 show 使用流式数据源,而不是 console 格式。

StreamingQuery query = df.writeStream()
  .outputMode("append")
  .format("console")
  .start();

query.awaitTermination();

查看此链接

ppcbkaq5

ppcbkaq52#

首先,你应该 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 (我对此表示怀疑)与以下内容有关:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1

我不认为这个版本 2.10 曾经有过。你可能考虑过 2.1.0 如果你使用 2.1.0 (不是 2.10 ).
第二,删除 --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') 除了一些额外的jar,比如Kafka来源的jar。
这样你就可以 kafka 源格式。

相关问题