我有rdd的数据,我在javadstream转换,现在我想把它发送到Kafka主题,我不想Kafka发送代码,只是我需要foreachrdd实现,我的代码是看起来像
public void publishtoKafka(ITblStream t)
{
MyTopicProducer MTP = ProducerFactory.createProducer(hostname+":"+port);
JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD();
rdd.foreachRDD(new Function<String, String>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
KafkaUtils.sendDataAsString(MTP,topicName, "String RDDData");
return null;
}
});
log.debug("------------------------sent to kafka: ------------------");
}
这里mytopicproducer将创建producer,该producer工作正常kafkautils.senddataasstring是将数据发布到kafka的方法topic也工作正常,
我只有一个问题我不能用foreach或foreachrdd将javadstream rdd转换成字符串最后我需要rdds的字符串消息,请建议只使用java代码,我不想使用匿名类,
提前谢谢,
2条答案
按热度按时间djmepvbi1#
我没有测试,但应该可以正常工作:
关键是你必须
collect
您的rdd(这里是javadoc)从rdd获取字符串数据的实际集合。cld4siwp2#
当我使用