Usage and code examples of the org.apache.kafka.clients.admin.AdminClient.describeTopics() method

x33g5p2x, reposted on 2022-01-15 under Other

This article collects code examples for the Java method org.apache.kafka.clients.admin.AdminClient.describeTopics() and shows how it is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of AdminClient.describeTopics() are as follows:
Package: org.apache.kafka.clients.admin
Class name: AdminClient
Method name: describeTopics

Introduction to AdminClient.describeTopics

Describe some topics in the cluster, with the default options. This is a convenience method for AdminClient#describeTopics(Collection, DescribeTopicsOptions) with default options. See the overload for more details.
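The examples that follow read the returned DescribeTopicsResult in two ways: all() yields one future for the whole map, while values() yields one future per topic name. The sketch below illustrates the practical difference without needing a broker. It is a stand-in, not Kafka code: CompletableFuture plays the role of KafkaFuture, a String plays the role of TopicDescription, and the class name, method names, and topic names are hypothetical. Newer kafka-clients releases appear to deprecate these accessors in favor of allTopicNames() and topicNameValues(), so it is worth checking the Javadoc of the version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Stand-in sketch: CompletableFuture replaces KafkaFuture and a String
 * replaces TopicDescription. No Kafka classes are used here.
 */
final class DescribeResultSketch {

    /** Mimics values(): one future per requested topic name. */
    static Map<String, CompletableFuture<String>> values() {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        futures.put("orders", CompletableFuture.completedFuture("orders: 3 partitions"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("unknown topic: missing"));
        futures.put("missing", failed);
        return futures;
    }

    /** Mimics all(): succeeds only if every per-topic future succeeds. */
    static CompletableFuture<Map<String, String>> all(Map<String, CompletableFuture<String>> futures) {
        return CompletableFuture
                .allOf(futures.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, String> result = new LinkedHashMap<>();
                    futures.forEach((name, f) -> result.put(name, f.join()));
                    return result;
                });
    }

    /** Records an outcome per topic, so one bad topic does not hide the others. */
    static Map<String, String> describeEach(Map<String, CompletableFuture<String>> futures) {
        Map<String, String> outcome = new LinkedHashMap<>();
        futures.forEach((name, future) -> {
            try {
                outcome.put(name, future.join());
            } catch (CompletionException e) {
                outcome.put(name, "failed: " + e.getCause().getMessage());
            }
        });
        return outcome;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures = values();
        System.out.println(describeEach(futures));
        System.out.println("all() failed: " + all(futures).isCompletedExceptionally());
    }
}
```

Reading the per-topic futures is the more forgiving choice when some of the requested topics may not exist; the combined future is simpler when any failure should abort the caller.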

Code examples

Code example source: apache/kafka

/**
 * Describe some topics in the cluster, with the default options.
 *
 * This is a convenience method for {@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
 * default options. See the overload for more details.
 *
 * @param topicNames        The names of the topics to describe.
 *
 * @return                  The DescribeTopicsResult.
 */
public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
  return describeTopics(topicNames, new DescribeTopicsOptions());
}

Code example source: apache/kafka

@Test
public void testAdminClientWithInvalidCredentials() {
  Map<String, Object> props = new HashMap<>(saslClientConfigs);
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
  try (AdminClient client = AdminClient.create(props)) {
    DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
    result.all().get();
    fail("Expected an authentication error!");
  } catch (Exception e) {
    assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
        e.getCause() instanceof SaslAuthenticationException);
  }
}

Code example source: apache/flink

@Override
public int getLeaderToShutDown(String topic) throws Exception {
  // Close the client when done so its resources are not leaked.
  try (AdminClient client = AdminClient.create(getStandardProperties())) {
    TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
    return result.partitions().get(0).leader().id();
  }
}

Code example source: apache/flume

public void createTopics(List<String> topicNames, int numPartitions) {
 List<NewTopic> newTopics = new ArrayList<>();
 for (String topicName: topicNames) {
  NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
  newTopics.add(newTopic);
 }
 getAdminClient().createTopics(newTopics);
 //the following lines are a bit of black magic to ensure the topic is ready when we return
 DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
 try {
  dtr.all().get(10, TimeUnit.SECONDS);
 } catch (Exception e) {
  throw new RuntimeException("Error getting topic info", e);
 }
}

Code example source: apache/kafka

singletonList(partitionMetadata)))));
DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
Map<String, TopicDescription> topicDescriptions = result.all().get();
assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());

Code example source: linkedin/cruise-control

private Map<String, TopicDescription> createTopics() throws InterruptedException {
 AdminClient adminClient = getAdminClient(broker(0).getPlaintextAddr());
 adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, 1, (short) 1),
                     new NewTopic(TOPIC1, 1, (short) 2)));
 // We need to use the admin clients to query the metadata from two different brokers to make sure that
 // both brokers have the latest metadata. Otherwise the Executor may get confused when it does not
 // see expected topics in the metadata.
 Map<String, TopicDescription> topicDescriptions0 = null;
 Map<String, TopicDescription> topicDescriptions1 = null;
 do {
  try (AdminClient adminClient0 = getAdminClient(broker(0).getPlaintextAddr());
    AdminClient adminClient1 = getAdminClient(broker(1).getPlaintextAddr())) {
   topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
   topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  } catch (ExecutionException ee) {
   // Let it go.
  }
 } while (topicDescriptions0 == null || topicDescriptions0.size() < 2
   || topicDescriptions1 == null || topicDescriptions1.size() < 2);
 return topicDescriptions0;
}
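The loop above keeps polling until both brokers report the topics, with no upper bound on how long it waits. A bounded variant can be sketched as follows. This is an illustration under assumptions, not code from cruise-control: the class name, the pollUntil helper, and the probe are hypothetical, and in a real setting the probe would wrap the describeTopics() calls and return an empty Optional while the metadata is still incomplete.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

final class PollUntilReadySketch {

    /**
     * Calls the probe repeatedly until it yields a value or the timeout
     * elapses. Returns an empty Optional on timeout or interruption.
     */
    static <T> Optional<T> pollUntil(Supplier<Optional<T>> probe, Duration timeout, Duration pause) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<T> result = probe.get();
            if (result.isPresent()) {
                return result;
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty(); // give up instead of looping forever
            }
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical probe: the "metadata" becomes visible on the third attempt.
        Optional<String> ready = PollUntilReadySketch.<String>pollUntil(
                () -> ++calls[0] >= 3 ? Optional.of("ready") : Optional.empty(),
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(ready + " after " + calls[0] + " attempts");
    }
}
```

The deadline turns a stuck cluster into a visible timeout in the caller rather than a test that hangs.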

Code example source: apache/kafka

@Test
public void testInvalidTopicNames() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    List<String> sillyTopicNames = asList("", null);
    Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    Map<String, KafkaFuture<TopicDescription>> describeFutures =
        env.adminClient().describeTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    List<NewTopic> newTopics = new ArrayList<>();
    for (String sillyTopicName : sillyTopicNames) {
      newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
    }
    Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(createFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
  }
}

Code example source: spring-projects/spring-kafka

private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
  if (topics.size() > 0) {
    Map<String, NewTopic> topicNameToTopic = new HashMap<>();
    topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> v = t));
    DescribeTopicsResult topicInfo = adminClient
        .describeTopics(topics.stream()
            .map(NewTopic::name)
            .collect(Collectors.toList()));
    List<NewTopic> topicsToAdd = new ArrayList<>();
    Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
    if (topicsToAdd.size() > 0) {
      addTopics(adminClient, topicsToAdd);
    }
    if (topicsToModify.size() > 0) {
      modifyTopics(adminClient, topicsToModify);
    }
  }
}

Code example source: spring-projects/spring-kafka

@Test
public void testAddTopics() throws Exception {
  AdminClient adminClient = AdminClient.create(this.admin.getConfig());
  DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  topics.all().get();
  new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2);
  new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3);
  this.admin.initialize();
  topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  Map<String, TopicDescription> results = topics.all().get();
  results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
  adminClient.close(10, TimeUnit.SECONDS);
}

Code example source: kloiasoft/eventapis

private int getDefaultNumberOfPartitions(AdminClient adminClient) {
  try {
    return adminClient.describeTopics(Collections.singleton(Operation.OPERATION_EVENTS))
        .all().get().values().iterator().next()
        .partitions().size();
  } catch (InterruptedException | ExecutionException | NullPointerException e) {
    log.warn("Error while Calculating Number of Partitions from Topic: " + Operation.OPERATION_EVENTS + " Assuming " + DEFAULT_NUM_PARTITIONS);
    return DEFAULT_NUM_PARTITIONS;
  }
}

Code example source: salesforce/kafka-junit

/**
 * Describes a topic in Kafka.
 * @param topicName the topic to describe.
 * @return Description of the topic.
 */
public TopicDescription describeTopic(final String topicName) {
  // Create admin client
  try (final AdminClient adminClient = getAdminClient()) {
    // Make async call to describe the topic.
    final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
    return describeTopicsResult.values().get(topicName).get();
  } catch (final InterruptedException | ExecutionException e) {
    throw new RuntimeException(e.getMessage(), e);
  }
}

Code example source: nbogojevic/kafka-operator

TopicDescription topicDescription(String topicName) {
 DescribeTopicsResult dt = adminClient.describeTopics(singleton(topicName));
 try {
  dt.all().get();
  TopicDescription topicDescription = dt.values().get(topicName).get();
  return topicDescription;
 } catch (InterruptedException | ExecutionException e) {
  // instanceof is false for a null cause, so no separate null check is needed
  if (e.getCause() instanceof UnknownTopicOrPartitionException) {
   return null;
  }
  throw new IllegalStateException("Exception occurred during topic details retrieval. name: " + topicName, e);
 }
}
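The example above tells a missing topic apart from other failures by inspecting the cause of the ExecutionException thrown by get(). The same pattern can be sketched with only the standard library. The names here are assumptions made for illustration: MissingTopicException is a hypothetical stand-in for Kafka's UnknownTopicOrPartitionException, a plain java.util.concurrent.Future stands in for KafkaFuture, and an empty Optional replaces the null return value.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class MissingTopicSketch {

    /** Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException. */
    static final class MissingTopicException extends RuntimeException {
        MissingTopicException(String message) {
            super(message);
        }
    }

    /**
     * Returns the description, or an empty Optional when the future failed
     * because the topic does not exist. Any other failure is rethrown.
     */
    static Optional<String> describeOrEmpty(Future<String> future) {
        try {
            return Optional.of(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("Interrupted while describing topic", e);
        } catch (ExecutionException e) {
            // The real failure is wrapped; inspect the cause, not the wrapper.
            if (e.getCause() instanceof MissingTopicException) {
                return Optional.empty();
            }
            throw new IllegalStateException("Topic lookup failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new MissingTopicException("no such topic"));
        System.out.println(describeOrEmpty(CompletableFuture.completedFuture("orders")));
        System.out.println(describeOrEmpty(missing));
    }
}
```

Handling InterruptedException in its own branch also avoids calling getCause() on an exception that normally has none.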

Code example source: org.nuxeo.lib.stream/nuxeo-stream

public int getNumberOfPartitions(String topic) {
  DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
  try {
    return descriptions.values().get(topic).get().partitions().size();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new StreamRuntimeException(e);
  } catch (ExecutionException e) {
    throw new StreamRuntimeException(e);
  }
}

Code example source: allegro/hermes

private int numberOfPartitionsForTopic(Topic topic) throws ExecutionException, InterruptedException {
  List<String> kafkaTopicsNames = kafkaNamesMapper.toKafkaTopics(topic).stream()
      .map(kafkaTopic -> kafkaTopic.name().asString())
      .collect(Collectors.toList());

  return adminClient.describeTopics(kafkaTopicsNames).all().get().values().stream()
      .map(v -> v.partitions().size())
      .reduce(0, Integer::sum);
}

Code example source: variflight/feeyo-redisproxy

/**
 * Gets the description of the specified topic.
 */
public TopicDescription getDescriptionByTopicName(String topic) throws Exception {
  
  List<String> topics = new ArrayList<String>();
  topics.add(topic);
  DescribeTopicsOptions dto = new DescribeTopicsOptions();
  dto.timeoutMs(5 * 1000);
  DescribeTopicsResult dtr = adminClient.describeTopics(topics, dto);
  return dtr.all().get().get(topic);
}

Code example source: com.opentable.components/otj-kafka

private void maybeCreateConsumerOffsets() throws InterruptedException {
  final String consumerOffsets = "__consumer_offsets";
  retry("create consumer offsets", () -> {
    Map<String, TopicDescription> description = admin.describeTopics(Collections.singleton(consumerOffsets)).all().get(10, TimeUnit.SECONDS);
    if (description.isEmpty()) {
      createTopic(consumerOffsets);
    }
  });
}

Code example source: com.opentable.components/otj-kafka

private int numberOfPartitions(final String topic, final String brokerList) throws ExecutionException, InterruptedException {
  if (!brokerConfig.isEnabled()) {
    return 1;
  }
  // Close the client when done so its resources are not leaked.
  try (AdminClient client = makeAdminClient(brokerList)) {
    final DescribeTopicsResult result = client.describeTopics(Lists.newArrayList(topic));
    final TopicDescription topicDescription = result.all().get().get(topic);
    return topicDescription.partitions().size();
  }
}

Code example source: strimzi/strimzi-kafka-operator

/**
 * Get a topic config via the Kafka AdminClient API, calling the given handler
 * (in a different thread) with the result.
 */
@Override
public void topicMetadata(TopicName topicName, Handler<AsyncResult<TopicMetadata>> handler) {
  LOGGER.debug("Getting metadata for topic {}", topicName);
  ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName.toString());
  KafkaFuture<TopicDescription> descriptionFuture = adminClient.describeTopics(
      Collections.singleton(topicName.toString())).values().get(topicName.toString());
  KafkaFuture<Config> configFuture = adminClient.describeConfigs(
      Collections.singleton(resource)).values().get(resource);
  queueWork(new MetadataWork(descriptionFuture,
    configFuture,
    result -> handler.handle(result)));
}

Code example source: kloiasoft/eventapis

@Override
boolean runInternal(StopWatch stopWatch) throws InterruptedException, ExecutionException {
  stopWatch.start("adminClient.listTopics()");
  Collection<String> topicNames = adminClient.listTopics().listings().get()
      .stream().map(TopicListing::name).filter(this::shouldCollectEvent).collect(Collectors.toList());
  topicsMap.removeAll(new RemoveTopicPredicate(topicNames));
  DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames);
  describeTopicsResult.all().get().forEach(
      (topic, topicDescription) -> topicsMap.executeOnKey(topic, new SetTopicPartitionsProcessor(
          topicDescription.partitions().stream().map(TopicPartitionInfo::partition).collect(Collectors.toList()))
      )
  );
  metaMap.set(this.getName() + TopicServiceScheduler.LAST_SUCCESS_PREFIX, System.currentTimeMillis());
  log.debug("Topics:" + topicsMap.entrySet());
  log.debug(stopWatch.prettyPrint());
  return true;
}

Code example source: variflight/feeyo-redisproxy

public Map<String, TopicDescription> getTopicAndDescriptions() throws Exception {
  // List the topic names.
  ListTopicsOptions lto = new ListTopicsOptions();
  lto.timeoutMs(10 * 1000);
  ListTopicsResult ltr = adminClient.listTopics(lto);

  // Describe the listed topics.
  DescribeTopicsOptions dto = new DescribeTopicsOptions();
  dto.timeoutMs(15 * 1000);
  DescribeTopicsResult dtr = adminClient.describeTopics(ltr.names().get(), dto);
  return dtr.all().get();
}
