Usage and code examples of the org.apache.kafka.clients.admin.AdminClient.alterConfigs() method

Reposted by x33g5p2x on 2022-01-16, category: Other

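This article collects Java code examples of the org.apache.kafka.clients.admin.AdminClient.alterConfigs() method and shows how AdminClient.alterConfigs() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the AdminClient.alterConfigs() method:
Package path: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: alterConfigs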

Introduction to AdminClient.alterConfigs

Updates the configuration for the specified resources with the default options. This is a convenience method for AdminClient#alterConfigs(Map, AlterConfigsOptions) with default options; see that overload for more details. This operation is supported by brokers with version 0.11.0.0 or higher.
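One behaviour is worth keeping in mind when reading the examples: as far as the Kafka documentation describes it, alterConfigs is not incremental. The Config passed for a resource replaces that resource's whole set of overrides, and keys left out fall back to their defaults, which is why the method was deprecated in Kafka 2.3 in favour of incrementalAlterConfigs. The sketch below only models that difference with plain maps; it uses no Kafka classes, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: plain maps stand in for a topic's dynamic config overrides.
// Nothing here talks to Kafka; all names are hypothetical.
final class AlterConfigsSemanticsSketch {

    // Models alterConfigs: the submitted entries become the complete set of overrides,
    // so anything present in `current` but missing from `submitted` is dropped.
    static Map<String, String> replaceAll(Map<String, String> current, Map<String, String> submitted) {
        return new HashMap<>(submitted);
    }

    // Models an incremental "set" operation: only the submitted keys change.
    static Map<String, String> incrementalSet(Map<String, String> current, Map<String, String> submitted) {
        Map<String, String> merged = new HashMap<>(current);
        merged.putAll(submitted);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> current = Map.of("retention.ms", "86400000", "cleanup.policy", "compact");
        Map<String, String> submitted = Map.of("retention.ms", "3600000");
        System.out.println("replace:     " + replaceAll(current, submitted));
        System.out.println("incremental: " + incrementalSet(current, submitted));
    }
}
```

In practice this means a caller of alterConfigs should submit the full desired configuration for a topic, not just the keys that changed.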

Code examples

Code example source: origin: apache/kafka

/**
 * Update the configuration for the specified resources with the default options.
 *
 * This is a convenience method for {@link AdminClient#alterConfigs(Map, AlterConfigsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param configs         The resources with their configs (topic is the only resource type with configs that can
 *                        be updated currently)
 * @return                The AlterConfigsResult
 */
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
  return alterConfigs(configs, new AlterConfigsOptions());
}

Code example source: origin: strimzi/strimzi-kafka-operator

@Override
public void updateTopicConfig(Topic topic, Handler<AsyncResult<Void>> handler) {
  Map<ConfigResource, Config> configs = TopicSerialization.toTopicConfig(topic);
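  // values() maps each ConfigResource to its own future; take the future of the first resource in the map.
  KafkaFuture<Void> future = adminClient.alterConfigs(configs).values().get(configs.keySet().iterator().next());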
  queueWork(new UniWork<>("updateTopicConfig", future, handler));
}

Code example source: origin: SourceLabOrg/kafka-webview

/**
 * Modify configuration values for a specific topic.
 * @param topic The topic to modify.
 * @param configItems Map of Key to Value to modify.
 * @return The topic's configuration as read back after the update.
 */
public TopicConfig alterTopicConfig(final String topic, final Map<String, String> configItems) {
  try {
    // Define the resource we want to modify, the topic.
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    final List<ConfigEntry> configEntries = new ArrayList<>();
    for (final Map.Entry<String, String> entry : configItems.entrySet()) {
      configEntries.add(
        new ConfigEntry(entry.getKey(), entry.getValue())
      );
    }
    // Define the configuration set
    final Config config = new Config(configEntries);
    // Submit the configuration change for the topic
    final AlterConfigsResult result = adminClient.alterConfigs(Collections.singletonMap(configResource, config));
    // Wait for the async request to process.
    result.all().get();
    // Return the updated topic details
    return getTopicConfig(topic);
  } catch (final InterruptedException | ExecutionException exception) {
    // TODO Handle this
    throw new RuntimeException(exception.getMessage(), exception);
  }
}
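The catch block above leaves error handling as a TODO. A minimal sketch of one common way to handle a blocking wait is shown here: restore the thread's interrupt flag on InterruptedException and unwrap the cause of ExecutionException. It uses only the JDK, with CompletableFuture standing in for the future returned by the admin client; the class and method names are hypothetical and not part of kafka-webview.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Sketch only: a JDK Future stands in for the admin client's result future.
final class BlockingWaitSketch {

    // Blocks until the future completes.
    // Returns null on success, otherwise the underlying failure.
    static Throwable awaitFailure(Future<?> future) {
        try {
            future.get();
            return null;
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers further up the stack can still see it.
            Thread.currentThread().interrupt();
            return e;
        } catch (ExecutionException e) {
            // The real error is wrapped; report the cause rather than the wrapper.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker rejected the change"));
        System.out.println("success -> " + awaitFailure(CompletableFuture.completedFuture(null)));
        System.out.println("failure -> " + awaitFailure(failed));
    }
}
```

Returning the cause keeps the caller free to decide whether to rethrow, log, or retry.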

Code example source: origin: nbogojevic/kafka-operator

private void updateTopic(Topic topic) {
 log.debug("Topic exists. name {}", topic.getName());
 TopicWithParitions oldTopic = topic(topic.getName());
 if (topic.getPartitions() > oldTopic.getPartitions()) {
  adminClient.createPartitions(singletonMap(topic.getName(), NewPartitions.increaseTo(topic.getPartitions())));
  changedTopics.inc();
  log.info("Updated topic. name: {}, new partitions: {}", topic.getName(), topic.getPartitions());
 } else if (topic.getPartitions() < oldTopic.getPartitions()) {
  log.warn("Unable to reduce number of partitions. name: {}, requested partitions: {}, original partitions {}",
       topic.getName(), topic.getPartitions(),  oldTopic.getPartitions());
 }
 if (topic.getReplicationFactor() != 0 && topic.getReplicationFactor() != oldTopic.getReplicationFactor()) {
  log.error("Replication factor change not supported. name: {}, requested replication-factor: {}, original replication-factor {}",
       topic.getName(), topic.getReplicationFactor(), oldTopic.getReplicationFactor());
 }
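 // Only update when properties are present and differ; a null map would otherwise fail in forEach below.
 if (topic.getProperties() != null && !topic.getProperties().equals(oldTopic.getProperties())) {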
  log.info("Updating topic properties. name: {}, new properties: {}", topic.getName(), topic.getProperties());
  ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topic.getName());
  List<ConfigEntry> entries = new ArrayList<>();
  topic.getProperties().forEach((k, v) -> entries.add(new ConfigEntry(k.toString(), v.toString())));
  // Note: the returned AlterConfigsResult is not awaited, so a failed update goes unnoticed here.
  adminClient.alterConfigs(singletonMap(cr, new Config(entries)));
  changedTopics.inc();
 } else {
  log.debug("Topic properties are same. name: {}, new properties={}, old properties={}", topic.getName(), topic.getProperties(), oldTopic.getProperties());      
 }
}
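The updateTopic example calls alterConfigs without inspecting the returned result and increments changedTopics straight away. A hedged sketch of an alternative is to count the change only once the asynchronous operation has actually finished. The code below uses JDK classes only, with CompletableFuture standing in for the admin client's future; the class name, method name, and counters are hypothetical and not taken from kafka-operator.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: CompletableFuture stands in for the future returned by the admin client.
final class CompletionCallbackSketch {

    // Registers a callback that bumps `changed` on success and `failed` on error.
    // Nothing is counted until the future completes.
    static void recordOutcome(CompletableFuture<Void> result, AtomicInteger changed, AtomicInteger failed) {
        result.whenComplete((ignored, error) -> {
            if (error == null) {
                changed.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger changed = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        recordOutcome(pending, changed, failed);
        System.out.println("while pending: changed=" + changed.get() + " failed=" + failed.get());
        pending.complete(null);
        System.out.println("after success: changed=" + changed.get() + " failed=" + failed.get());
    }
}
```

The callback approach avoids blocking the calling thread while still keeping the metric honest about failed updates.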
