Usage of the kafka.common.TopicAndPartition Class, with Code Examples


This article collects Java code examples for the kafka.common.TopicAndPartition class and shows how the class is used in practice. The examples are drawn from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, so they make useful reference material. Details of the TopicAndPartition class:
Package: kafka.common
Class name: TopicAndPartition

TopicAndPartition overview

TopicAndPartition is a small value class from the legacy Kafka Scala client: it pairs a topic name with a partition id and serves as the key type throughout the old low-level consumer APIs (offset requests, fetch requests, leader maps). In newer clients it has been superseded by org.apache.kafka.common.TopicPartition.
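
Before the project excerpts below, here is a minimal sketch of how the class is constructed and used as a map key. It assumes the legacy Scala client jar (e.g. a 0.8.x/0.10.x kafka_2.1x artifact) is on the classpath; the topic name and offset value are made up for illustration:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;

public class TopicAndPartitionExample {
  public static void main(String[] args) {
    // An immutable (topic, partition) pair.
    TopicAndPartition tap = new TopicAndPartition("my-topic", 0);

    // Scala case-class style accessors: topic() and partition().
    System.out.println(tap.topic() + " / " + tap.partition());

    // Value-based equals()/hashCode() make it a good map key,
    // which is how almost every example below uses it.
    Map<TopicAndPartition, Long> offsets = new HashMap<>();
    offsets.put(tap, 42L);
    System.out.println(offsets.get(new TopicAndPartition("my-topic", 0))); // prints 42
  }
}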

Code examples

Code example source: prestodb/presto

private static long[] findAllOffsets(SimpleConsumer consumer, String topicName, int partitionId)
{
  TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
  // The API implies that this will always return all of the offsets. So it seems a partition can not have
  // more than Integer.MAX_VALUE-1 segments.
  //
  // This also assumes that the lowest value returned will be the first segment available. So if segments have been dropped off, this value
  // should not be 0.
  PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE);
  OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
  OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
  if (offsetResponse.hasError()) {
    short errorCode = offsetResponse.errorCode(topicName, partitionId);
    throw new RuntimeException("could not fetch data from Kafka, error code is '" + errorCode + "'");
  }
  return offsetResponse.offsets(topicName, partitionId);
}

Code example source: apache/incubator-pinot

@Override
public FetchResponse fetch(FetchRequest request) {
 scala.collection.Traversable<Tuple2<TopicAndPartition, PartitionFetchInfo>> requestInfo = request.requestInfo();
 java.util.Map<TopicAndPartition, Short> errorMap = new HashMap<>();
 while (requestInfo.headOption().isDefined()) {
  // jfim: IntelliJ erroneously thinks the following line is an incompatible type error, but it's only because
  // it doesn't understand scala covariance when called from Java (ie. it thinks head() is of type A even though
  // it's really of type Tuple2[TopicAndPartition, PartitionFetchInfo])
  Tuple2<TopicAndPartition, PartitionFetchInfo> t2 = requestInfo.head();
  TopicAndPartition topicAndPartition = t2._1();
  PartitionFetchInfo partitionFetchInfo = t2._2();
  if (!topicAndPartition.topic().equals(topicName)) {
   errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
  } else if (partitionLeaderIndices.length < topicAndPartition.partition()) {
   errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
  } else if (partitionLeaderIndices[topicAndPartition.partition()] != index) {
   errorMap.put(topicAndPartition, Errors.NOT_LEADER_FOR_PARTITION.code());
  } else {
   // Do nothing, we'll generate a fake message
  }
  requestInfo = requestInfo.tail();
 }
 return new MockFetchResponse(errorMap);
}

Code example source: com.linkedin.camus/camus-etl-kafka

@Override
public long getLastOffset(long time) {
 SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000, 1024 * 1024, "hadoop-etl");
 Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo =
   new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
 offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(time, 1));
 OffsetResponse response =
   consumer
     .getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "hadoop-etl"));
 long[] endOffset = response.offsets(topic, partition);
 consumer.close();
 if (endOffset.length == 0) {
  log.info("The exception is thrown because the latest offset retunred zero for topic : " + topic
    + " and partition " + partition);
 }
 this.latestOffset = endOffset[0];
 return endOffset[0];
}

Code example source: uber/chaperone

private static long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
  kafka.javaapi.OffsetRequest request =
    new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
  OffsetResponse response = consumer.getOffsetsBefore(request);

  if (response.hasError()) {
   logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
     response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
   return -1;
  }

  long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
  return offsets[0];
}

Code example source: alibaba/jstorm

public long getOffset(String topic, int partition, long startOffsetTime) {
  SimpleConsumer simpleConsumer = findLeaderConsumer(partition);
  if (simpleConsumer == null) {
    LOG.error("Cannot get offset for partition " + partition + ": leader consumer is null");
    return -1;
  }
  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
  OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
  long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
  if (offsets.length > 0) {
    return offsets[0];
  } else {
    return NO_OFFSET;
  }
}
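
In this legacy API, startOffsetTime is either a wall-clock timestamp in milliseconds or one of two sentinel values, kafka.api.OffsetRequest.LatestTime() (-1) and kafka.api.OffsetRequest.EarliestTime() (-2). A hypothetical call fetching the earliest retained offset of partition 0 of a made-up topic would be:

long earliest = getOffset("my-topic", 0, kafka.api.OffsetRequest.EarliestTime());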

Code example source: gnuhpc/Kafka-zk-restapi

private long getOffsets(Node leader, String topic, int partitionId, long time) {
  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
  SimpleConsumer consumer =
      new SimpleConsumer(leader.host(), leader.port(), 10000, 1024, "Kafka-zk-simpleconsumer");
  try {
    PartitionOffsetRequestInfo partitionOffsetRequestInfo =
        new PartitionOffsetRequestInfo(time, 10000);
    OffsetRequest offsetRequest =
        new OffsetRequest(
            ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
            kafka.api.OffsetRequest.CurrentVersion(),
            consumer.clientId());
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    if (offsetResponse.hasError()) {
      short errorCode = offsetResponse.errorCode(topic, partitionId);
      log.warn(format("Offset response has error: %d", errorCode));
      throw new ApiException(
          "could not fetch data from Kafka, error code is '" + errorCode
              + "'; response: " + offsetResponse.toString());
    }
    long[] offsets = offsetResponse.offsets(topic, partitionId);
    return offsets[0];
  } finally {
    // close the consumer on all paths, not only on success
    consumer.close();
  }
}

Code example source: HiveKa/HiveKa

"hive_kafka_client");
PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo = new PartitionOffsetRequestInfo(
    kafka.api.OffsetRequest.LatestTime(), 1);
PartitionOffsetRequestInfo partitionEarliestOffsetRequestInfo = new PartitionOffsetRequestInfo(
    kafka.api.OffsetRequest.EarliestTime(), 1);
Map<TopicAndPartition, PartitionOffsetRequestInfo> latestOffsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
Map<TopicAndPartition, PartitionOffsetRequestInfo> earliestOffsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    .getOffsetsBefore(new OffsetRequest(latestOffsetInfo,
        kafka.api.OffsetRequest.CurrentVersion(), "hive_kafka_client"));
OffsetResponse earliestOffsetResponse = consumer
    .getOffsetsBefore(new OffsetRequest(earliestOffsetInfo,
        kafka.api.OffsetRequest.CurrentVersion(), "hive_kafka_client"));
consumer.close();
for (TopicAndPartition topicAndPartition : topicAndPartitions) {
  long latestOffset = latestOffsetResponse.offsets(
      topicAndPartition.topic(),
      topicAndPartition.partition())[0];
  long earliestOffset = earliestOffsetResponse.offsets(
      topicAndPartition.topic(),
      topicAndPartition.partition())[0];
      topicAndPartition.topic(), Integer.toString(leader
          .getLeaderId()), topicAndPartition.partition(),
      leader.getUri());
  etlRequest.setLatestOffset(latestOffset);

Code example source: apache/crunch

// NOTE: this excerpt stitches together several parts of a longer method and is
// truncated at the source; "// ..." marks elided code.
final SimpleConsumer consumer = getSimpleConsumer(broker);
try {
  topicMetadataResponse = consumer.send(topicMetadataRequest);
  break;
} catch (Exception err) {
  // truncated at the source: logs a warning built from Arrays.toString(topics)
  // and endpoint.host(), with err attached
} finally {
  consumer.close();
}
// ...
requestInfo = brokerRequests.get(leader);
requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
    time, 1));
// ...
TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
long offset;
if (offsets.length == 0) {
  if (time == kafka.api.OffsetRequest.EarliestTime())
    throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
        + "] but Kafka returned no values");
  // truncated at the source: otherwise the earliest offset is requested again
  // with kafka.api.OffsetRequest.EarliestTime() for topicAndPartition.topic()
}

Code example source: michal-harish/kafka-hadoop-loader

private long getEarliestOffset() {
  // return kafka.api.OffsetRequest.EarliestTime();
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(EARLIEST_TIME, 1));
  OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);
  if (earliestOffset <= 0) {
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    earliestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  }
  return earliestOffset;
}

Code example source: apache/incubator-gobblin

@Override
protected long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
 Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo =
   Collections.singletonMap(new TopicAndPartition(partition.getTopicName(), partition.getId()),
     new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
 return getOffset(partition, offsetRequestInfo);
}

Code example source: linkedin/camus

// NOTE: this excerpt is truncated at the source; the elided declarations and
// the enclosing loop are restored here so the fragment reads coherently.
PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo =
    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1);
PartitionOffsetRequestInfo partitionEarliestOffsetRequestInfo =
    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
Map<TopicAndPartition, PartitionOffsetRequestInfo> latestOffsetInfo =
    new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
Map<TopicAndPartition, PartitionOffsetRequestInfo> earliestOffsetInfo =
    new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
// ...
OffsetResponse latestOffsetResponse = getLatestOffsetResponse(consumer, latestOffsetInfo, context);
OffsetResponse earliestOffsetResponse = getLatestOffsetResponse(consumer, earliestOffsetInfo, context);
consumer.close();
if (earliestOffsetResponse == null) {
  log.warn(generateLogWarnForSkippedTopics(earliestOffsetInfo, consumer));
  // ... (truncated at the source)
}
for (TopicAndPartition topicAndPartition : topicAndPartitions) {
  long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  long earliestOffset =
      earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  EtlRequest etlRequest =
      new EtlRequest(context, topicAndPartition.topic(), Integer.toString(leader.getLeaderId()),
          topicAndPartition.partition(), leader.getUri());
  etlRequest.setLatestOffset(latestOffset);
  etlRequest.setEarliestOffset(earliestOffset);
}

Code example source: apache/incubator-gobblin

@Override
protected long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
 Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo =
   Collections.singletonMap(new TopicAndPartition(partition.getTopicName(), partition.getId()),
     new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
 return getOffset(partition, offsetRequestInfo);
}

Code example source: apache/flink

@Override
protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
  return new TopicAndPartition(partition.getTopic(), partition.getPartition());
}
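
Flink's KafkaTopicPartition is the framework's own descriptor, mapped onto the legacy handle as shown above. For context, a hypothetical helper (not from any project quoted here) converting between the legacy kafka.common.TopicAndPartition and its modern replacement org.apache.kafka.common.TopicPartition could look like this:

import kafka.common.TopicAndPartition;
import org.apache.kafka.common.TopicPartition;

final class PartitionHandles {

  // legacy Scala-client handle -> modern client handle
  static TopicPartition toModern(TopicAndPartition tap) {
    return new TopicPartition(tap.topic(), tap.partition());
  }

  // modern client handle -> legacy Scala-client handle
  static TopicAndPartition toLegacy(TopicPartition tp) {
    return new TopicAndPartition(tp.topic(), tp.partition());
  }
}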

Code example source: michal-harish/kafka-hadoop-loader

// NOTE: this excerpt is truncated at the source; the tail of the original method
// logs the reader's starting position, so a single representative log call is
// restored below in place of the torn argument lists.
this.split = (KafkaInputSplit) split;
this.nextOffsetToConsume = this.split.getStartOffset();
this.topicAndPartition = new TopicAndPartition(this.split.getTopic(), this.split.getPartition());
this.fetchSize = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_MESSAGE_MAX_BYTES, 1024 * 1024);
this.timeout = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_SOCKET_TIMEOUT_MS, 3000);
int bufferSize = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_RECEIVE_BUFFER_BYTES, 64 * 1024);
consumer = new SimpleConsumer(
    this.split.getBrokerHost(), this.split.getBrokerPort(), timeout, bufferSize, CLIENT_ID);
log.info("Topic: {}, broker: {}, partition: {}, next offset: {}, earliest offset: {}",
    topicAndPartition.topic(),
    this.split.getBrokerId(),
    topicAndPartition.partition(),
    nextOffsetToConsume,
    earliestOffset);

Code example source: uber/chaperone

private void updateLeaderMap() {
 for (String broker : brokerList) {
  try {
   SimpleConsumer consumer = getSimpleConsumer(broker);
   TopicMetadataRequest req = new TopicMetadataRequest(auditTopics);
   kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
   List<TopicMetadata> metaData = resp.topicsMetadata();
   for (TopicMetadata tmd : metaData) {
    for (PartitionMetadata pmd : tmd.partitionsMetadata()) {
     TopicAndPartition topicAndPartition = new TopicAndPartition(tmd.topic(), pmd.partitionId());
     partitionLeader.put(topicAndPartition, getHostPort(pmd.leader()));
    }
   }
  } catch (Exception e) {
   logger.warn("Got exception to get metadata from broker=" + broker, e);
  }
 }
}

Code example source: linkedin/camus

// NOTE: this excerpt is truncated at the source; the FetchRequest built between
// these lines is elided.
TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition());
log.debug("\nAsking for offset : " + (currentOffset));
PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(currentOffset, fetchBufferSize);
// ... (a FetchRequest is built from topicAndPartition and partitionFetchInfo)
fetchResponse = simpleConsumer.fetch(fetchRequest);
if (fetchResponse.hasError()) {
  // ... (truncated at the source: an error message is assembled and handled)
}

Code example source: uber/uReplicator

// NOTE: this excerpt is truncated at the source; the enclosing loops and braces
// are restored here, with "// ..." marking elided code.
SimpleConsumer consumer = getSimpleConsumer(broker);
TopicMetadataRequest req = new TopicMetadataRequest(topicList);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata tmd : metaData) {
  for (PartitionMetadata pmd : tmd.partitionsMetadata()) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(tmd.topic(), pmd.partitionId());
    if (topicSet.contains(tmd.topic())) {
      partitionLeader.put(topicAndPartition, pmd.leader());
    }
  }
}
// ...
while (iter.hasNext()) {
  TopicAndPartition tp = iter.next().getKey();
  if (!topicSet.contains(tp.topic())) {
    iter.remove();
    logger.info("Removing nonexistent topic {} from noProgressMap", tp);
  }
}

Code example source: com.ebay.jetstream/jetstream-messaging

// NOTE: this excerpt stitches together fragments from different methods of the
// same class and is truncated at the source; "// ..." marks elided code.
TopicAndPartition key = new TopicAndPartition(topic, partition);
PartitionReader p = m_partitionMap.get(key);
// ...
// (elsewhere, the pair is used in its string form as a lookup key)
String key = new TopicAndPartition(topic, partition).toString();
// LOGGER.error(...) -- the message built from this key is truncated at the source
// ...
if (!topic.equals(tp.topic()))
  continue;
int partition = tp.partition();
if (!myPartitions.contains(partition)) {
  LOGGER.info("Try to release " + tp);
  // ...
}

Code example source: michal-harish/kafka-hadoop-loader

@Override
public void close() throws IOException {
  log.info(
    "Topic: {}, broker: {}, partition: {} ~ num. processed messages {}",
    topicAndPartition.topic(),
    split.getBrokerId(),
    topicAndPartition.partition(),
    numProcessedMessages
  );
  if (numProcessedMessages > 0) {
    try(KafkaZkUtils zk = new KafkaZkUtils(
      conf.get(KafkaInputFormat.CONFIG_ZK_CONNECT),
      conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000),
      conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000)
    )) {
      new CheckpointManager(conf, zk)
          .commitOffsets(split.getTopic(), split.getPartition(), nextOffsetToConsume - 1);
    }
  }
  consumer.close();
}

Code example source: HomeAdvisor/Kafdrop

// NOTE: truncated at the source; lines marked "restored" are assumptions filled
// in so the fragment reads as the consumer-group offset-fetch call it comes from.
OffsetFetchRequest request = new OffsetFetchRequest( // restored
  groupId,
  topic.getPartitions().stream()
   .map(p -> new TopicAndPartition(topic.getName(), p.getId()))
   .collect(Collectors.toList()),
  (short) (zookeeperOffsets ? 0 : 1), 0, // version 0 = zookeeper offsets, 1 = kafka offsets
  clientId); // restored
// ...
Map<Integer, Long> offsets = response.offsets().entrySet().stream() // restored
  .collect(Collectors.toMap(entry -> entry.getKey().partition(), entry -> entry.getValue().offset()));
