org.apache.kafka.clients.admin.AdminClient.createPartitions()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(7.5k)|赞(0)|评价(0)|浏览(185)

本文整理了Java中org.apache.kafka.clients.admin.AdminClient.createPartitions()方法的一些代码示例,展示了AdminClient.createPartitions()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AdminClient.createPartitions()方法的具体详情如下:
包路径:org.apache.kafka.clients.admin.AdminClient
类名称:AdminClient
方法名:createPartitions

AdminClient.createPartitions介绍

[英]Increase the number of partitions of the topics given as the keys of newPartitionsaccording to the corresponding values. If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected.

This is a convenience method for #createPartitions(Map,CreatePartitionsOptions) with default options. See the overload for more details.
[中]根据相应的值,增加作为NewPartitions键的主题的分区数。如果为具有密钥的主题增加分区,则分区逻辑或消息顺序将受到影响。
对于带有默认选项的#createPartitions(映射、CreatePartitionsOptions),这是一种方便的方法。有关更多详细信息,请参阅重载。

代码示例

代码示例来源:origin: apache/kafka

/**
 * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
 * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
 * the partition logic or ordering of the messages will be affected.</strong></p>
 *
 * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
 * See the overload for more details.</p>
 *
 * @param newPartitions The topics which should have new partitions created, and corresponding parameters
 *                      for the created partitions.
 * @return              The CreatePartitionsResult.
 */
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
  return createPartitions(newPartitions, new CreatePartitionsOptions());
}

代码示例来源:origin: apache/kafka

@Test
public void testCreatePartitions() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    Map<String, ApiError> m = new HashMap<>();
    m.put("my_topic", ApiError.NONE);
    m.put("other_topic", ApiError.fromThrowable(new InvalidTopicException("some detailed reason")));
    // Test a call where one filter has an error.
    env.kafkaClient().prepareResponse(new CreatePartitionsResponse(0, m));
    Map<String, NewPartitions> counts = new HashMap<>();
    counts.put("my_topic", NewPartitions.increaseTo(3));
    counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
    CreatePartitionsResult results = env.adminClient().createPartitions(counts);
    Map<String, KafkaFuture<Void>> values = results.values();
    KafkaFuture<Void> myTopicResult = values.get("my_topic");
    myTopicResult.get();
    KafkaFuture<Void> otherTopicResult = values.get("other_topic");
    try {
      otherTopicResult.get();
      fail("get() should throw ExecutionException");
    } catch (ExecutionException e0) {
      assertTrue(e0.getCause() instanceof InvalidTopicException);
      InvalidTopicException e = (InvalidTopicException) e0.getCause();
      assertEquals("some detailed reason", e.getMessage());
    }
  }
}

代码示例来源:origin: apache/kafka

counts.put("my_topic", NewPartitions.increaseTo(3));
  counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
  env.adminClient().createPartitions(counts).all().get();
  fail("Expected an authentication error.");
} catch (ExecutionException e) {

代码示例来源:origin: spring-projects/spring-kafka

private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
  CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
  try {
    partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
  }
  catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.error("Interrupted while waiting for partition creation results", e);
  }
  catch (TimeoutException e) {
    throw new KafkaException("Timed out waiting for create partitions results", e);
  }
  catch (ExecutionException e) {
    if (e.getCause() instanceof InvalidPartitionsException) { // Possible race with another app instance
      logger.debug("Failed to create partitions", e.getCause());
    }
    else {
      logger.error("Failed to create partitions", e.getCause());
      if (!(e.getCause() instanceof UnsupportedVersionException)) {
        throw new KafkaException("Failed to create partitions", e.getCause()); // NOSONAR
      }
    }
  }
}

代码示例来源:origin: variflight/feeyo-redisproxy

/**
 * 给topic增加分区
 */
public CreatePartitionsResult addPartitionsForTopic(String topic, int partitions) {
  Map<String, NewPartitions> map = new HashMap<>();
  NewPartitions np = NewPartitions.increaseTo(partitions);
  map.put(topic, np);
  CreatePartitionsOptions cpo = new CreatePartitionsOptions();
  cpo.timeoutMs(5 * 1000);
  return adminClient.createPartitions(map, cpo);
}

代码示例来源:origin: strimzi/strimzi-kafka-operator

@Override
public void increasePartitions(Topic topic, Handler<AsyncResult<Void>> handler) {
  final NewPartitions newPartitions = NewPartitions.increaseTo(topic.getNumPartitions());
  final Map<String, NewPartitions> request = Collections.singletonMap(topic.getTopicName().toString(), newPartitions);
  KafkaFuture<Void> future = adminClient.createPartitions(request).values().get(topic.getTopicName().toString());
  queueWork(new UniWork<>("increasePartitions", future, handler));
}

代码示例来源:origin: nbogojevic/kafka-operator

private void updateTopic(Topic topic) {
 log.debug("Topic exists. name {}", topic.getName());
 TopicWithParitions oldTopic = topic(topic.getName());
 if (topic.getPartitions() > oldTopic.getPartitions()) {
  adminClient.createPartitions(singletonMap(topic.getName(), NewPartitions.increaseTo(topic.getPartitions())));
  changedTopics.inc();
  log.info("Updated topic. name: {}, new partitions: {}", topic.getName(), topic.getPartitions());
 } else if (topic.getPartitions() < oldTopic.getPartitions()) {
  log.warn("Unable to reduce number of partitions. name: {}, requested partitions: {}, original partitions {}",
       topic.getName(), topic.getPartitions(),  oldTopic.getPartitions());
 }
 if (topic.getReplicationFactor() != 0 && topic.getReplicationFactor() != oldTopic.getReplicationFactor()) {
  log.error("Replication factor change not supported. name: {}, requested replication-factor: {}, original replication-factor {}",
       topic.getName(), topic.getReplicationFactor(), oldTopic.getReplicationFactor());
 }
 if (topic.getProperties() == null || !topic.getProperties().equals(oldTopic.getProperties())) {
  log.info("Updating topic properties. name: {}, new properties: {}", topic.getName(), topic.getProperties());
  ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topic.getName());
  List<ConfigEntry> entries = new ArrayList<>();
  topic.getProperties().forEach((k, v) -> entries.add(new ConfigEntry(k.toString(), v.toString())));
  adminClient.alterConfigs(singletonMap(cr, new Config(entries)));
  changedTopics.inc();
 } else {
  log.debug("Topic properties are same. name: {}, new properties={}, old properties={}", topic.getName(), topic.getProperties(), oldTopic.getProperties());      
 }
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

if (partitionSize < effectivePartitionCount) {
  if (this.configurationProperties.isAutoAddPartitions()) {
    CreatePartitionsResult partitions = adminClient.createPartitions(
        Collections.singletonMap(topicName, NewPartitions.increaseTo(effectivePartitionCount)));
    partitions.all().get(this.operationTimeout, TimeUnit.SECONDS);

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core

if (partitionSize < effectivePartitionCount) {
  if (this.configurationProperties.isAutoAddPartitions()) {
    CreatePartitionsResult partitions = adminClient.createPartitions(
        Collections.singletonMap(topicName, NewPartitions.increaseTo(effectivePartitionCount)));
    partitions.all().get(this.operationTimeout, TimeUnit.SECONDS);

相关文章