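This article collects code examples for the Java method org.apache.kafka.clients.admin.AdminClient.describeTopics() and shows how AdminClient.describeTopics() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as useful references. Details of the AdminClient.describeTopics() method are as follows: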
Package path: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: describeTopics
Describe some topics in the cluster, with the default options. This is a convenience method for AdminClient#describeTopics(Collection, DescribeTopicsOptions) with default options. See the overload for more details.
Code example source: apache/kafka
/**
 * Describe some topics in the cluster, with the default options.
 *
 * This is a convenience method for #{@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
 * default options. See the overload for more details.
 *
 * @param topicNames The names of the topics to describe.
 *
 * @return The DescribeTopicsResult.
 */
public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
    return describeTopics(topicNames, new DescribeTopicsOptions());
}
Code example source: apache/kafka
@Test
public void testAdminClientWithInvalidCredentials() {
    Map<String, Object> props = new HashMap<>(saslClientConfigs);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
    try (AdminClient client = AdminClient.create(props)) {
        DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
        result.all().get();
        fail("Expected an authentication error!");
    } catch (Exception e) {
        assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
                e.getCause() instanceof SaslAuthenticationException);
    }
}
Code example source: apache/flink
@Override
public int getLeaderToShutDown(String topic) throws Exception {
    AdminClient client = AdminClient.create(getStandardProperties());
    TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
    return result.partitions().get(0).leader().id();
}
Code example source: apache/flume
public void createTopics(List<String> topicNames, int numPartitions) {
    List<NewTopic> newTopics = new ArrayList<>();
    for (String topicName : topicNames) {
        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
        newTopics.add(newTopic);
    }
    getAdminClient().createTopics(newTopics);
    // the following lines are a bit of black magic to ensure the topic is ready when we return
    DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
    try {
        dtr.all().get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        throw new RuntimeException("Error getting topic info", e);
    }
}
Code example source: apache/kafka
singletonList(partitionMetadata)))));
DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
Map<String, TopicDescription> topicDescriptions = result.all().get();
assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
Code example source: linkedin/cruise-control
private Map<String, TopicDescription> createTopics() throws InterruptedException {
    AdminClient adminClient = getAdminClient(broker(0).getPlaintextAddr());
    adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, 1, (short) 1),
                                           new NewTopic(TOPIC1, 1, (short) 2)));
    // We need to use the admin clients to query the metadata from two different brokers to make sure that
    // both brokers have the latest metadata. Otherwise the Executor may get confused when it does not
    // see expected topics in the metadata.
    Map<String, TopicDescription> topicDescriptions0 = null;
    Map<String, TopicDescription> topicDescriptions1 = null;
    do {
        try (AdminClient adminClient0 = getAdminClient(broker(0).getPlaintextAddr());
             AdminClient adminClient1 = getAdminClient(broker(1).getPlaintextAddr())) {
            topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
            topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (ExecutionException ee) {
            // Let it go.
        }
    } while (topicDescriptions0 == null || topicDescriptions0.size() < 2
        || topicDescriptions1 == null || topicDescriptions1.size() < 2);
    return topicDescriptions0;
}
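The polling loop above retries until both brokers report the topics, but it has no upper bound on how long it waits. A minimal sketch of the same idea with a deadline follows. It uses only the JDK: the class name PollUntilReady and the method waitUntil are hypothetical, and the readiness check is a plain Callable that, in a real test, might wrap a describeTopics(...).all().get() call like the one in the snippet.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class PollUntilReady {

    /**
     * Calls the check repeatedly until it returns true or the timeout elapses.
     * A check that throws is treated as "not ready yet" and retried.
     * Returns false on timeout or when the waiting thread is interrupted.
     */
    static boolean waitUntil(Callable<Boolean> check, Duration timeout, Duration pause) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                if (Boolean.TRUE.equals(check.call())) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            } catch (Exception e) {
                // Assumed transient (for example, metadata not propagated yet): retry.
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in check: reports "ready" on the third attempt.
        AtomicInteger attempts = new AtomicInteger();
        boolean ready = waitUntil(() -> attempts.incrementAndGet() >= 3,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("ready=" + ready + " after " + attempts.get() + " attempts");
    }
}
```

Treating every exception from the check as transient is a simplification; a stricter version would retry only on failures known to be temporary.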
Code example source: apache/kafka
@Test
public void testInvalidTopicNames() throws Exception {
    try (AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        List<String> sillyTopicNames = asList("", null);
        Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());
        Map<String, KafkaFuture<TopicDescription>> describeFutures =
                env.adminClient().describeTopics(sillyTopicNames).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());
        List<NewTopic> newTopics = new ArrayList<>();
        for (String sillyTopicName : sillyTopicNames) {
            newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
        }
        Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(createFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());
    }
}
Code example source: spring-projects/spring-kafka
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
    if (topics.size() > 0) {
        Map<String, NewTopic> topicNameToTopic = new HashMap<>();
        topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> v = t));
        DescribeTopicsResult topicInfo = adminClient
                .describeTopics(topics.stream()
                        .map(NewTopic::name)
                        .collect(Collectors.toList()));
        List<NewTopic> topicsToAdd = new ArrayList<>();
        Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
        if (topicsToAdd.size() > 0) {
            addTopics(adminClient, topicsToAdd);
        }
        if (topicsToModify.size() > 0) {
            modifyTopics(adminClient, topicsToModify);
        }
    }
}
Code example source: spring-projects/spring-kafka
@Test
public void testAddTopics() throws Exception {
    AdminClient adminClient = AdminClient.create(this.admin.getConfig());
    DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
    topics.all().get();
    new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2);
    new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3);
    this.admin.initialize();
    topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
    Map<String, TopicDescription> results = topics.all().get();
    results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
    adminClient.close(10, TimeUnit.SECONDS);
}
Code example source: kloiasoft/eventapis
    private int getDefaultNumberOfPartitions(AdminClient adminClient) {
        try {
            return adminClient.describeTopics(Collections.singleton(Operation.OPERATION_EVENTS))
                    .all().get().values().iterator().next()
                    .partitions().size();
        } catch (InterruptedException | ExecutionException | NullPointerException e) {
            log.warn("Error while Calculating Number of Partitions from Topic: " + Operation.OPERATION_EVENTS + " Assuming " + DEFAULT_NUM_PARTITIONS);
            return DEFAULT_NUM_PARTITIONS;
        }
    }
}
Code example source: salesforce/kafka-junit
/**
 * Describes a topic in Kafka.
 * @param topicName the topic to describe.
 * @return Description of the topic.
 */
public TopicDescription describeTopic(final String topicName) {
    // Create admin client
    try (final AdminClient adminClient = getAdminClient()) {
        // Make async call to describe the topic.
        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
        return describeTopicsResult.values().get(topicName).get();
    } catch (final InterruptedException | ExecutionException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
Code example source: nbogojevic/kafka-operator
TopicDescription topicDescription(String topicName) {
    DescribeTopicsResult dt = adminClient.describeTopics(singleton(topicName));
    try {
        dt.all().get();
        TopicDescription topicDescription = dt.values().get(topicName).get();
        return topicDescription;
    } catch (InterruptedException | ExecutionException e) {
        if (e.getCause() != null && e.getCause() instanceof UnknownTopicOrPartitionException) {
            return null;
        }
        throw new IllegalStateException("Exception occurred during topic details retrieval. name: " + topicName, e);
    }
}
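The snippet above inspects the cause of the ExecutionException and returns null when the topic is unknown. The sketch below shows the same cause check applied to each per-topic future, returning an Optional instead of null. It is JDK-only so it runs without a broker: CompletableFuture stands in for the futures returned by DescribeTopicsResult.values() (KafkaFuture also implements java.util.concurrent.Future), and TopicNotFoundException is a hypothetical stand-in for UnknownTopicOrPartitionException. The class and method names are illustrative, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class PerTopicResults {

    /** Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException. */
    static class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Waits for one per-topic future. Returns Optional.empty() when the future
     * failed because the topic does not exist; any other failure is rethrown.
     * Assumes successful futures never complete with null.
     */
    static <T> Optional<T> getIfExists(Future<T> future) {
        try {
            return Optional.of(future.get());
        } catch (ExecutionException e) {
            // The real failure is the cause; ExecutionException is only the wrapper.
            if (e.getCause() instanceof TopicNotFoundException) {
                return Optional.empty();
            }
            throw new IllegalStateException("Topic lookup failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for topic lookup", e);
        }
    }

    /** Keeps the results of topics that exist and skips topics that were not found. */
    static <T> Map<String, T> existingOnly(Map<String, ? extends Future<T>> futures) {
        Map<String, T> found = new LinkedHashMap<>();
        for (Map.Entry<String, ? extends Future<T>> entry : futures.entrySet()) {
            getIfExists(entry.getValue()).ifPresent(value -> found.put(entry.getKey(), value));
        }
        return found;
    }

    public static void main(String[] args) {
        // Simulated per-topic results: one topic with 3 partitions, one missing topic.
        Map<String, CompletableFuture<Integer>> futures = new LinkedHashMap<>();
        futures.put("orders", CompletableFuture.completedFuture(3));
        CompletableFuture<Integer> missing = new CompletableFuture<>();
        missing.completeExceptionally(new TopicNotFoundException("payments"));
        futures.put("payments", missing);

        System.out.println(existingOnly(futures)); // prints {orders=3}
    }
}
```

Working with the per-topic futures rather than a single combined future lets one missing topic be skipped while the remaining results are still used.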
Code example source: org.nuxeo.lib.stream/nuxeo-stream
public int getNumberOfPartitions(String topic) {
    DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
    try {
        return descriptions.values().get(topic).get().partitions().size();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StreamRuntimeException(e);
    } catch (ExecutionException e) {
        throw new StreamRuntimeException(e);
    }
}
Code example source: allegro/hermes
    private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException {
        List<String> kafkaTopicsNames = kafkaNamesMapper.toKafkaTopics(topic).stream()
                .map(kafkaTopic -> kafkaTopic.name().asString())
                .collect(Collectors.toList());
        return adminClient.describeTopics(kafkaTopicsNames).all().get().values().stream()
                .map(v -> v.partitions().size())
                .reduce(0, Integer::sum);
    }
}
Code example source: variflight/feeyo-redisproxy
/**
 * Get the description of the specified topic
 */
public TopicDescription getDescriptionByTopicName(String topic) throws Exception {
    List<String> topics = new ArrayList<String>();
    topics.add(topic);
    DescribeTopicsOptions dto = new DescribeTopicsOptions();
    dto.timeoutMs(5 * 1000);
    DescribeTopicsResult dtr = adminClient.describeTopics(topics, dto);
    return dtr.all().get().get(topic);
}
Code example source: com.opentable.components/otj-kafka
private void maybeCreateConsumerOffsets() throws InterruptedException {
    final String consumerOffsets = "__consumer_offsets";
    retry("create consumer offsets", () -> {
        Map<String, TopicDescription> description = admin.describeTopics(Collections.singleton(consumerOffsets)).all().get(10, TimeUnit.SECONDS);
        if (description.isEmpty()) {
            createTopic(consumerOffsets);
        }
    });
}
Code example source: com.opentable.components/otj-kafka
private int numberOfPartitions(final String topic, final String brokerList) throws ExecutionException, InterruptedException {
    if (!brokerConfig.isEnabled()) {
        return 1;
    }
    final AdminClient client = makeAdminClient(brokerList);
    final DescribeTopicsResult result = client.describeTopics(Lists.newArrayList(topic));
    final TopicDescription topicDescription = result.all().get().get(topic);
    return topicDescription.partitions().size();
}
Code example source: strimzi/strimzi-kafka-operator
/**
 * Get a topic config via the Kafka AdminClient API, calling the given handler
 * (in a different thread) with the result.
 */
@Override
public void topicMetadata(TopicName topicName, Handler<AsyncResult<TopicMetadata>> handler) {
    LOGGER.debug("Getting metadata for topic {}", topicName);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName.toString());
    KafkaFuture<TopicDescription> descriptionFuture = adminClient.describeTopics(
            Collections.singleton(topicName.toString())).values().get(topicName.toString());
    KafkaFuture<Config> configFuture = adminClient.describeConfigs(
            Collections.singleton(resource)).values().get(resource);
    queueWork(new MetadataWork(descriptionFuture,
            configFuture,
            result -> handler.handle(result)));
}
Code example source: kloiasoft/eventapis
@Override
boolean runInternal(StopWatch stopWatch) throws InterruptedException, ExecutionException {
    stopWatch.start("adminClient.listTopics()");
    Collection<String> topicNames = adminClient.listTopics().listings().get()
            .stream().map(TopicListing::name).filter(this::shouldCollectEvent).collect(Collectors.toList());
    topicsMap.removeAll(new RemoveTopicPredicate(topicNames));
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames);
    describeTopicsResult.all().get().forEach(
            (topic, topicDescription) -> topicsMap.executeOnKey(topic, new SetTopicPartitionsProcessor(
                    topicDescription.partitions().stream().map(TopicPartitionInfo::partition).collect(Collectors.toList()))
            )
    );
    metaMap.set(this.getName() + TopicServiceScheduler.LAST_SUCCESS_PREFIX, System.currentTimeMillis());
    log.debug("Topics:" + topicsMap.entrySet());
    log.debug(stopWatch.prettyPrint());
    return true;
}
Code example source: variflight/feeyo-redisproxy
public Map<String, TopicDescription> getTopicAndDescriptions() throws Exception {
    try {
        // List the topics
        ListTopicsOptions lto = new ListTopicsOptions();
        lto.timeoutMs(10 * 1000);
        ListTopicsResult ltr = adminClient.listTopics(lto);
        // Describe the listed topics
        DescribeTopicsOptions dto = new DescribeTopicsOptions();
        dto.timeoutMs(15 * 1000);
        DescribeTopicsResult dtr = adminClient.describeTopics(ltr.names().get(), dto);
        return dtr.all().get();
    } catch (Exception e) {
        throw e;
    }
}