kafka.common.TopicAndPartition.topic()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(13.7k)|赞(0)|评价(0)|浏览(167)

本文整理了Java中kafka.common.TopicAndPartition.topic()方法的一些代码示例,展示了TopicAndPartition.topic()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。TopicAndPartition.topic()方法的具体详情如下:
包路径:kafka.common.TopicAndPartition
类名称:TopicAndPartition
方法名:topic

TopicAndPartition.topic介绍

暂无

代码示例

代码示例来源:origin: linkedin/cruise-control

/**
 * Check whether the topic has partitions undergoing partition reassignment and wait for the reassignments to finish.
 *
 * @param zkUtils the ZkUtils class used to check ongoing partition reassignments.
 * @return Whether there are no ongoing partition reassignments.
 */
public static boolean ensureTopicNotUnderPartitionReassignment(ZkUtils zkUtils, String topic) {
 int attempt = 0;
 while (JavaConversions.asJavaCollection(zkUtils.getPartitionsBeingReassigned().keys()).stream()
            .anyMatch(tp -> tp.topic().equals(topic))) {
  try {
   sleep(1000 << attempt);
  } catch (InterruptedException e) {
   // Let it go.
  }
  if (++attempt == 10) {
   return false;
  }
 }
 return true;
}

代码示例来源:origin: Graylog2/graylog2-server

/**
 * A Java transliteration of what the scala implementation does, which unfortunately is declared as private
 */
protected void flushDirtyLogs() {
  LOG.debug("Checking for dirty logs to flush...");
  final Set<Map.Entry<TopicAndPartition, Log>> entries = JavaConversions.mapAsJavaMap(logManager.logsByTopicPartition()).entrySet();
  for (final Map.Entry<TopicAndPartition, Log> topicAndPartitionLogEntry : entries) {
    final TopicAndPartition topicAndPartition = topicAndPartitionLogEntry.getKey();
    final Log kafkaLog = topicAndPartitionLogEntry.getValue();
    final long timeSinceLastFlush = JODA_TIME.milliseconds() - kafkaLog.lastFlushTime();
    try {
      LOG.debug(
          "Checking if flush is needed on {} flush interval {} last flushed {} time since last flush: {}",
          topicAndPartition.topic(),
          kafkaLog.config().flushInterval(),
          kafkaLog.lastFlushTime(),
          timeSinceLastFlush);
      if (timeSinceLastFlush >= kafkaLog.config().flushMs()) {
        kafkaLog.flush();
      }
    } catch (Exception e) {
      LOG.error("Error flushing topic " + topicAndPartition.topic(), e);
    }
  }
}

代码示例来源:origin: apache/incubator-pinot

@Override
public FetchResponse fetch(FetchRequest request) {
 scala.collection.Traversable<Tuple2<TopicAndPartition, PartitionFetchInfo>> requestInfo = request.requestInfo();
 java.util.Map<TopicAndPartition, Short> errorMap = new HashMap<>();
 while (requestInfo.headOption().isDefined()) {
  // jfim: IntelliJ erroneously thinks the following line is an incompatible type error, but it's only because
  // it doesn't understand scala covariance when called from Java (ie. it thinks head() is of type A even though
  // it's really of type Tuple2[TopicAndPartition, PartitionFetchInfo])
  Tuple2<TopicAndPartition, PartitionFetchInfo> t2 = requestInfo.head();
  TopicAndPartition topicAndPartition = t2._1();
  PartitionFetchInfo partitionFetchInfo = t2._2();
  if (!topicAndPartition.topic().equals(topicName)) {
   errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
  } else if (partitionLeaderIndices.length < topicAndPartition.partition()) {
   errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
  } else if (partitionLeaderIndices[topicAndPartition.partition()] != index) {
   errorMap.put(topicAndPartition, Errors.NOT_LEADER_FOR_PARTITION.code());
  } else {
   // Do nothing, we'll generate a fake message
  }
  requestInfo = requestInfo.tail();
 }
 return new MockFetchResponse(errorMap);
}

代码示例来源:origin: linkedin/camus

private String generateLogWarnForSkippedTopics(Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo,
  SimpleConsumer consumer) {
 StringBuilder sb = new StringBuilder();
 sb.append("The following topics will be skipped due to failure in fetching latest offsets from leader "
   + consumer.host() + ":" + consumer.port());
 for (TopicAndPartition topicAndPartition : offsetInfo.keySet()) {
  sb.append("  " + topicAndPartition.topic());
 }
 return sb.toString();
}

代码示例来源:origin: uber/chaperone

private void injectMetrics(final TopicAndPartition topicAndPartition) {
 if (!partitionInjected.contains(topicAndPartition)) {
  Metrics.getRegistry().register(
    String.format(OFFSET_LAG_NAME_FORMAT, topicAndPartition.topic(), topicAndPartition.partition()),
    new Gauge<Long>() {
     @Override
     public Long getValue() {
      if (partitionLag.containsKey(topicAndPartition)) {
       return partitionLag.get(topicAndPartition);
      } else {
       return -1L;
      }
     }
    });
  partitionInjected.add(topicAndPartition);
 }
}

代码示例来源:origin: uber/chaperone

private static long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
  kafka.javaapi.OffsetRequest request =
    new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
  OffsetResponse response = consumer.getOffsetsBefore(request);

  if (response.hasError()) {
   logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
     response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
   return -1;
  }

  long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
  return offsets[0];
 }
}

代码示例来源:origin: linkedin/camus

long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
long earliestOffset =
  earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  new EtlRequest(context, topicAndPartition.topic(), Integer.toString(leader.getLeaderId()),
    topicAndPartition.partition(), leader.getUri());
etlRequest.setLatestOffset(latestOffset);

代码示例来源:origin: uber/uReplicator

private static String getOffsetLagName(TopicAndPartition tp) {
 return "OffsetMonitorLag." + tp.topic().replace('.', '_') + "." + tp.partition();
}

代码示例来源:origin: com.cerner.common.kafka/common-kafka-admin

/**
 * Returns the set of all partitions for the given topic in the Kafka cluster
 *
 * @param topic
 *      a Kafka topic
 * @return unmodifiable set of all partitions for the given topic in the Kafka cluster
 * @throws AdminOperationException
 *      if there is an issue reading partitions from Kafka
 */
public Set<TopicAndPartition> getPartitions(String topic) {
  LOG.debug("Retrieving all partitions for topic [{}]", topic);
  return Collections.unmodifiableSet(
      getPartitions().stream().filter(p -> p.topic().equals(topic)).collect(Collectors.toSet()));
}

代码示例来源:origin: com.linkedin.camus/camus-etl-kafka

private String generateLogWarnForSkippedTopics(Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo,
  SimpleConsumer consumer) {
 StringBuilder sb = new StringBuilder();
 sb.append("The following topics will be skipped due to failure in fetching latest offsets from leader "
   + consumer.host() + ":" + consumer.port());
 for (TopicAndPartition topicAndPartition : offsetInfo.keySet()) {
  sb.append("  " + topicAndPartition.topic());
 }
 return sb.toString();
}

代码示例来源:origin: allegro/hermes

@Override
@SuppressWarnings("unchecked")
public int readLeaderForPartition(TopicAndPartition topicAndPartition) {
  try {
    TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
    return (int)kafkaZkClient.getLeaderForPartition(topicPartition).get();
  } catch (Exception exception) {
    throw new BrokerNotFoundForPartitionException(topicAndPartition.topic(), topicAndPartition.partition(), exception);
  }
}

代码示例来源:origin: uber/uReplicator

protected void updateOffset() {
 logger.debug("OffsetMonitor updates offset with leaders=" + partitionLeader);
 offsetMonitorFailureCount.set(0);
 for (Map.Entry<TopicAndPartition, BrokerEndPoint> entry : partitionLeader.entrySet()) {
  String leaderBroker = getHostPort(entry.getValue());
  TopicAndPartition tp = entry.getKey();
  if (StringUtils.isEmpty(leaderBroker)) {
   logger.warn("{} does not have leader partition", tp);
  } else {
   try {
    cronExecutor.submit(updateOffsetTask(leaderBroker, tp));
   } catch (RejectedExecutionException re) {
    offsetMonitorFailureCount.getAndAdd(1);
    logger.warn(String.format("cronExecutor is full! Drop task for topic: %s, partition: %d",
      tp.topic(), tp.partition()), re);
    throw re;
   } catch (Throwable t) {
    offsetMonitorFailureCount.getAndAdd(1);
    logger.error(String.format("cronExecutor got throwable! Drop task for topic: %s, partition: %d",
      tp.topic(), tp.partition()), t);
    throw t;
   }
  }
 }
}

代码示例来源:origin: uber/uReplicator

/**
 * Get stuck topic partitions via offset manager.
 *
 * @return the topic partitions that have been stuck for at least _movePartitionAfterStuckMillis.
 */
private Set<TopicPartition> getStuckTopicPartitions() {
 Set<TopicPartition> partitions = new HashSet<>();
 if (_movePartitionAfterStuckMillis <= 0) {
  return partitions;
 }
 Map<TopicAndPartition, TopicPartitionLag> noProgressMap = _helixMirrorMakerManager.getOffsetMonitor()
   .getNoProgressTopicToOffsetMap();
 long now = System.currentTimeMillis();
 for (Map.Entry<TopicAndPartition, TopicPartitionLag> entry : noProgressMap.entrySet()) {
  TopicPartitionLag lastLag = entry.getValue();
  if (now - lastLag.getTimeStamp() > _movePartitionAfterStuckMillis) {
   partitions.add(new TopicPartition(entry.getKey().topic(), entry.getKey().partition()));
  }
 }
 return partitions;
}

代码示例来源:origin: org.opendaylight.centinel/centinel-laas

/**
 * @param zookeeperHosts
 *            Zookeeper hosts e.g. localhost:2181. If multiple zookeeper
 *            then host1:port1[,host2:port2,...]
 * @param groupID
 *            consumer group to update
 * @param offsets
 *            mapping of (topic and) partition to offset to push to
 *            Zookeeper
 */
public void createOffsets(String zookeeperHosts, String groupID, Map<TopicAndPartition, Long> offsets) {
  try (SuperZkClient zkClient = new SuperZkClient(zookeeperHosts)) {
    for (Map.Entry<TopicAndPartition, Long> entry : offsets.entrySet()) {
      TopicAndPartition topicAndPartition = entry.getKey();
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
      int partition = topicAndPartition.partition();
      long offset = entry.getValue();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
    }
  }
}

代码示例来源:origin: michal-harish/kafka-hadoop-loader

private long getEarliestOffset() {
  // return kafka.api.OffsetRequest.EarliestTime();
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(EARLIEST_TIME, 1));
  OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);
  if (earliestOffset <= 0) {
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    earliestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  }
  return earliestOffset;
}

代码示例来源:origin: michal-harish/kafka-hadoop-loader

private long getLatestOffset() {
  // return kafka.api.OffsetRequest.LatestTime();
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(LATEST_TIME, 1));
  OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);
  if (latestOffset <= 0) {
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    latestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  }
  return latestOffset;
}

代码示例来源:origin: uber/uReplicator

private long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
 requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
   kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
 OffsetResponse response = consumer.getOffsetsBefore(request);
 if (response.hasError()) {
  logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
    response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
  return -1;
 }
 long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
 return offsets[0];
}

代码示例来源:origin: org.graylog2/graylog2-shared

/**
 * A Java transliteration of what the scala implementation does, which unfortunately is declared as private
 */
protected void flushDirtyLogs() {
  LOG.debug("Checking for dirty logs to flush...");
  final Set<Map.Entry<TopicAndPartition, Log>> entries = JavaConversions.mapAsJavaMap(logManager.logsByTopicPartition()).entrySet();
  for (final Map.Entry<TopicAndPartition, Log> topicAndPartitionLogEntry : entries) {
    final TopicAndPartition topicAndPartition = topicAndPartitionLogEntry.getKey();
    final Log kafkaLog = topicAndPartitionLogEntry.getValue();
    final long timeSinceLastFlush = JODA_TIME.milliseconds() - kafkaLog.lastFlushTime();
    try {
      LOG.debug(
          "Checking if flush is needed on {} flush interval {} last flushed {} time since last flush: {}",
          topicAndPartition.topic(),
          kafkaLog.config().flushInterval(),
          kafkaLog.lastFlushTime(),
          timeSinceLastFlush);
      if (timeSinceLastFlush >= kafkaLog.config().flushMs()) {
        kafkaLog.flush();
      }
    } catch (Exception e) {
      LOG.error("Error flushing topic " + topicAndPartition.topic(), e);
    }
  }
}

代码示例来源:origin: michal-harish/kafka-hadoop-loader

@Override
public void close() throws IOException {
  log.info(
    "Topic: {}, broker: {}, partition: {} ~ num. processed messages {}",
    topicAndPartition.topic(),
    split.getBrokerId(),
    topicAndPartition.partition(),
    numProcessedMessages
  );
  if (numProcessedMessages > 0) {
    try(KafkaZkUtils zk = new KafkaZkUtils(
      conf.get(KafkaInputFormat.CONFIG_ZK_CONNECT),
      conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000),
      conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000)
    )) {
      new CheckpointManager(conf, zk)
          .commitOffsets(split.getTopic(), split.getPartition(), nextOffsetToConsume - 1);
    }
  }
  consumer.close();
}

代码示例来源:origin: allegro/hermes

private void createLeaderForPartition(TopicAndPartition topicAndPartition, int leaderId) throws Exception {
  String path = "/brokers/topics/" + topicAndPartition.topic() + "/partitions/" + topicAndPartition.partition() + "/state";
  zookeeperClient.create().creatingParentsIfNeeded().forPath(path);
  zookeeperClient.setData().forPath(path, getSampleLeaderDetails(leaderId).getBytes());
}

相关文章

微信公众号

最新文章

更多