如何发送数据使用foreachrdd使用java

js5cn81o  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(227)

我有rdd的数据,我在javadstream转换,现在我想把它发送到Kafka主题,我不想Kafka发送代码,只是我需要foreachrdd实现,我的代码是看起来像

public void publishtoKafka(ITblStream t)
    {
        MyTopicProducer MTP = ProducerFactory.createProducer(hostname+":"+port);
        JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();

        rdd.foreachRDD(new Function<String, String>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
             KafkaUtils.sendDataAsString(MTP,topicName, "String RDDData");
            return null;
            }
          });
        log.debug("------------------------sent to kafka: ------------------");

    }

这里mytopicproducer将创建producer,该producer工作正常kafkautils.senddataasstring是将数据发布到kafka的方法topic也工作正常,
我只有一个问题我不能用foreach或foreachrdd将javadstream rdd转换成字符串最后我需要rdds的字符串消息,请建议只使用java代码,我不想使用匿名类,
提前谢谢,

djmepvbi

djmepvbi1#

我没有测试,但应该可以正常工作:

public Void call(JavaRDD<String> rdd) throws Exception {
    for (rddData : rdd.collect()) {
        KafkaUtils.sendDataAsString(MTP,topicName, rddData);
    }
    return null;
}

关键是你必须 collect 您的rdd(这里是javadoc)从rdd获取字符串数据的实际集合。

cld4siwp

cld4siwp2#

当我使用

rdd.foreachRDD(new Function<JavaRDD<String>, Void>() { 
                @Override 
                public Void call(JavaRDD<String> rdd) throws Exception { 
                if(rdd!=null) 
            {   
            List<String> result = rdd.collect(); 
            KafkaUtil.sendString(p,topic,result.get(0)); 
 KafkaUtils.sendDataAsString(MTP,topicName, result.get(0)); 

            }

相关问题