Kafka消费者在推特流媒体中的否决错误

sbtkgmzw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(257)

我一直在研究Kafka的推特流媒体数据。
下面是我的示例链接:http://www.hahaskills.com/tutorials/kafka/twitter_doc.html
我能够使用生产者代码,它是工作良好。能够得到推特饲料和发送给Kafka制片人。
我不能使用消费代码,因为它已经抛出了许多API的弃用错误。
以下是消费者代码:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

它抛出错误:consumerconnector、consumeriterator、kafkastream已弃用。
consumerconfig不可见。
这个示例代码有固定版本吗(kafka consumer for twitter)?

koaltpgm

koaltpgm1#

您所遵循的教程非常旧,它使用的是已弃用的旧scala-kafka客户机,请参阅http://kafka.apache.org/documentation/#legacyapis
已弃用的类包括: kafka.consumer.* 以及 kafka.javaapi.consumer 而是在下面使用更新的java消费者
org.apache.kafka.clients.consumer.* kafka.producer.* 以及 kafka.javaapi.producer 而是在下面使用更新的java producer org.apache.kafka.clients.producer.* 除了使用不推荐使用的类之外,您的代码基本上是正确的,我只需要修复一些导入。请参阅下面的固定版本。使用它,我可以将我正在生成的消息消费到一个名为 twittertopic .

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

虽然可以使用上面的代码,但下一个主要的kafka版本可能会删除当前不推荐使用的类,因此不应该使用这些类编写新的逻辑。
相反,您应该开始使用java客户机,您可以使用github上提供的示例:https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
使用新的java使用者,您的逻辑如下所示:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }

}

相关问题