我使用spark 2.1.0和kafka 0.10.2.1。
我编写了一个spark应用程序,从kafka主题中读取数据集。
代码如下:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MLP {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("MLP")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
.option("subscribe", "resultsTopic")
.load();
df.show();
spark.stop();
}
}
我的部署脚本如下:
spark-submit \
--verbose \
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
--class com.****\
--master (Spark Master URL) /path/to/jar
但是我得到了一个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;
我已经尝试使用同一个应用程序和非jafka数据源,并且正确地创建了Dataframe。我也尝试过在客户机模式下使用yarn,但我得到了相同的错误。
2条答案
按热度按时间67up9zun1#
kafka作为非流Dataframe的数据源-数据集可从spark 2.2获得,本期spark jira上的参考
正如@jaceklaskowski提到的,将包更改为(将jacek的版本修改为使用2.2):
更重要的是,使用
readStream
读取数据流。你不能使用
show
使用流式数据源,而不是console
格式。查看此链接
ppcbkaq52#
首先,你应该
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10
(我对此表示怀疑)与以下内容有关:我不认为这个版本
2.10
曾经有过。你可能考虑过2.1.0
如果你使用2.1.0
(不是2.10
).第二,删除
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',')
除了一些额外的jar,比如Kafka来源的jar。这样你就可以
kafka
源格式。