Exception when reading Kafka in Spark 2.1.1 using selectExpr

w8rqjzmb  posted on 2021-06-08 in Kafka

I am running the default example provided by Spark to count words from a Kafka stream.
Here is the code I am running:

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.Map;
import java.util.HashMap;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaWordCount")
                .getOrCreate();
        Dataset<Row> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "TutorialTopic")
                .option("startingOffsets", "latest")
                .load();
        lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
        Dataset<String> words = lines
                .as(Encoders.STRING())
                .flatMap(
                        new FlatMapFunction<String, String>() {
                            @Override
                            public Iterator<String> call(String x) {
                                return Arrays.asList(x.split(" ")).iterator();
                            }
                        }, Encoders.STRING());
        Dataset<Row> wordCounts = words.groupBy("value").count();
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}

In the pom.xml file, I added the following dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

I submit the code to Spark with the following command:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.2 --class "JavaWordCount" --master local[4] target/spark-sql-kafka-0-10_2.11-1.0.jar

At runtime, the following line throws an exception:

lines.selectExpr("CAST key AS STRING","CAST value AS STRING");

Exception:

Try to map struct<key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,timestampType:int> to Tuple1, but failed as the number of fields does not line up.

Please help me resolve this exception. Thank you!

iugsix8n #1

The problem is this line: lines.as(Encoders.STRING()).
You can change

lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
    Dataset<String> words = lines
            .as(Encoders.STRING())

to

Dataset<String> words = lines.selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING())

You need to use the Dataset returned by lines.selectExpr; this method does not change lines itself, so your original code discards the projection and then tries to map the full seven-column Kafka schema (key, value, topic, partition, offset, timestamp, timestampType) to a single String. Note also that the cast expression needs parentheses: CAST(value AS STRING), not CAST value AS STRING. Since you are using .as(Encoders.STRING()), I assume you only need value.
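The key point, that Dataset operations return a new object instead of mutating the receiver, mirrors how immutable types behave in plain Java. A minimal sketch of the same mistake and its fix, using ordinary JDK code rather than Spark (class and variable names here are just for illustration):

```java
public class ImmutableDemo {
    public static void main(String[] args) {
        String line = "hello";

        // Mistake: the return value is discarded, so 'line' is unchanged,
        // just like calling lines.selectExpr(...) without using the result.
        line.toUpperCase();
        System.out.println(line); // prints "hello"

        // Fix: keep working with the returned object.
        String upper = line.toUpperCase();
        System.out.println(upper); // prints "HELLO"
    }
}
```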
