本文整理了Java中kafka.common.TopicAndPartition类的一些代码示例，展示了TopicAndPartition类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。TopicAndPartition类的具体详情如下：
包路径:kafka.common.TopicAndPartition
类名称:TopicAndPartition
暂无
代码示例来源:origin: prestodb/presto
/**
 * Fetches every offset the broker reports for one topic partition.
 *
 * <p>The request asks for up to {@code Integer.MAX_VALUE} offsets before "latest". The API
 * implies this returns all of them, so a partition presumably cannot have more than
 * {@code Integer.MAX_VALUE - 1} segments. It is also assumed that the lowest returned value is
 * the first segment still available, so it should not be 0 once segments have been dropped.
 *
 * @param consumer    consumer used to send the offset request; its client id is reused
 * @param topicName   topic to query
 * @param partitionId partition to query
 * @return the offsets returned by the broker for the partition
 * @throws RuntimeException if the offset response reports an error
 */
private static long[] findAllOffsets(SimpleConsumer consumer, String topicName, int partitionId)
{
    PartitionOffsetRequestInfo everythingBeforeLatest =
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE);
    TopicAndPartition target = new TopicAndPartition(topicName, partitionId);

    OffsetResponse response = consumer.getOffsetsBefore(new OffsetRequest(
            ImmutableMap.of(target, everythingBeforeLatest),
            kafka.api.OffsetRequest.CurrentVersion(),
            consumer.clientId()));

    if (!response.hasError()) {
        return response.offsets(topicName, partitionId);
    }
    throw new RuntimeException("could not fetch data from Kafka, error code is '" + response.errorCode(topicName, partitionId) + "'");
}
代码示例来源:origin: apache/incubator-pinot
/**
 * Mock fetch handler: validates every requested partition against this mock's topic and
 * leader table, and returns a {@code MockFetchResponse} built from the collected errors.
 *
 * <p>A partition is reported as {@code UNKNOWN_TOPIC_OR_PARTITION} when its topic differs from
 * {@code topicName} or its id is outside {@code partitionLeaderIndices}, and as
 * {@code NOT_LEADER_FOR_PARTITION} when the recorded leader index is not {@code index}.
 * Partitions that pass all checks get no entry in the error map.
 *
 * @param request fetch request whose per-partition entries are validated
 * @return a mock response constructed from the per-partition error codes
 */
@Override
public FetchResponse fetch(FetchRequest request) {
    scala.collection.Traversable<Tuple2<TopicAndPartition, PartitionFetchInfo>> requestInfo = request.requestInfo();
    java.util.Map<TopicAndPartition, Short> errorMap = new HashMap<>();

    // Walk the Scala collection head/tail style, since it is not a java.lang.Iterable.
    while (requestInfo.headOption().isDefined()) {
        // jfim: IntelliJ erroneously thinks the following line is an incompatible type error, but it's only because
        // it doesn't understand scala covariance when called from Java (ie. it thinks head() is of type A even though
        // it's really of type Tuple2[TopicAndPartition, PartitionFetchInfo])
        Tuple2<TopicAndPartition, PartitionFetchInfo> t2 = requestInfo.head();
        TopicAndPartition topicAndPartition = t2._1();
        int partition = topicAndPartition.partition();

        if (!topicAndPartition.topic().equals(topicName)) {
            errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
        } else if (partition < 0 || partition >= partitionLeaderIndices.length) {
            // Valid ids are 0..length-1. The previous "length < partition" test let
            // partition == length through and crashed on the array access below.
            errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
        } else if (partitionLeaderIndices[partition] != index) {
            errorMap.put(topicAndPartition, Errors.NOT_LEADER_FOR_PARTITION.code());
        }
        // Otherwise do nothing, we'll generate a fake message.

        requestInfo = requestInfo.tail();
    }
    return new MockFetchResponse(errorMap);
}
代码示例来源:origin: com.linkedin.camus/camus-etl-kafka
/**
 * Asks the broker at {@code uri} for the single most recent offset before {@code time} for this
 * object's {@code topic}/{@code partition}, stores it in {@code latestOffset} and returns it.
 *
 * <p>A short-lived {@code SimpleConsumer} is opened for the request and is always closed,
 * including when the request itself fails.
 *
 * @param time timestamp passed to the offset request
 * @return the first offset returned by the broker
 * @throws ArrayIndexOutOfBoundsException if the broker returns no offset (logged beforehand)
 */
@Override
public long getLastOffset(long time) {
    SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000, 1024 * 1024, "hadoop-etl");
    long[] endOffset;
    try {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(time, 1));
        OffsetResponse response =
                consumer.getOffsetsBefore(
                        new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "hadoop-etl"));
        endOffset = response.offsets(topic, partition);
    } finally {
        // Release the socket even if the request throws; previously it leaked on failure.
        consumer.close();
    }

    if (endOffset.length == 0) {
        // Deliberately only logged: the array access below then raises the exception
        // that this message announces.
        log.info("The exception is thrown because the latest offset retunred zero for topic : " + topic
                + " and partition " + partition);
    }
    this.latestOffset = endOffset[0];
    return endOffset[0];
}
代码示例来源:origin: uber/chaperone
/**
 * Requests the latest offset of a topic partition through the given consumer.
 *
 * @param consumer          consumer used to send the request; its client id is reused
 * @param topicAndPartition partition whose latest offset is wanted
 * @return the first offset returned by the broker, or {@code -1} if the response carries an
 *         error or contains no offset
 */
private static long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
    kafka.javaapi.OffsetRequest request =
            new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());

    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
                response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
        return -1;
    }

    long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
    if (offsets.length == 0) {
        // An error-free but empty answer used to crash on offsets[0]; report it through
        // the same -1 sentinel callers already handle for the error case.
        logger.warn("No offset returned for {}", topicAndPartition);
        return -1;
    }
    return offsets[0];
}
}
代码示例来源:origin: alibaba/jstorm
/**
 * Looks up one offset before {@code startOffsetTime} for the given topic partition, using the
 * consumer returned by {@code findLeaderConsumer(partition)}.
 *
 * @param topic           topic to query
 * @param partition       partition to query
 * @param startOffsetTime timestamp passed to the offset request
 * @return the first offset returned; {@code NO_OFFSET} when none is returned; {@code -1} when
 *         no leader consumer is available
 */
public long getOffset(String topic, int partition, long startOffsetTime) {
    SimpleConsumer leaderConsumer = findLeaderConsumer(partition);
    if (leaderConsumer == null) {
        LOG.error("Error consumer is null get offset from partition:" + partition);
        return -1;
    }

    Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetQuery =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    offsetQuery.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(startOffsetTime, 1));

    OffsetRequest offsetRequest =
            new OffsetRequest(offsetQuery, kafka.api.OffsetRequest.CurrentVersion(), leaderConsumer.clientId());
    long[] found = leaderConsumer.getOffsetsBefore(offsetRequest).offsets(topic, partition);

    return found.length > 0 ? found[0] : NO_OFFSET;
}
代码示例来源:origin: gnuhpc/Kafka-zk-restapi
/**
 * Opens a temporary {@code SimpleConsumer} against the partition leader and returns the first
 * offset the broker reports before {@code time}.
 *
 * <p>The consumer is closed on every exit path, including the error path and unexpected
 * exceptions.
 *
 * @param leader      broker node to connect to
 * @param topic       topic to query
 * @param partitionId partition to query
 * @param time        timestamp passed to the offset request
 * @return the first offset in the broker's response
 * @throws ApiException if the offset response reports an error
 */
private long getOffsets(Node leader, String topic, int partitionId, long time) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
    SimpleConsumer consumer =
            new SimpleConsumer(leader.host(), leader.port(), 10000, 1024, "Kafka-zk-simpleconsumer");
    try {
        PartitionOffsetRequestInfo partitionOffsetRequestInfo =
                new PartitionOffsetRequestInfo(time, 10000);
        OffsetRequest offsetRequest =
                new OffsetRequest(
                        ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                        kafka.api.OffsetRequest.CurrentVersion(),
                        consumer.clientId());
        OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);

        if (offsetResponse.hasError()) {
            short errorCode = offsetResponse.errorCode(topic, partitionId);
            log.warn(format("Offset response has error: %d", errorCode));
            throw new ApiException(
                    "could not fetch data from Kafka, error code is '"
                            + errorCode
                            + "'Exception Message:"
                            + offsetResponse.toString());
        }

        long[] offsets = offsetResponse.offsets(topic, partitionId);
        return offsets[0];
    } finally {
        // Previously skipped whenever the error branch threw, leaking the connection.
        consumer.close();
    }
}
代码示例来源:origin: HiveKa/HiveKa
"hive_kafka_client");
PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo = new PartitionOffsetRequestInfo(
kafka.api.OffsetRequest.LatestTime(), 1);
PartitionOffsetRequestInfo partitionEarliestOffsetRequestInfo = new PartitionOffsetRequestInfo(
kafka.api.OffsetRequest.EarliestTime(), 1);
Map<TopicAndPartition, PartitionOffsetRequestInfo> latestOffsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
Map<TopicAndPartition, PartitionOffsetRequestInfo> earliestOffsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
.getOffsetsBefore(new OffsetRequest(latestOffsetInfo,
kafka.api.OffsetRequest.CurrentVersion(), "hive_kafka_client"));
OffsetResponse earliestOffsetResponse = consumer
.getOffsetsBefore(new OffsetRequest(earliestOffsetInfo,
kafka.api.OffsetRequest.CurrentVersion(), "hive_kafka_client"));
consumer.close();
for (TopicAndPartition topicAndPartition : topicAndPartitions) {
long latestOffset = latestOffsetResponse.offsets(
topicAndPartition.topic(),
topicAndPartition.partition())[0];
long earliestOffset = earliestOffsetResponse.offsets(
topicAndPartition.topic(),
topicAndPartition.partition())[0];
topicAndPartition.topic(), Integer.toString(leader
.getLeaderId()), topicAndPartition.partition(),
leader.getUri());
etlRequest.setLatestOffset(latestOffset);
代码示例来源:origin: apache/crunch
final SimpleConsumer consumer = getSimpleConsumer(broker);
try {
topicMetadataResponse = consumer.send(topicMetadataRequest);
break;
} catch (Exception err) {
Arrays.toString(topics), endpoint.host()), err);
} finally {
consumer.close();
requestInfo = brokerRequests.get(leader);
requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
time, 1));
TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
long offset;
if (time == kafka.api.OffsetRequest.EarliestTime())
throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
+ "] but Kafka returned no values");
kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
代码示例来源:origin: michal-harish/kafka-hadoop-loader
/**
 * Returns the earliest offset of {@code topicAndPartition}, querying the broker only while no
 * positive value is cached in {@code earliestOffset}.
 *
 * <p>A cached value of 0 (or less) counts as "not fetched yet", so it triggers a new request
 * on every call.
 *
 * @return the cached or freshly fetched earliest offset
 */
private long getEarliestOffset() {
    if (earliestOffset <= 0) {
        // Build the request only when it is actually sent; the cached path allocates nothing.
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
        requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(EARLIEST_TIME, 1));
        OffsetRequest offsetRequest =
                new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);

        OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
        earliestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
    }
    return earliestOffset;
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Resolves the latest offset of a partition by delegating to {@code getOffset} with a
 * single-entry request that asks for one offset at {@code LatestTime()}.
 *
 * @param partition partition whose latest offset is wanted
 * @return the value returned by {@code getOffset}
 * @throws KafkaOffsetRetrievalFailureException propagated from {@code getOffset}
 */
@Override
protected long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    TopicAndPartition key = new TopicAndPartition(partition.getTopicName(), partition.getId());
    PartitionOffsetRequestInfo oneLatestOffset =
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1);
    return getOffset(partition, Collections.singletonMap(key, oneLatestOffset));
}
代码示例来源:origin: linkedin/camus
new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1);
new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
Map<TopicAndPartition, PartitionOffsetRequestInfo> latestOffsetInfo =
new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
earliestOffsetResponse = getLatestOffsetResponse(consumer, earliestOffsetInfo, context);
consumer.close();
if (earliestOffsetResponse == null) {
log.warn(generateLogWarnForSkippedTopics(earliestOffsetInfo, consumer));
long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
long earliestOffset =
earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
new EtlRequest(context, topicAndPartition.topic(), Integer.toString(leader.getLeaderId()),
topicAndPartition.partition(), leader.getUri());
etlRequest.setLatestOffset(latestOffset);
etlRequest.setEarliestOffset(earliestOffset);
代码示例来源:origin: apache/incubator-gobblin
/**
 * Resolves the earliest offset of a partition by delegating to {@code getOffset} with a
 * single-entry request that asks for one offset at {@code EarliestTime()}.
 *
 * @param partition partition whose earliest offset is wanted
 * @return the value returned by {@code getOffset}
 * @throws KafkaOffsetRetrievalFailureException propagated from {@code getOffset}
 */
@Override
protected long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    TopicAndPartition key = new TopicAndPartition(partition.getTopicName(), partition.getId());
    PartitionOffsetRequestInfo oneEarliestOffset =
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1);
    return getOffset(partition, Collections.singletonMap(key, oneEarliestOffset));
}
代码示例来源:origin: apache/flink
/**
 * Converts a {@code KafkaTopicPartition} into the {@code TopicAndPartition} handle built from
 * the same topic name and partition id.
 *
 * @param partition partition descriptor to convert
 * @return a new handle for the same topic and partition
 */
@Override
protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
    final String topic = partition.getTopic();
    final int partitionId = partition.getPartition();
    return new TopicAndPartition(topic, partitionId);
}
代码示例来源:origin: michal-harish/kafka-hadoop-loader
this.split = (KafkaInputSplit) split;
this.nextOffsetToConsume = this.split.getStartOffset();
this.topicAndPartition = new TopicAndPartition(this.split.getTopic(), this.split.getPartition());
this.fetchSize = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_MESSAGE_MAX_BYTES, 1024 * 1024);
this.timeout = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_SOCKET_TIMEOUT_MS, 3000);
int bufferSize = conf.getInt(KafkaInputFormat.CONFIG_KAFKA_RECEIVE_BUFFER_BYTES, 64 * 1024);
consumer = new SimpleConsumer(
this.split.getBrokerHost(), this.split.getBrokerPort(), timeout, bufferSize, CLIENT_ID);
topicAndPartition.topic(),
this.split.getBrokerId(),
topicAndPartition.partition(),
nextOffsetToConsume,
earliestOffset
topicAndPartition.topic(),
this.split.getBrokerId(),
topicAndPartition.partition(),
this.split,
earliestOffset,
代码示例来源:origin: uber/chaperone
/**
 * Refreshes {@code partitionLeader} by asking every broker in {@code brokerList} for the
 * metadata of {@code auditTopics} and recording, per topic partition, the host/port of the
 * reported leader.
 *
 * <p>A failure on one broker is logged as a warning and does not stop the remaining brokers
 * from being queried.
 */
private void updateLeaderMap() {
    for (String broker : brokerList) {
        try {
            SimpleConsumer brokerConsumer = getSimpleConsumer(broker);
            TopicMetadataRequest metadataRequest = new TopicMetadataRequest(auditTopics);
            kafka.javaapi.TopicMetadataResponse metadataResponse = brokerConsumer.send(metadataRequest);

            for (TopicMetadata topicMeta : metadataResponse.topicsMetadata()) {
                for (PartitionMetadata partitionMeta : topicMeta.partitionsMetadata()) {
                    partitionLeader.put(
                            new TopicAndPartition(topicMeta.topic(), partitionMeta.partitionId()),
                            getHostPort(partitionMeta.leader()));
                }
            }
        } catch (Exception e) {
            logger.warn("Got exception to get metadata from broker=" + broker, e);
        }
    }
}
代码示例来源:origin: linkedin/camus
TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition());
log.debug("\nAsking for offset : " + (currentOffset));
PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(currentOffset, fetchBufferSize);
fetchResponse = simpleConsumer.fetch(fetchRequest);
if (fetchResponse.hasError()) {
String message =
代码示例来源:origin: uber/uReplicator
SimpleConsumer consumer = getSimpleConsumer(broker);
TopicMetadataRequest req = new TopicMetadataRequest(topicList);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
TopicAndPartition topicAndPartition = new TopicAndPartition(tmd.topic(), pmd.partitionId());
if (topicSet.contains(tmd.topic())) {
partitionLeader.put(topicAndPartition, pmd.leader());
while (iter.hasNext()) {
TopicAndPartition tp = iter.next().getKey();
if (!topicSet.contains(tp.topic())) {
iter.remove();
logger.info("Remove non exist topic {} from noProgressMap", tp);
代码示例来源:origin: com.ebay.jetstream/jetstream-messaging
TopicAndPartition key = new TopicAndPartition(topic,
partition);
PartitionReader p = m_partitionMap.get(key);
String key = new TopicAndPartition(topic, partition)
.toString();
LOGGER.error(
if (!topic.equals(tp.topic()))
continue;
int partition = tp.partition();
if (!myPartitions.contains(partition)) {
LOGGER.info( "Try to release " + tp
代码示例来源:origin: michal-harish/kafka-hadoop-loader
/**
 * Logs how many messages were processed, commits the last consumed offset through a
 * {@code CheckpointManager} when at least one message was processed, and closes the consumer.
 *
 * <p>The consumer is closed even if logging, the ZooKeeper connection or the offset commit
 * throws.
 *
 * @throws IOException declared by the overridden method; may also come from the
 *                     checkpointing step
 */
@Override
public void close() throws IOException {
    try {
        log.info(
                "Topic: {}, broker: {}, partition: {} ~ num. processed messages {}",
                topicAndPartition.topic(),
                split.getBrokerId(),
                topicAndPartition.partition(),
                numProcessedMessages
        );
        if (numProcessedMessages > 0) {
            // NOTE(review): the session-timeout key is passed for both timeout arguments;
            // confirm whether the second one was meant to be a separate connection-timeout key.
            try (KafkaZkUtils zk = new KafkaZkUtils(
                    conf.get(KafkaInputFormat.CONFIG_ZK_CONNECT),
                    conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000),
                    conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000)
            )) {
                // nextOffsetToConsume points one past the last message read, hence the -1.
                new CheckpointManager(conf, zk)
                        .commitOffsets(split.getTopic(), split.getPartition(), nextOffsetToConsume - 1);
            }
        }
    } finally {
        // Previously skipped when the checkpoint commit failed, leaking the connection.
        consumer.close();
    }
}
代码示例来源:origin: HomeAdvisor/Kafdrop
groupId,
topic.getPartitions().stream()
.map(p -> new TopicAndPartition(topic.getName(), p.getId()))
.collect(Collectors.toList()),
(short) (zookeeperOffsets ? 0 : 1), 0, // version 0 = zookeeper offsets, 1 = kafka offsets
.collect(Collectors.toMap(entry -> entry.getKey().partition(), entry -> entry.getValue().offset()));
内容来源于网络,如有侵权,请联系作者删除!