Cannot access data from a Kafka Spark stream globally

Asked by czfnxgou on 2021-05-29 in Hadoop

I am trying to stream data from Kafka into Spark:

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, 
                String.class, 
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, topics);
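For reference, with the spark-streaming-kafka 0.8 API this createDirectStream overload comes from, kafkaParams is a Map<String, String> and topics is a Set<String>. A minimal setup sketch; the broker address and topic name below are placeholders, not from the question:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker address
Set<String> topics = Collections.singleton("violations");  // placeholder topic name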

Here I iterate over the JavaPairInputDStream to process the RDDs:

directKafkaStream.foreachRDD(rdd -> {
            rdd.foreachPartition(items -> {
                while (items.hasNext()) {
                    // split the comma-separated Kafka record into fields
                    String[] state = items.next()._2().split("\\,");
                    // note: this println runs on the executors, not on the driver
                    System.out.println(state[2] + "," + state[3] + "," + state[4] + "--");
                }
            });
        });

I can read the data inside foreachRDD. My requirement is to access the state array globally, but when I try that I get an exception:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

Any suggestions? Thanks.

Answer from slmsl1lt:

The global array stays empty because the foreachRDD/foreachPartition closures run on the executors, so writes to a driver-side collection never become visible on the driver. What you describe is really joining a lookup table with the streaming RDD to get all items whose 'code' and 'violationCode' fields match.
The flow should look like this:
Create an RDD from the Hive lookup table => lookupRdd
Create a DStream from the Kafka stream
For each RDD in the DStream, join lookupRdd with the stream RDD, process the joined items (compute the sum of amounts, ...) and save the result of this processing.
Note: the code below is incomplete. Please fill in all the TODO comments.

JavaPairDStream<String, String> streamPair = directKafkaStream.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, String> tuple2) throws Exception {
            System.out.println("Tuple2 Message is----------" + tuple2._2());
            String[] state = tuple2._2().split("\\,");
            return new Tuple2<>(state[4], tuple2._2()); //pair <ViolationCode, data>
        }
    });

    streamPair.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        // cache the Hive lookup RDD on the driver so it is built only once
        JavaPairRDD<String, String> hivePairRdd = null;
        @Override
        public Void call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
            if (hivePairRdd == null) {
                hivePairRdd = initHiveRdd();
            }
            // join the stream batch with the lookup table on the shared key
            JavaPairRDD<String, Tuple2<String, String>> joinedRdd = stringStringJavaPairRDD.join(hivePairRdd);
            System.out.println(joinedRdd.take(10));
            // TODO: process joinedRdd here and save the results
            joinedRdd.count(); // to trigger an action
            return null;
        }
    });
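To give an idea of what the processing TODO might look like, here is a minimal sketch (Java 8 lambda style) that sums an amount per ViolationCode. The assumption that the Kafka record carries a numeric amount at index 3 is mine, not from the original post; this would replace the TODO inside call():

            // joinedRdd values are (Kafka record, Hive row), keyed by ViolationCode
            JavaPairRDD<String, Double> amountSums = joinedRdd
                    .mapToPair(t -> {
                        String[] fields = t._2()._1().split("\\,"); // Kafka record side of the join
                        return new Tuple2<>(t._1(), Double.parseDouble(fields[3])); // assumed amount field
                    })
                    .reduceByKey(Double::sum);
            amountSums.foreach(t -> System.out.println(t._1() + " -> " + t._2()));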

public static JavaPairRDD<String, String> initHiveRdd() {
    JavaRDD<String> hiveTableRDD = null; // TODO: code to create an RDD from the Hive table
    JavaPairRDD<String, String> hivePairRdd = hiveTableRDD.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String row) throws Exception {
            String code = null; // TODO: process 'row' and extract the 'code' field
            return new Tuple2<>(code, row);
        }
    });
    return hivePairRdd;
}
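One way the TODOs in initHiveRdd could be filled in, sketched with a Spark 1.x HiveContext to match the streaming API above. The table name "lookup", the position of the 'code' column, and passing the JavaSparkContext in as a parameter are all assumptions on my part:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;

public static JavaPairRDD<String, String> initHiveRdd(JavaSparkContext sc) {
    HiveContext hiveContext = new HiveContext(sc.sc());
    // flatten every Row of the (assumed) "lookup" table into a CSV string
    JavaRDD<String> hiveTableRDD = hiveContext.sql("SELECT * FROM lookup")
            .javaRDD()
            .map(row -> row.mkString(","));
    // key each row by its 'code' field, assumed here to be the first column
    return hiveTableRDD.mapToPair(row -> new Tuple2<>(row.split("\\,")[0], row));
}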
