我能从Kafka主题的特定分区中读取吗

at0kjp5o  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我正在使用SpringKafka框架来实现kafka侦听器。现在我在读一个主题。但是,根据一个特定的需求,我被要求提供一种实现监听器的方法,该监听器将从kafka主题的特定分区中读取数据。
从不同的分区读取不同的应用程序是一种好的做法。您能帮助我了解如何通过SpringKafka和java实现这个功能吗。
提前谢谢。

xghobddn

xghobddn1#

这个 ContainerProperties 用于配置 KafkaMessageListenerContainer 具有以下特性:

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {

关于:


**

 * A configuration container to represent a topic name, partition number and, optionally,
 * an initial offset for it. The initial offset can be:
 * <ul>
 * <li>{@code null} - do nothing;</li>
 * <li>positive (including {@code 0}) - seek to EITHER the absolute offset within the
 * partition or an offset relative to the current position for this consumer, depending
 * on {@link #isRelativeToCurrent()}.
 * </li>
 * <li>negative - seek to EITHER the offset relative to the current last offset within
 * the partition: {@code consumer.seekToEnd() + initialOffset} OR the relative to the
 * current offset for this consumer (if any), depending on
 * {@link #isRelativeToCurrent()}.</li>
 * </ul>
 * Offsets are applied when the container is {@code start()}ed.
 *
 * @author Artem Bilan
 * @author Gary Russell
 */
public class TopicPartitionInitialOffset {

因此,通过这种方式,您可以指定从哪个主题开始使用哪个分区和哪个偏移量。
当我们使用基于分区的配置时,我们在 KafkaConsumer :

/**
 * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
 * and will replace the previous assignment (if there is one).
 * <p>
 * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * Manual topic assignment through this method does not use the consumer's group management
 * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
 * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
 * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException If partitions is null or contains null or empty topics
 * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
 *                               (without a subsequent call to {@link #unsubscribe()})
 */
@Override
public void assign(Collection<TopicPartition> partitions) {

因此,这是直接从特定分区使用的标准实践。

相关问题