hadoop—是否可以使用java将kafka使用者接收的输出写入文件

j8yoct9x  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(235)

我有一个用java编写的kafka生产者代码,可以编写kafka消息。以及接收这些消息的用户代码。
是否可以将消费者收到的消息写入java中的任何文本文件。

2izufjch

2izufjch1#

谢谢各位,
我能做到。一旦用户端接收到数据,就必须编写一个普通的java代码。
下面是ode中将消息打印到控制台的行。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));

您可以将所有消息存储到字符串中,并一次将所有消息打印到文件中。

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
completMessage += new String(bytes, "UTF-8")+"\n";
``` `new String(bytes, "UTF-8")+"\n";` 包含实际消息。
最后将所有消息打印到文件中。

writeDataToFile(completMessage);
``` writeDataToFile 包含将字符串写入文件的简单java代码。
谢谢您。

bbuxkriu

bbuxkriu2#

这是可能的。下面是这个的工作代码。

package com.venk.prac;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import kafka.utils.ShutdownableThread;

public class FileConsumer extends ShutdownableThread {

    private final KafkaConsumer<Integer, String> kafkaConsumer;
    private String topic;
    private String filePath;
    private BufferedWriter buffWriter;

    public FileConsumer(String topic, String filePath) {

        super("FileConsumer", false);
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        kafkaConsumer = new KafkaConsumer<Integer, String>(properties);
        this.topic = topic;
        this.filePath = filePath;

        try {
            this.buffWriter = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void doWork() {
        // TODO Auto-generated method stub
        kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000);
        try {
            for (ConsumerRecord<Integer, String> record : consumerRecords) 
                buffWriter.write(record.value() + System.lineSeparator());
            buffWriter.flush();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }

}
lokaqttq

lokaqttq3#

如果您正在编写自己的使用者,则应该在同一应用程序中包含写入文件的逻辑。使用预先打包的console consumer,您可以通过管道将其传输到一个文件。例如: kafka-console-consumer > file.txt 另一个(无代码)选项是尝试streamsetsdatacollector,这是一个开放源码的apache许可工具,它也有一个拖放ui。它包括Kafka和各种数据格式的内置连接器。

  • 充分披露我是这个项目的提交人。

相关问题