Kafka: fetching records by timestamp in a consumer loop

3b6akqbq · posted 2021-06-07 in Kafka

I am using a Kafka 0.10.2.1 cluster. I am using Kafka's offsetsForTimes API to seek to a particular offset, and I want to break out of the consumer loop once the end timestamp is reached. My code is as follows:

//package kafka.ex.test;

import java.util.*;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroup {

   // Looks up the earliest offset whose timestamp is >= startTime for the given partition.
   public static OffsetAndTimestamp fetchOffsetByTime(KafkaConsumer<String, String> consumer, TopicPartition partition, long startTime) {

      Map<TopicPartition, Long> query = new HashMap<>();
      query.put(partition, startTime);

      final Map<TopicPartition, OffsetAndTimestamp> offsetResult = consumer.offsetsForTimes(query);
      if (offsetResult == null || offsetResult.isEmpty()) {
         System.out.println("No offset to fetch for partition: " + partition.partition());
         return null;
      }
      // The value is null when the partition has no message with a timestamp >= startTime.
      final OffsetAndTimestamp offsetTimestamp = offsetResult.get(partition);
      if (offsetTimestamp == null) {
         System.out.println("No offset found for partition: " + partition.partition());
      }
      return offsetTimestamp;
   }

   public static KafkaConsumer<String, String> assignOffsetToConsumer(KafkaConsumer<String, String> consumer, String topic, long startTime) {
      final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
      System.out.println("Number of Partitions : "+partitionInfoList.size());
      final List<TopicPartition> topicPartitions = new ArrayList<>();
      for (PartitionInfo pInfo : partitionInfoList) {
         TopicPartition partition = new TopicPartition(topic, pInfo.partition());
         topicPartitions.add(partition);
      }
      consumer.assign(topicPartitions);
      for(TopicPartition partition : topicPartitions ){
         OffsetAndTimestamp offSetTs = fetchOffsetByTime(consumer, partition, startTime);

         if( offSetTs == null ){
            System.out.println("No Offset Found for partition : " + partition.partition());
            consumer.seekToEnd(Arrays.asList(partition));
         }else {
            System.out.println(" Offset Found for partition : " +offSetTs.offset()+" " +partition.partition());
            System.out.println("FETCH offset success"+
                    " Offset " + offSetTs.offset() +
                    " offSetTs " + offSetTs);
            consumer.seek(partition, offSetTs.offset());
         }
      }
      return consumer;
   }

   public static void main(String[] args) throws Exception {

      String topic = args[0];
      String group = args[1];
      String bootstrapServers = args[2];
      long start_time_Stamp = Long.parseLong(args[3]);
      long end_time_Stamp = Long.parseLong(args[4]);
      Properties props = new Properties();
      boolean reachedEnd = false;

      props.put("bootstrap.servers", bootstrapServers);
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      // Type parameters must match the configured deserializers (String keys and values here).
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      assignOffsetToConsumer(consumer, topic, start_time_Stamp);

      System.out.println("Assigned partitions of topic " + topic);

      // Tracks which partitions have returned data; sized for a 5-partition topic.
      int[] arr = {0, 0, 0, 0, 0};
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(6000);
         int count= 0;
         long lasttimestamp = 0;
         long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {

               count++;

               if(arr[record.partition()] == 0){
                  arr[record.partition()] =1;
               }

               // NOTE: this exits as soon as ANY partition reaches the end
               // timestamp, not when all of them have (see the answer below).
               if (record.timestamp() >= end_time_Stamp) {
                  reachedEnd = true;
                  break;
               }

               System.out.println("record=>"+" offset="
                       +record.offset()
                       + " timestamp="+record.timestamp()
                        + " :"+record);
               System.out.println("recordcount = "+count+" bitmap"+Arrays.toString(arr));

            }

         if (reachedEnd) break;
         if (records.isEmpty()) break; // poll() never returns null; stop once a poll comes back empty
      }

   }

}

I am facing the following issues:

1. consumer.poll fails (returns no data) even with a 1000 ms timeout. With 1000 ms I have to poll several times in a loop, so for now I am using a very large value. Given that the relevant offset in each partition has already been found, how can I set the poll timeout reliably so that data is returned immediately?
2. My observation is that when data is returned, it does not always come from all partitions. Even when data is returned from all partitions, not all records are returned. The topic holds more than 1000 records, but the number actually fetched and printed in the loop is much lower (around 200). Is there anything wrong with my current use of the Kafka API?
3. How do I reliably break out of the loop once all the data between the start and end timestamps has been received, without exiting prematurely?


xmakbtuz (Answer 1)

The number of records fetched per poll depends on your consumer configuration, for example the properties sketched below.
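For reference, a minimal sketch of the consumer properties that bound how much data a single poll can return (the values shown are the Kafka 0.10.2 defaults, purely for illustration):

// These go into the same Properties object passed to the KafkaConsumer.
props.put("max.poll.records", "500");              // upper bound on records returned per poll()
props.put("fetch.min.bytes", "1");                 // broker replies as soon as any data is available...
props.put("fetch.max.wait.ms", "500");             // ...or after this long, whichever comes first
props.put("max.partition.fetch.bytes", "1048576"); // per-partition cap on one fetch response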
You are breaking the loop as soon as one of the partitions reaches the end timestamp, which is not what you want. Before exiting the poll loop you should check that every partition has been consumed up to the end timestamp, as in the sketch that follows.
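A minimal sketch of that check, assuming the consumer has already been seeked to the start offsets as in the question's assignOffsetToConsumer (the class name TimeBoundedDrain and the 1000 ms timeout are illustrative):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TimeBoundedDrain {

   // Polls until every assigned partition has produced a record at or past endTimestamp.
   public static void drainUntil(KafkaConsumer<String, String> consumer, long endTimestamp) {
      // Partitions still expected to deliver records older than endTimestamp.
      Set<TopicPartition> pending = new HashSet<>(consumer.assignment());

      while (!pending.isEmpty()) {
         ConsumerRecords<String, String> records = consumer.poll(1000);
         for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (record.timestamp() >= endTimestamp) {
               // This partition is done: stop fetching from it, keep draining the others.
               pending.remove(tp);
               consumer.pause(Collections.singletonList(tp));
            } else if (pending.contains(tp)) {
               System.out.println("record => partition=" + record.partition()
                       + " offset=" + record.offset()
                       + " timestamp=" + record.timestamp());
            }
         }
      }
   }
}

Note that a partition that simply runs out of records never crosses endTimestamp, so a production version would also compare consumer.position(tp) with consumer.endOffsets(...) (or use an empty-poll budget like the one below) to avoid looping forever.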
A poll call is asynchronous; fetch requests and responses are issued per broker node, so depending on broker response times you may or may not get all responses in a single poll.
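Because of that, breaking on the first empty poll (as the question's loop does) can exit before a slower broker has answered. One way to make the exit condition more tolerant is a small empty-poll budget; MAX_EMPTY_POLLS below is a hypothetical constant, not a Kafka setting:

int emptyPolls = 0;
final int MAX_EMPTY_POLLS = 3; // hypothetical tolerance for slow broker responses

while (emptyPolls < MAX_EMPTY_POLLS && !reachedEnd) {
   ConsumerRecords<String, String> records = consumer.poll(1000);
   if (records.isEmpty()) {
      emptyPolls++;   // a slow node may simply not have responded yet
      continue;
   }
   emptyPolls = 0;    // got data, reset the budget
   for (ConsumerRecord<String, String> record : records) {
      // process records exactly as in the question's loop
   }
}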
