我正试图将Kafka的数据流传输到spark
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams, topics);
在这里,我迭代javapairinputdstream来处理rdd。
directKafkaStream.foreachRDD(rdd ->{
rdd.foreachPartition(items ->{
while (items.hasNext()) {
String[] State = items.next()._2.split("\\,");
System.out.println(State[2]+","+State[3]+","+State[4]+"--");
};
});
});
我可以在foreachrdd中获取数据,我的要求是必须全局访问状态数组。当我尝试全局访问状态数组时,我得到了一个异常
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
有什么建议吗?谢谢。
1条答案
按热度按时间slmsl1lt1#
这更像是将查找表与流式rdd连接起来,以获取具有匹配的“code”和“violationcode”字段的所有项。
水流应该是这样的。
创建hive lookup table=>lookuprdd的rdd
从kafka流创建数据流
对于dstream中的每个rdd,将lookuprdd与streamrdd连接起来,处理连接的项(计算金额总和…)并保存这个处理结果。
注:以下代码不完整。请填写所有待办事项注解。