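This article collects code examples of the kafka.common.TopicAndPartition.topic() method in Java and shows how TopicAndPartition.topic() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should be a useful reference. Details of the TopicAndPartition.topic() method are as follows: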
Package path: kafka.common.TopicAndPartition
Class name: TopicAndPartition
Method name: topic
Description: none available
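Since no description is given, a short orientation may help before the project samples. In the legacy Kafka core, TopicAndPartition is a Scala case class that pairs a topic name with a partition id, and topic() returns the name. The sketch below uses a hypothetical stand-in class, TopicAndPartitionSketch, instead of the real Kafka type so that it runs without the Kafka jar. Its value-based equals/hashCode imitates what a case class provides, which is why the samples can use TopicAndPartition as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for kafka.common.TopicAndPartition; NOT the real Kafka class.
final class TopicAndPartitionSketch {
  private final String topic;
  private final int partition;

  TopicAndPartitionSketch(String topic, int partition) {
    this.topic = Objects.requireNonNull(topic, "topic");
    this.partition = partition;
  }

  // Accessors named like the Scala case-class fields seen from Java.
  String topic() { return topic; }
  int partition() { return partition; }

  // Value equality, as a Scala case class would generate.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof TopicAndPartitionSketch)) return false;
    TopicAndPartitionSketch other = (TopicAndPartitionSketch) o;
    return partition == other.partition && topic.equals(other.topic);
  }

  @Override
  public int hashCode() { return Objects.hash(topic, partition); }

  public static void main(String[] args) {
    Map<TopicAndPartitionSketch, Long> lag = new HashMap<>();
    lag.put(new TopicAndPartitionSketch("orders", 0), 42L);

    // A second, equal instance finds the same map entry.
    TopicAndPartitionSketch key = new TopicAndPartitionSketch("orders", 0);
    System.out.println(key.topic() + "-" + key.partition() + " lag=" + lag.get(key));
    // prints "orders-0 lag=42"
  }
}
```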
Code example source: linkedin/cruise-control
/**
 * Check whether the topic has partitions undergoing partition reassignment and wait for the reassignments to finish.
 *
 * @param zkUtils the ZkUtils class used to check ongoing partition reassignments.
 * @param topic the topic whose partitions are checked.
 * @return Whether there are no ongoing partition reassignments.
 */
public static boolean ensureTopicNotUnderPartitionReassignment(ZkUtils zkUtils, String topic) {
  int attempt = 0;
  while (JavaConversions.asJavaCollection(zkUtils.getPartitionsBeingReassigned().keys()).stream()
      .anyMatch(tp -> tp.topic().equals(topic))) {
    try {
      // Exponential backoff: 1 s, 2 s, 4 s, ...
      sleep(1000 << attempt);
    } catch (InterruptedException e) {
      // Let it go.
    }
    if (++attempt == 10) {
      return false;
    }
  }
  return true;
}
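The retry loop above sleeps for 1000 << attempt milliseconds and gives up after ten attempts. To make the resulting schedule concrete, here is a minimal sketch of the same doubling backoff, written without Kafka or ZooKeeper dependencies. The class BackoffSketch and its method names are hypothetical, and the sleeper is passed in as a parameter so the logic can be exercised without actually waiting.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Hypothetical helper that isolates the doubling-backoff loop; not cruise-control code.
final class BackoffSketch {

  /** Delay before the retry with the given 0-based attempt number: baseMs doubled attempt times. */
  static long delayMs(long baseMs, int attempt) {
    return baseMs << attempt;
  }

  /** Total time slept if the condition never clears within maxAttempts. */
  static long worstCaseWaitMs(long baseMs, int maxAttempts) {
    long total = 0;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      total += delayMs(baseMs, attempt);
    }
    return total;
  }

  /**
   * Polls busy with doubling delays. Returns true as soon as busy reports false,
   * or false once maxAttempts sleeps have been spent.
   */
  static boolean waitWhile(BooleanSupplier busy, long baseMs, int maxAttempts, LongConsumer sleeper) {
    int attempt = 0;
    while (busy.getAsBoolean()) {
      sleeper.accept(delayMs(baseMs, attempt));
      if (++attempt == maxAttempts) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // With a 1000 ms base and 10 attempts the last sleep is 512000 ms
    // and the sum of all sleeps is 1000 * (2^10 - 1) ms.
    System.out.println(delayMs(1000, 9));          // prints 512000
    System.out.println(worstCaseWaitMs(1000, 10)); // prints 1023000
  }
}
```

With these numbers, a caller can block for roughly 17 minutes in the worst case before false is returned.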
Code example source: Graylog2/graylog2-server
/**
 * A Java transliteration of what the scala implementation does, which unfortunately is declared as private
 */
protected void flushDirtyLogs() {
  LOG.debug("Checking for dirty logs to flush...");
  final Set<Map.Entry<TopicAndPartition, Log>> entries = JavaConversions.mapAsJavaMap(logManager.logsByTopicPartition()).entrySet();
  for (final Map.Entry<TopicAndPartition, Log> topicAndPartitionLogEntry : entries) {
    final TopicAndPartition topicAndPartition = topicAndPartitionLogEntry.getKey();
    final Log kafkaLog = topicAndPartitionLogEntry.getValue();
    final long timeSinceLastFlush = JODA_TIME.milliseconds() - kafkaLog.lastFlushTime();
    try {
      LOG.debug(
          "Checking if flush is needed on {} flush interval {} last flushed {} time since last flush: {}",
          topicAndPartition.topic(),
          kafkaLog.config().flushInterval(),
          kafkaLog.lastFlushTime(),
          timeSinceLastFlush);
      if (timeSinceLastFlush >= kafkaLog.config().flushMs()) {
        kafkaLog.flush();
      }
    } catch (Exception e) {
      LOG.error("Error flushing topic " + topicAndPartition.topic(), e);
    }
  }
}
Code example source: apache/incubator-pinot
@Override
public FetchResponse fetch(FetchRequest request) {
  scala.collection.Traversable<Tuple2<TopicAndPartition, PartitionFetchInfo>> requestInfo = request.requestInfo();
  java.util.Map<TopicAndPartition, Short> errorMap = new HashMap<>();
  while (requestInfo.headOption().isDefined()) {
    // jfim: IntelliJ erroneously thinks the following line is an incompatible type error, but it's only because
    // it doesn't understand scala covariance when called from Java (ie. it thinks head() is of type A even though
    // it's really of type Tuple2[TopicAndPartition, PartitionFetchInfo])
    Tuple2<TopicAndPartition, PartitionFetchInfo> t2 = requestInfo.head();
    TopicAndPartition topicAndPartition = t2._1();
    PartitionFetchInfo partitionFetchInfo = t2._2();
    if (!topicAndPartition.topic().equals(topicName)) {
      errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
    } else if (partitionLeaderIndices.length <= topicAndPartition.partition()) {
      // "<=" rather than "<": a partition id equal to the array length is already out of range
      // and would otherwise reach the array lookup in the next branch.
      errorMap.put(topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
    } else if (partitionLeaderIndices[topicAndPartition.partition()] != index) {
      errorMap.put(topicAndPartition, Errors.NOT_LEADER_FOR_PARTITION.code());
    } else {
      // Do nothing, we'll generate a fake message
    }
    requestInfo = requestInfo.tail();
  }
  return new MockFetchResponse(errorMap);
}
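The mock above validates each requested partition in a fixed order: topic name first, then partition range, then leadership. That order can be isolated into a small pure function. The sketch below is a restatement under assumptions, not Pinot's code: the class FetchCheckSketch, the Result enum, and the check method are hypothetical names, and the range test is written as partition >= length so that the array lookup after it can never go out of bounds.

```java
// Hypothetical, dependency-free restatement of the validation order used by the mock broker.
final class FetchCheckSketch {

  enum Result { OK, UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION }

  /**
   * @param servedTopic        the only topic this mock broker knows about
   * @param leaderByPartition  leaderByPartition[p] is the index of the broker leading partition p
   * @param brokerIndex        index of the broker answering the request
   */
  static Result check(String servedTopic, int[] leaderByPartition, int brokerIndex,
                      String requestedTopic, int requestedPartition) {
    if (!servedTopic.equals(requestedTopic)) {
      return Result.UNKNOWN_TOPIC_OR_PARTITION;
    }
    // Range check must come before the array lookup below.
    if (requestedPartition < 0 || requestedPartition >= leaderByPartition.length) {
      return Result.UNKNOWN_TOPIC_OR_PARTITION;
    }
    if (leaderByPartition[requestedPartition] != brokerIndex) {
      return Result.NOT_LEADER_FOR_PARTITION;
    }
    return Result.OK;
  }

  public static void main(String[] args) {
    int[] leaders = {0, 1, 0}; // three partitions; broker 1 leads partition 1
    System.out.println(check("events", leaders, 0, "events", 0)); // prints OK
    System.out.println(check("events", leaders, 0, "events", 1)); // prints NOT_LEADER_FOR_PARTITION
    System.out.println(check("events", leaders, 0, "events", 3)); // prints UNKNOWN_TOPIC_OR_PARTITION
    System.out.println(check("events", leaders, 0, "other", 0));  // prints UNKNOWN_TOPIC_OR_PARTITION
  }
}
```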
Code example source: linkedin/camus
private String generateLogWarnForSkippedTopics(Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo,
    SimpleConsumer consumer) {
  StringBuilder sb = new StringBuilder();
  sb.append("The following topics will be skipped due to failure in fetching latest offsets from leader "
      + consumer.host() + ":" + consumer.port());
  for (TopicAndPartition topicAndPartition : offsetInfo.keySet()) {
    sb.append(" " + topicAndPartition.topic());
  }
  return sb.toString();
}
Code example source: uber/chaperone
private void injectMetrics(final TopicAndPartition topicAndPartition) {
  if (!partitionInjected.contains(topicAndPartition)) {
    Metrics.getRegistry().register(
        String.format(OFFSET_LAG_NAME_FORMAT, topicAndPartition.topic(), topicAndPartition.partition()),
        new Gauge<Long>() {
          @Override
          public Long getValue() {
            if (partitionLag.containsKey(topicAndPartition)) {
              return partitionLag.get(topicAndPartition);
            } else {
              return -1L;
            }
          }
        });
    partitionInjected.add(topicAndPartition);
  }
}
Code example source: uber/chaperone
private static long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
  kafka.javaapi.OffsetRequest request =
      new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
  OffsetResponse response = consumer.getOffsetsBefore(request);
  if (response.hasError()) {
    logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
        response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
    return -1;
  }
  long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
  return offsets[0];
}
Code example source: linkedin/camus
// Fragment of a larger method: latestOffsetResponse, earliestOffsetResponse, leader and context are defined elsewhere.
long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
long earliestOffset =
    earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
EtlRequest etlRequest =
    new EtlRequest(context, topicAndPartition.topic(), Integer.toString(leader.getLeaderId()),
        topicAndPartition.partition(), leader.getUri());
etlRequest.setLatestOffset(latestOffset);
Code example source: uber/uReplicator
private static String getOffsetLagName(TopicAndPartition tp) {
  return "OffsetMonitorLag." + tp.topic().replace('.', '_') + "." + tp.partition();
}
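The one-liner above replaces dots in the topic name before embedding it in a metric name, presumably because dots act as hierarchy separators in the resulting name. A minimal sketch of the same string handling follows; it takes the topic and partition as plain values instead of a TopicAndPartition, and the class name OffsetLagNameSketch and the sample topic are made up for illustration.

```java
// Hypothetical, dependency-free version of the metric-name builder.
final class OffsetLagNameSketch {

  /** Builds "OffsetMonitorLag.<topic with dots replaced by underscores>.<partition>". */
  static String offsetLagName(String topic, int partition) {
    return "OffsetMonitorLag." + topic.replace('.', '_') + "." + partition;
  }

  public static void main(String[] args) {
    // Without the replacement, "payments.eu.v1" would add two extra levels to the name.
    System.out.println(offsetLagName("payments.eu.v1", 3));
    // prints "OffsetMonitorLag.payments_eu_v1.3"
  }
}
```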
Code example source: com.cerner.common.kafka/common-kafka-admin
/**
 * Returns the set of all partitions for the given topic in the Kafka cluster
 *
 * @param topic
 *      a Kafka topic
 * @return unmodifiable set of all partitions for the given topic in the Kafka cluster
 * @throws AdminOperationException
 *      if there is an issue reading partitions from Kafka
 */
public Set<TopicAndPartition> getPartitions(String topic) {
  LOG.debug("Retrieving all partitions for topic [{}]", topic);
  return Collections.unmodifiableSet(
      getPartitions().stream().filter(p -> p.topic().equals(topic)).collect(Collectors.toSet()));
}
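The filter-by-topic pattern above can be tried without a cluster. The sketch below is only an illustration under assumptions: Tp is a hypothetical stand-in for TopicAndPartition, the input collection replaces the call to getPartitions(), and the result is reduced to partition ids to keep the example short.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch of filtering partitions by topic(); not the Cerner implementation.
final class PartitionFilterSketch {

  // Minimal stand-in for kafka.common.TopicAndPartition.
  static final class Tp {
    private final String topic;
    private final int partition;

    Tp(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    String topic() { return topic; }
    int partition() { return partition; }
  }

  /** Returns an unmodifiable, sorted set of the partition ids that belong to the given topic. */
  static Set<Integer> partitionIdsFor(Collection<Tp> all, String topic) {
    return Collections.unmodifiableSet(
        all.stream()
            .filter(p -> p.topic().equals(topic))
            .map(Tp::partition)
            .collect(Collectors.toCollection(TreeSet::new)));
  }

  public static void main(String[] args) {
    List<Tp> all = Arrays.asList(new Tp("a", 1), new Tp("b", 0), new Tp("a", 0));
    System.out.println(partitionIdsFor(all, "a")); // prints [0, 1]
  }
}
```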
Code example source: allegro/hermes
@Override
@SuppressWarnings("unchecked")
public int readLeaderForPartition(TopicAndPartition topicAndPartition) {
  try {
    TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
    return (int) kafkaZkClient.getLeaderForPartition(topicPartition).get();
  } catch (Exception exception) {
    throw new BrokerNotFoundForPartitionException(topicAndPartition.topic(), topicAndPartition.partition(), exception);
  }
}
Code example source: uber/uReplicator
protected void updateOffset() {
  logger.debug("OffsetMonitor updates offset with leaders=" + partitionLeader);
  offsetMonitorFailureCount.set(0);
  for (Map.Entry<TopicAndPartition, BrokerEndPoint> entry : partitionLeader.entrySet()) {
    String leaderBroker = getHostPort(entry.getValue());
    TopicAndPartition tp = entry.getKey();
    if (StringUtils.isEmpty(leaderBroker)) {
      logger.warn("{} does not have leader partition", tp);
    } else {
      try {
        cronExecutor.submit(updateOffsetTask(leaderBroker, tp));
      } catch (RejectedExecutionException re) {
        offsetMonitorFailureCount.getAndAdd(1);
        logger.warn(String.format("cronExecutor is full! Drop task for topic: %s, partition: %d",
            tp.topic(), tp.partition()), re);
        throw re;
      } catch (Throwable t) {
        offsetMonitorFailureCount.getAndAdd(1);
        logger.error(String.format("cronExecutor got throwable! Drop task for topic: %s, partition: %d",
            tp.topic(), tp.partition()), t);
        throw t;
      }
    }
  }
}
Code example source: uber/uReplicator
/**
 * Get stuck topic partitions via offset manager.
 *
 * @return the topic partitions that have been stuck for at least _movePartitionAfterStuckMillis.
 */
private Set<TopicPartition> getStuckTopicPartitions() {
  Set<TopicPartition> partitions = new HashSet<>();
  if (_movePartitionAfterStuckMillis <= 0) {
    return partitions;
  }
  Map<TopicAndPartition, TopicPartitionLag> noProgressMap = _helixMirrorMakerManager.getOffsetMonitor()
      .getNoProgressTopicToOffsetMap();
  long now = System.currentTimeMillis();
  for (Map.Entry<TopicAndPartition, TopicPartitionLag> entry : noProgressMap.entrySet()) {
    TopicPartitionLag lastLag = entry.getValue();
    if (now - lastLag.getTimeStamp() > _movePartitionAfterStuckMillis) {
      partitions.add(new TopicPartition(entry.getKey().topic(), entry.getKey().partition()));
    }
  }
  return partitions;
}
Code example source: org.opendaylight.centinel/centinel-laas
/**
 * @param zookeeperHosts
 *      Zookeeper hosts e.g. localhost:2181. If multiple zookeeper
 *      then host1:port1[,host2:port2,...]
 * @param groupID
 *      consumer group to update
 * @param offsets
 *      mapping of (topic and) partition to offset to push to
 *      Zookeeper
 */
public void createOffsets(String zookeeperHosts, String groupID, Map<TopicAndPartition, Long> offsets) {
  try (SuperZkClient zkClient = new SuperZkClient(zookeeperHosts)) {
    for (Map.Entry<TopicAndPartition, Long> entry : offsets.entrySet()) {
      TopicAndPartition topicAndPartition = entry.getKey();
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
      int partition = topicAndPartition.partition();
      long offset = entry.getValue();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
    }
  }
}
Code example source: michal-harish/kafka-hadoop-loader
private long getEarliestOffset() {
  // return kafka.api.OffsetRequest.EarliestTime();
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(EARLIEST_TIME, 1));
  OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);
  if (earliestOffset <= 0) {
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    earliestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  }
  return earliestOffset;
}
Code example source: michal-harish/kafka-hadoop-loader
private long getLatestOffset() {
  // return kafka.api.OffsetRequest.LatestTime();
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(LATEST_TIME, 1));
  OffsetRequest offsetRequest = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), CLIENT_ID);
  if (latestOffset <= 0) {
    OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
    latestOffset = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
  }
  return latestOffset;
}
Code example source: uber/uReplicator
private long getLatestOffset(SimpleConsumer consumer, TopicAndPartition topicAndPartition) {
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
  kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
      kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
  OffsetResponse response = consumer.getOffsetsBefore(request);
  if (response.hasError()) {
    logger.warn("Failed to fetch offset for {} due to {}", topicAndPartition,
        response.errorCode(topicAndPartition.topic(), topicAndPartition.partition()));
    return -1;
  }
  long[] offsets = response.offsets(topicAndPartition.topic(), topicAndPartition.partition());
  return offsets[0];
}
Code example source: michal-harish/kafka-hadoop-loader
@Override
public void close() throws IOException {
  log.info(
      "Topic: {}, broker: {}, partition: {} ~ num. processed messages {}",
      topicAndPartition.topic(),
      split.getBrokerId(),
      topicAndPartition.partition(),
      numProcessedMessages
  );
  if (numProcessedMessages > 0) {
    try (KafkaZkUtils zk = new KafkaZkUtils(
        conf.get(KafkaInputFormat.CONFIG_ZK_CONNECT),
        conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000),
        conf.getInt(KafkaInputFormat.CONFIG_ZK_SESSION_TIMEOUT_MS, 10000)
    )) {
      new CheckpointManager(conf, zk)
          .commitOffsets(split.getTopic(), split.getPartition(), nextOffsetToConsume - 1);
    }
  }
  consumer.close();
}
Code example source: allegro/hermes
private void createLeaderForPartition(TopicAndPartition topicAndPartition, int leaderId) throws Exception {
  String path = "/brokers/topics/" + topicAndPartition.topic() + "/partitions/" + topicAndPartition.partition() + "/state";
  zookeeperClient.create().creatingParentsIfNeeded().forPath(path);
  zookeeperClient.setData().forPath(path, getSampleLeaderDetails(leaderId).getBytes());
}
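The sample above builds the ZooKeeper path of a partition's state node from topic() and partition(). The path layout can be checked in isolation with the small sketch below; the class ZkPathSketch and the method partitionStatePath are hypothetical names, and the sketch only concatenates strings without contacting ZooKeeper.

```java
// Hypothetical helper that reproduces the path layout used in the sample above.
final class ZkPathSketch {

  /** Returns "/brokers/topics/<topic>/partitions/<partition>/state". */
  static String partitionStatePath(String topic, int partition) {
    return "/brokers/topics/" + topic + "/partitions/" + partition + "/state";
  }

  public static void main(String[] args) {
    System.out.println(partitionStatePath("orders", 2));
    // prints "/brokers/topics/orders/partitions/2/state"
  }
}
```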