org.apache.kafka.clients.consumer.Consumer.assignment()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(135)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.assignment()方法的一些代码示例,展示了Consumer.assignment()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.assignment()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:assignment

Consumer.assignment介绍

暂无

代码示例

代码示例来源:origin: openzipkin/brave

@Override public Set<TopicPartition> assignment() {
 return delegate.assignment();
}

代码示例来源:origin: apache/storm

private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
  final Set<TopicPartition> assignments = consumer.assignment();
  if (!assignments.contains(currBatchTp)) {
    throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
      + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
      + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
  }
}

代码示例来源:origin: apache/storm

private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
  final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(consumer.assignment());
  LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
  pausedTopicPartitions.remove(excludedTp);
  consumer.pause(pausedTopicPartitions);
  LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
  return pausedTopicPartitions;
}

代码示例来源:origin: linkedin/cruise-control

/**
 * The check if the consumption is done or not. The consumption is done if the consumer has caught up with the
 * log end or all the partitions are paused.
 * @param endOffsets the log end for each partition.
 * @return true if the consumption is done, false otherwise.
 */
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
 Set<TopicPartition> partitionsNotPaused = new HashSet<>(_metricConsumer.assignment());
 partitionsNotPaused.removeAll(_metricConsumer.paused());
 for (TopicPartition tp : partitionsNotPaused) {
  if (_metricConsumer.position(tp) < endOffsets.get(tp)) {
   return false;
  }
 }
 return true;
}

代码示例来源:origin: apache/storm

/**
 * Assign partitions to the KafkaConsumer.
 * @param <K> The consumer key type
 * @param <V> The consumer value type
 * @param consumer The Kafka consumer to assign partitions to
 * @param newAssignment The partitions to assign.
 * @param listener The rebalance listener to call back on when the assignment changes
 */
public <K, V> void assignPartitions(Consumer<K, V> consumer, Set<TopicPartition> newAssignment,
  ConsumerRebalanceListener listener) {
  Set<TopicPartition> currentAssignment = consumer.assignment();
  if (!newAssignment.equals(currentAssignment)) {
    listener.onPartitionsRevoked(currentAssignment);
    consumer.assign(newAssignment);
    listener.onPartitionsAssigned(newAssignment);
  }
}

代码示例来源:origin: apache/storm

Set<TopicPartition> assignment = consumer.assignment();
if (!isAtLeastOnceProcessing()) {
  return new PollablePartitionsInfo(assignment, Collections.emptyMap());

代码示例来源:origin: apache/storm

private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
  doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
  Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
  pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
  try {
    consumer.pause(pausedPartitions);
    final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
    final int numPolledRecords = consumerRecords.count();
    LOG.debug("Polled [{}] records from Kafka",
      numPolledRecords);
    if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
      //Commit polled records immediately to ensure delivery is at-most-once.
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
        createFetchedOffsetsMetadata(consumer.assignment());
      consumer.commitSync(offsetsToCommit);
      LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    }
    return consumerRecords;
  } finally {
    consumer.resume(pausedPartitions);
  }
}

代码示例来源:origin: apache/nifi

/**
 * Execute poll using pause API just for sending heartbeat, not polling messages.
 */
void retainConnection() {
  pollingLock.lock();
  TopicPartition[] assignments = null;
  try {
    final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
    if (assignmentSet.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Pausing " + assignmentSet);
    }
    assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
    kafkaConsumer.pause(assignments);
    kafkaConsumer.poll(0);
    if (logger.isDebugEnabled()) {
      logger.debug("Resuming " + assignments);
    }
  } finally {
    try {
      if (assignments != null) {
        kafkaConsumer.resume(assignments);
      }
    } finally {
      pollingLock.unlock();
    }
  }
}

代码示例来源:origin: linkedin/cruise-control

while (_metricConsumer.assignment().isEmpty()) {
 pollerCount++;
 _metricConsumer.poll(10);
for (TopicPartition tp : _metricConsumer.assignment()) {
 timestampToSeek.put(tp, startTimeMs);
Set<TopicPartition> assignment = new HashSet<>(_metricConsumer.assignment());
Map<TopicPartition, Long> endOffsets = _metricConsumer.endOffsets(assignment);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = _metricConsumer.offsetsForTimes(timestampToSeek);
 LOG.debug("Starting consuming from metrics reporter topic partitions {}.", _metricConsumer.assignment());
     _metricConsumer.assignment(), startTimeMs, endTimeMs, totalMetricsAdded);

代码示例来源:origin: apache/storm

@Override
public void nextTuple() {
  try {
    if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
      refreshAssignment();
    }
    if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
      if (isAtLeastOnceProcessing()) {
        commitOffsetsForAckedTuples();
      } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
          createFetchedOffsetsMetadata(consumer.assignment());
        consumer.commitAsync(offsetsToCommit, null);
        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
      }
    }
    PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
    if (pollablePartitionsInfo.shouldPoll()) {
      try {
        setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
      } catch (RetriableException e) {
        LOG.error("Failed to poll from kafka.", e);
      }
    }
    emitIfWaitingNotEmitted();
  } catch (InterruptException e) {
    throwKafkaConsumerInterruptedException();
  }
}

代码示例来源:origin: apache/metron

@Override
public String getSampleMessage(final String topic) {
 String message = null;
 if (listTopics().contains(topic)) {
  try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
   kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
    .collect(Collectors.toList()));
   kafkaConsumer.assignment().stream()
    .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
    .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
   final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
   message = records.isEmpty() ? null : records.iterator().next().value();
   kafkaConsumer.unsubscribe();
  }
 }
 return message;
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

@Override
public Set<TopicPartition> assignment() {
 return consumer.assignment();
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

@Override public Set<TopicPartition> assignment() {
 return delegate.assignment();
}

代码示例来源:origin: rayokota/kafka-graphs

@Override
public Set<TopicPartition> assignment() {
  return kafkaConsumer.assignment();
}

代码示例来源:origin: spring-projects/spring-kafka

return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records.keySet());
final CountDownLatch pauseLatch = new CountDownLatch(2);
willAnswer(i -> {

代码示例来源:origin: spring-projects/spring-kafka

return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records1.keySet());
TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
    new TopicPartitionInitialOffset("foo", 0) };

代码示例来源:origin: com.cerner.common.kafka/common-kafka

private Set<TopicPartition> getAssignedPartitions() {
  Set<TopicPartition> assignedPartitions = consumer.assignment();
  if (assignedPartitions.isEmpty()) {
    // Polling with an immediate timeout will initialize the assignments for a fresh consumer.
    pollRecords(0L);
    assignedPartitions = consumer.assignment();
  }
  return assignedPartitions;
}

代码示例来源:origin: rayokota/kafka-graphs

private static Set<TopicPartition> localPartitions(Consumer<byte[], byte[]> consumer, String topic) {
  Set<TopicPartition> result = new HashSet<>();
  Set<TopicPartition> assignment = consumer.assignment();
  for (TopicPartition tp : assignment) {
    if (tp.topic().equals(topic)) {
      result.add(tp);
    }
  }
  return result;
}

代码示例来源:origin: vert-x3/vertx-kafka-client

@Override
public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
 this.submitTask((consumer, future) -> {
  Set<TopicPartition> partitions = consumer.assignment();
  if (future != null) {
   future.complete(partitions);
  }
 }, handler);
 return this;
}

代码示例来源:origin: linkedin/li-apache-kafka-clients

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
 Set<String> newSubscription = new HashSet<>(topics);
 // TODO: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed.
 commitSync();
 for (TopicPartition tp : _kafkaConsumer.assignment()) {
  if (!newSubscription.contains(tp.topic())) {
   _consumerRecordsProcessor.clear(tp);
  }
 }
 _consumerRebalanceListener.setUserListener(callback);
 _kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener);
}

相关文章