This article collects code examples for the Java method org.apache.kafka.clients.admin.AdminClient.createPartitions() and shows how AdminClient.createPartitions() is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of AdminClient.createPartitions() are as follows:
Package: org.apache.kafka.clients.admin
Class: AdminClient
Method: createPartitions
Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values. If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected.
This is a convenience method for #createPartitions(Map, CreatePartitionsOptions) with default options. See the overload for more details.
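Before the project snippets below, here is a minimal standalone sketch of the default-options overload. The broker address `localhost:9092` and topic name `my-topic` are placeholder assumptions; only the AdminClient calls themselves come from the Kafka API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; point this at a real cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // NewPartitions.increaseTo() takes the new TOTAL partition count;
            // asking for fewer partitions than the topic already has fails
            // with an InvalidPartitionsException.
            Map<String, NewPartitions> request =
                    Collections.singletonMap("my-topic", NewPartitions.increaseTo(4));
            try {
                adminClient.createPartitions(request).all().get();
            } catch (ExecutionException e) {
                // The broker-side error arrives as the cause of the ExecutionException.
                throw (Exception) e.getCause();
            }
        }
    }
}
```

Note that the futures returned by createPartitions() complete per topic, so a batch request can partially succeed; the test snippet from apache/kafka below demonstrates exactly that.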
Code example from: apache/kafka
/**
* <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
* according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
* the partition logic or ordering of the messages will be affected.</strong></p>
*
* <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
* See the overload for more details.</p>
*
* @param newPartitions The topics which should have new partitions created, and corresponding parameters
* for the created partitions.
* @return The CreatePartitionsResult.
*/
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
    return createPartitions(newPartitions, new CreatePartitionsOptions());
}
Code example from: apache/kafka
@Test
public void testCreatePartitions() throws Exception {
    try (AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        Map<String, ApiError> m = new HashMap<>();
        m.put("my_topic", ApiError.NONE);
        m.put("other_topic", ApiError.fromThrowable(new InvalidTopicException("some detailed reason")));
        // Test a call where one filter has an error.
        env.kafkaClient().prepareResponse(new CreatePartitionsResponse(0, m));
        Map<String, NewPartitions> counts = new HashMap<>();
        counts.put("my_topic", NewPartitions.increaseTo(3));
        counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
        CreatePartitionsResult results = env.adminClient().createPartitions(counts);
        Map<String, KafkaFuture<Void>> values = results.values();
        KafkaFuture<Void> myTopicResult = values.get("my_topic");
        myTopicResult.get();
        KafkaFuture<Void> otherTopicResult = values.get("other_topic");
        try {
            otherTopicResult.get();
            fail("get() should throw ExecutionException");
        } catch (ExecutionException e0) {
            assertTrue(e0.getCause() instanceof InvalidTopicException);
            InvalidTopicException e = (InvalidTopicException) e0.getCause();
            assertEquals("some detailed reason", e.getMessage());
        }
    }
}
Code example from: apache/kafka
        counts.put("my_topic", NewPartitions.increaseTo(3));
        counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
        env.adminClient().createPartitions(counts).all().get();
        fail("Expected an authentication error.");
    } catch (ExecutionException e) {
Code example from: spring-projects/spring-kafka
private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
    CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
    try {
        partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.error("Interrupted while waiting for partition creation results", e);
    }
    catch (TimeoutException e) {
        throw new KafkaException("Timed out waiting for create partitions results", e);
    }
    catch (ExecutionException e) {
        if (e.getCause() instanceof InvalidPartitionsException) { // Possible race with another app instance
            logger.debug("Failed to create partitions", e.getCause());
        }
        else {
            logger.error("Failed to create partitions", e.getCause());
            if (!(e.getCause() instanceof UnsupportedVersionException)) {
                throw new KafkaException("Failed to create partitions", e.getCause()); // NOSONAR
            }
        }
    }
}
Code example from: variflight/feeyo-redisproxy
/**
 * Add partitions to a topic
 */
public CreatePartitionsResult addPartitionsForTopic(String topic, int partitions) {
    Map<String, NewPartitions> map = new HashMap<>();
    NewPartitions np = NewPartitions.increaseTo(partitions);
    map.put(topic, np);
    CreatePartitionsOptions cpo = new CreatePartitionsOptions();
    cpo.timeoutMs(5 * 1000);
    return adminClient.createPartitions(map, cpo);
}
Code example from: strimzi/strimzi-kafka-operator
@Override
public void increasePartitions(Topic topic, Handler<AsyncResult<Void>> handler) {
    final NewPartitions newPartitions = NewPartitions.increaseTo(topic.getNumPartitions());
    final Map<String, NewPartitions> request = Collections.singletonMap(topic.getTopicName().toString(), newPartitions);
    KafkaFuture<Void> future = adminClient.createPartitions(request).values().get(topic.getTopicName().toString());
    queueWork(new UniWork<>("increasePartitions", future, handler));
}
Code example from: nbogojevic/kafka-operator
private void updateTopic(Topic topic) {
    log.debug("Topic exists. name {}", topic.getName());
    TopicWithParitions oldTopic = topic(topic.getName());
    if (topic.getPartitions() > oldTopic.getPartitions()) {
        adminClient.createPartitions(singletonMap(topic.getName(), NewPartitions.increaseTo(topic.getPartitions())));
        changedTopics.inc();
        log.info("Updated topic. name: {}, new partitions: {}", topic.getName(), topic.getPartitions());
    } else if (topic.getPartitions() < oldTopic.getPartitions()) {
        log.warn("Unable to reduce number of partitions. name: {}, requested partitions: {}, original partitions {}",
                topic.getName(), topic.getPartitions(), oldTopic.getPartitions());
    }
    if (topic.getReplicationFactor() != 0 && topic.getReplicationFactor() != oldTopic.getReplicationFactor()) {
        log.error("Replication factor change not supported. name: {}, requested replication-factor: {}, original replication-factor {}",
                topic.getName(), topic.getReplicationFactor(), oldTopic.getReplicationFactor());
    }
    if (topic.getProperties() == null || !topic.getProperties().equals(oldTopic.getProperties())) {
        log.info("Updating topic properties. name: {}, new properties: {}", topic.getName(), topic.getProperties());
        ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topic.getName());
        List<ConfigEntry> entries = new ArrayList<>();
        topic.getProperties().forEach((k, v) -> entries.add(new ConfigEntry(k.toString(), v.toString())));
        adminClient.alterConfigs(singletonMap(cr, new Config(entries)));
        changedTopics.inc();
    } else {
        log.debug("Topic properties are same. name: {}, new properties={}, old properties={}", topic.getName(), topic.getProperties(), oldTopic.getProperties());
    }
}
Code example from: spring-cloud/spring-cloud-stream-binder-kafka
if (partitionSize < effectivePartitionCount) {
    if (this.configurationProperties.isAutoAddPartitions()) {
        CreatePartitionsResult partitions = adminClient.createPartitions(
                Collections.singletonMap(topicName, NewPartitions.increaseTo(effectivePartitionCount)));
        partitions.all().get(this.operationTimeout, TimeUnit.SECONDS);