如何使用.net中的confluent.kafka从特定主题使用PartitionOffset

wwodge7n  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(331)

我需要我的消费者从特定的 TopicPartitionOffset(here from offset 278) . 假设消息是由某个特定主题的生产者生成的,比如 ="Test_1" 以前。这是我的密码

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

在第----> var cr = consumer.Consume(); 消费者消费,但什么也没发生。有什么问题。
我已经做完了 AutoOffsetReset = AutoOffsetResetType.Earliest 在consumerconfig中,consumer使用来自所有偏移量的所有消息,但这不是我要查找的。

nxagd54h

nxagd54h1#

解决方案:我找到了如下解决方案:
添加了这个 consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset))) 在尝试消费之前
删除这些 consumer.Subscribe("Test_1") 以及 consumer.Seek(...) 所以更新后的代码是这样的,非常有效:

using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume();
                        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error}");
                    }
                }

                consumer.Close();
            }

相关问题