Streaming: read from Kafka and apply Spark SQL aggregations in Java

n53p2ov0 · published 2021-06-08 in Kafka

I have a Spark job that reads data from a database (MongoDB) and applies Spark SQL aggregations. The code is as follows (only the conf options are omitted):

SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
Dataset<Row> df = MongoSpark.read(sqlContext).options(readOptions).load();
df.registerTempTable("data");
df.cache();
Dataset<Row> aggregators = sqlContext.sql(myQuery);
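The post never shows the contents of myQuery, so that stays as-is. Purely as an illustration, an aggregation run against the registered "data" view might look like the sketch below; the query text, the EXAMPLE_QUERY name, and the orderType column are all hypothetical, not taken from the post.

```java
public class ExampleAggregation {
    // Hypothetical stand-in for myQuery: group the rows registered under
    // the temp view "data" by an assumed orderType column and count them.
    public static final String EXAMPLE_QUERY =
            "SELECT orderType, COUNT(*) AS total FROM data GROUP BY orderType";

    public static void main(String[] args) {
        // This string would be handed to sqlContext.sql(EXAMPLE_QUERY).
        System.out.println(EXAMPLE_QUERY);
    }
}
```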

Now I want to create another job that reads messages from Kafka via Spark Streaming and then applies the same aggregations through Spark SQL. My code is as follows:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.99.100:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", KafkaStatisticsPayloadDeserializer.class);
kafkaParams.put("group.id", "Group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList(topic);

SparkConf conf = new SparkConf().setAppName(topic).setMaster("local");

/*
 * Spark streaming context
 */
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));
/*
 * Create a direct input DStream for receiving records from Kafka
 */
JavaInputDStream<ConsumerRecord<String, StatisticsRecord>> stream =
        KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, StatisticsRecord>Subscribe(topics, kafkaParams)
        );

So far I can read and deserialize the messages successfully. My question, then, is how to actually apply the Spark SQL aggregations to them. I tried the following, but it does not work. I think I first need to isolate the "value" field, which contains the actual message.

SQLContext sqlContext = new SQLContext(streamingContext.sparkContext());
stream.foreachRDD(rdd -> {
    Dataset<Row> df = sqlContext.createDataFrame(rdd.rdd(), StatisticsRecord.class);
    df.createOrReplaceTempView("data");
    df.cache();
    Dataset aggregators = sqlContext.sql(SQLContextAggregations.ORDER_TYPE_DB);
    aggregators.show();
});

62o28rlo 1#

The context should be obtained inside the function that you apply to the stream, not captured from outside it.
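In DStream terms, that advice means getting the SparkSession (or SQLContext) inside the foreachRDD closure and working on the record payload rather than the ConsumerRecord wrapper. A minimal sketch, assuming a StatisticsRecord Java bean and a stream of already-deserialized records as in the question; the COUNT(*) query is a hypothetical placeholder:

```java
stream.foreachRDD(rdd -> {
    // Obtain the session inside the function applied to the stream,
    // reusing the configuration of the RDD's own SparkContext.
    SparkSession spark = SparkSession.builder()
            .config(rdd.context().getConf())
            .getOrCreate();

    // Isolate the "value" field: map each ConsumerRecord to its payload.
    JavaRDD<StatisticsRecord> values = rdd.map(ConsumerRecord::value);

    // Build a DataFrame from the bean class and run SQL against it.
    Dataset<Row> df = spark.createDataFrame(values, StatisticsRecord.class);
    df.createOrReplaceTempView("data");
    spark.sql("SELECT COUNT(*) FROM data").show();  // hypothetical query
});
```

Creating the DataFrame from the payload RDD, not from the raw ConsumerRecord RDD, is what makes createDataFrame able to infer the schema from the bean class.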


t5zmwmid 2#

I solved this with the following code. Note that I now store the messages in JSON format instead of as actual objects.

SparkConf conf = new SparkConf().setAppName(topic).setMaster("local");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));

SparkSession spark = SparkSession.builder().appName(topic).getOrCreate();

/*
 * Kafka configuration
 */
Map<String, Object> kafkaParams = new HashMap<>();

kafkaParams.put("bootstrap.servers", dbUri);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "Group4");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("Statistics");

/*
 * Create a direct input DStream for receiving records from Kafka
 */
JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );
/*
 * Keep only the actual message, in JSON format
 */
JavaDStream<String> recordStream = stream.flatMap(record -> Arrays.asList(record.value()).iterator());
/*
 * Extract the RDDs from the stream and apply the aggregation to each one
 */
recordStream.foreachRDD(rdd -> {
    if (rdd.count() > 0) {
        Dataset<Row> df = spark.read().json(rdd.rdd());
        df.createOrReplaceTempView("data");
        df.cache();

        Dataset<Row> aggregators = spark.sql(SQLContextAggregations.ORDER_TYPE_DB);
        aggregators.show();
    }
});
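For reference, newer Spark versions express this same Kafka-to-SQL pipeline more directly with Structured Streaming, where the source and the aggregation live in one declarative job and no per-RDD loop is needed. A sketch under the assumptions that the spark-sql-kafka connector is on the classpath and that the JSON messages match the schema shown; the schema, column names, and broker address here are hypothetical:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class StructuredStreamingSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("Statistics").master("local[*]").getOrCreate();

        // Hypothetical schema for the JSON payload of each Kafka message
        StructType schema = new StructType()
                .add("orderType", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType);

        // Read from Kafka, cast the binary value to a string, parse as JSON
        Dataset<Row> records = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.99.100:9092")
                .option("subscribe", "Statistics")
                .load()
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).as("r"))
                .select("r.*");

        // The aggregation, applied directly to the streaming Dataset
        Dataset<Row> aggregated = records.groupBy("orderType").count();

        // Print each updated result table to the console
        aggregated.writeStream()
                .outputMode("complete")
                .format("console")
                .start()
                .awaitTermination();
    }
}
```

The trade-off is that Structured Streaming maintains the aggregation state across micro-batches for you, whereas the foreachRDD approach above recomputes the aggregation independently for each two-second batch.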
