java 8流和传入数据(kafka)

hc2pp10m  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(290)

我有一个队列(碰巧是Kafka,但我不确定这是否重要),我从中阅读消息。我想创建一个流来表示这个数据。
使用(kafka)队列的伪代码如下所示:

List<Message> messages = new ArrayList<>();

while (true) {
    ConsumerRecords<String, Message> records = kafkaConsumer.poll(100);

    messages.add(recordsToMessages(records));

    if (x) {
        break;
    }
}

return messages.stream();

使用此伪代码,在while循环中断之前(即,在读取所有队列之前),不会返回流。
我希望能够直接返回流,也就是说,在流返回后可以向流中添加新消息。
我觉得我需要使用stream.generate,但我不确定如何使用,或者我需要一个spliterator?
我还想在代码中的稍后点关闭流。
谢谢!

cvxl0en2

cvxl0en21#

下面是一个注解示例,说明了如何进行此操作:

public static void main(String[] args) {

    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    // Data producer
    Runnable job = () -> {
        // Send data to the stream (could come from your Kafka queue
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < 10; i++) {
            queue.offer(random.nextInt(100));
            delay(random.nextInt(2) + 1);
        }
        // Send the magic signal to stop the stream
        queue.offer(-1);
    };
    Thread thread = new Thread(job);
    thread.start();

    // Define the condition by which the stream knows there is no data left to consume
    // The function returns the next element wrapped in an Optional, or an empty Optional to tell there is no more data to read
    // In this example, the number -1 is the magic signal
    Function<BlockingQueue<Integer>, Optional<Integer>> endingCondition = q -> {
        try {
            Integer element = q.take();
            return element == -1 ? Optional.empty() : Optional.of(element);
        } catch (InterruptedException e) {
            return Optional.empty();
        }
    };
    QueueConsumingIterator<Integer> iterator = new QueueConsumingIterator<>(queue, endingCondition);

    // Construct a Stream on top of our custom queue-consuming Iterator
    Spliterator<Object> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
    Stream<Object> stream = StreamSupport.stream(spliterator, false);

    // Use the Stream as usual :)
    stream.map(String::valueOf).forEach(System.out::println);

}

.

// This is a custom Iterator that takes data from a BlockingQueue.
// Detection of the end of the data stream is use-case-dependant, so it is extracted as a user-provided Function<Queue, Optional>
// For example you may want to wait for a particular item in the queue, or consider the queue "dead"" after a certain timeout...
public static class QueueConsumingIterator<E> implements Iterator<E> {

    private final BlockingQueue<E> queue;
    private final Function<BlockingQueue<E>, Optional<E>> queueReader;
    private Optional<E> element;
    private boolean elementRead = false;

    public QueueConsumingIterator(BlockingQueue<E> queue, Function<BlockingQueue<E>, Optional<E>> queueReader) {
        this.queue = queue;
        this.queueReader = queueReader;
    }

    @Override
    public boolean hasNext() {
        if (!this.elementRead) {
            this.element = this.queueReader.apply(this.queue);
            this.elementRead = true;
        }
        return this.element.isPresent();
    }

    @Override
    public E next() {
        if (hasNext()) {
            this.elementRead = false;
            return this.element.get();
        }
        throw new NoSuchElementException();
    }

}

private static void delay(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

这个代码背后的想法是,你可以养活一个 Stream 通过习俗 Iterator ,本身从外部源提取数据。
数据从外部源传输到 Iterator 通过 Queue . 因为只有你知道你的数据是什么样子的,以及如何检测到已经没有剩余的数据可以读取了,所以决定流是否应该继续被输入的过程被提取到一个用户提供的函数中。
希望有帮助?

相关问题