Usage and code examples of org.apache.kafka.clients.admin.AdminClient.create()

Posted by x33g5p2x (repost), 2022-01-15

This article collects code examples of the Java method org.apache.kafka.clients.admin.AdminClient.create() and shows how it is used in practice. The examples are taken from selected open-source projects on GitHub/Stack Overflow/Maven and should serve as useful references. Details of AdminClient.create() are as follows:

Package path: org.apache.kafka.clients.admin.AdminClient
Class: AdminClient
Method: create

About AdminClient.create

Create a new AdminClient with the given configuration.
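All of the examples below assume the kafka-clients artifact is on the classpath. A minimal Maven dependency sketch (the version shown is illustrative; pick the release matching your broker and client code):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- illustrative version only; align with your cluster -->
    <version>2.3.0</version>
</dependency>
```

Note that AdminClient.create() is overloaded to accept either a Properties or a Map&lt;String, Object&gt; of configuration; both variants appear in the examples that follow.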

Code examples

Example source: openzipkin/brave

@Override public AdminClient getAdminClient(Map<String, Object> config) {
 return AdminClient.create(config);
}

Example source: confluentinc/ksql

/**
 * Create a Kafka topic with the given parameters.
 *
 * @param topic       The name of the topic.
 * @param partitions  The number of partitions for this topic.
 * @param replication The replication factor for (partitions of) this topic.
 * @param topicConfig Additional topic-level configuration settings.
 */
void createTopic(final String topic,
  final int partitions,
  final int replication,
  final Map<String, String> topicConfig) {
 log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
   topic, partitions, replication, topicConfig);
 final ImmutableMap<String, Object> props = ImmutableMap.of(
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList(),
   AdminClientConfig.RETRIES_CONFIG, 5);
 try (AdminClient adminClient = AdminClient.create(props)) {
  final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
  newTopic.configs(topicConfig);
  try {
   final CreateTopicsResult result = adminClient.createTopics(ImmutableList.of(newTopic));
   result.all().get();
  } catch (final Exception e) {
   throw new RuntimeException("Failed to create topic:" + topic, e);
  }
 }
}

Example source: linkedin/cruise-control

private AdminClient getAdminClient(String bootstrapServer) {
 Properties props = new Properties();
 props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
 return AdminClient.create(props);
}

Example source: apache/flink

@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
  LOG.info("Creating topic {}", topic);
  try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
    NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
    adminClient.createTopics(Collections.singleton(topicObj)).all().get();
  } catch (Exception e) {
    e.printStackTrace();
    fail("Create test topic : " + topic + " failed, " + e.getMessage());
  }
}

Example source: apache/flume

private AdminClient getAdminClient() {
 if (adminClient == null) {
  Properties adminClientProps = createAdminClientProperties();
  adminClient = AdminClient.create(adminClientProps);
 }
 return adminClient;
}

Example source: debezium/debezium

@Override
public void initializeStorage() {
  super.initializeStorage();
  try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
    // Find default replication factor
    Config brokerConfig = getKafkaBrokerConfig(admin);
    final short replicationFactor = Short.parseShort(brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value());
    // Create topic
    final NewTopic topic = new NewTopic(topicName, (short)1, replicationFactor);
    topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE), "retention.bytes", "-1"));
    admin.createTopics(Collections.singleton(topic));
    logger.info("Database history topic '{}' created", topicName);
  }
  catch (Exception e) {
    throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
  }
}

Example source: apache/flink

@Override
public void deleteTestTopic(String topic) {
  LOG.info("Deleting topic {}", topic);
  try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
    tryDelete(adminClient, topic);
  } catch (Exception e) {
    e.printStackTrace();
    fail(String.format("Delete test topic : %s failed, %s", topic, e.getMessage()));
  }
}

Example source: apache/kafka

@Test
public void testAdminClientWithInvalidCredentials() {
  Map<String, Object> props = new HashMap<>(saslClientConfigs);
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
  try (AdminClient client = AdminClient.create(props)) {
    DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
    result.all().get();
    fail("Expected an authentication error!");
  } catch (Exception e) {
    assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
        e.getCause() instanceof SaslAuthenticationException);
  }
}

Example source: apache/storm

public void setUp() throws Exception {
  // setup ZK
  zookeeper = new TestingServer(true);
  // setup Broker
  kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
  Properties brokerProps = new Properties();
  brokerProps.setProperty("zookeeper.connect", zookeeper.getConnectString());
  brokerProps.setProperty("broker.id", "0");
  brokerProps.setProperty("log.dirs", kafkaDir.getPath());
  brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
  brokerProps.setProperty("offsets.topic.replication.factor", "1");
  KafkaConfig config = new KafkaConfig(brokerProps);
  MockTime mock = new MockTime();
  kafkaServer = TestUtils.createServer(config, mock);
  // setup default Producer
  createProducer();
  kafkaAdminClient = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT));
}

Example source: apache/flink

@Override
public int getLeaderToShutDown(String topic) throws Exception {
  AdminClient client = AdminClient.create(getStandardProperties());
  TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
  return result.partitions().get(0).leader().id();
}

Example source: spring-projects/spring-kafka

/**
 * Create an {@link AdminClient}; invoke the callback and reliably close the admin.
 * @param callback the callback.
 */
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
  Map<String, Object> adminConfigs = new HashMap<>();
  adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
  AdminClient admin = null;
  try {
    admin = AdminClient.create(adminConfigs);
    callback.accept(admin);
  }
  finally {
    if (admin != null) {
      admin.close(this.adminTimeout, TimeUnit.SECONDS);
    }
  }
}

Example source: spring-projects/spring-kafka

AdminClient adminClient = null;
try {
  adminClient = AdminClient.create(this.config);

Example source: org.apache.kafka/kafka

kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
validateNoActiveConsumers(groupId, kafkaAdminClient);

Example source: org.apache.kafka/kafka_2.12

kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
validateNoActiveConsumers(groupId, kafkaAdminClient);

Example source: org.apache.kafka/kafka_2.11

kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
validateNoActiveConsumers(groupId, kafkaAdminClient);

Example source: spring-projects/spring-kafka

@Test
public void alreadyExists() throws Exception {
  AtomicReference<Method> addTopics = new AtomicReference<>();
  AtomicReference<Method> modifyTopics = new AtomicReference<>();
  ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> {
    m.setAccessible(true);
    if (m.getName().equals("addTopics")) {
      addTopics.set(m);
    }
    else if (m.getName().equals("modifyTopics")) {
      modifyTopics.set(m);
    }
  }, m -> {
    return m.getName().endsWith("Topics");
  });
  try (AdminClient adminClient = AdminClient.create(this.admin.getConfig())) {
    addTopics.get().invoke(this.admin, adminClient, Collections.singletonList(this.topic1));
    modifyTopics.get().invoke(this.admin, adminClient, Collections.singletonMap(
        this.topic1.name(), NewPartitions.increaseTo(this.topic1.numPartitions())));
  }
}

Example source: spring-projects/spring-kafka

@Test
public void testAddTopics() throws Exception {
  AdminClient adminClient = AdminClient.create(this.admin.getConfig());
  DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  topics.all().get();
  new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2);
  new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3);
  this.admin.initialize();
  topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  Map<String, TopicDescription> results = topics.all().get();
  results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
  adminClient.close(10, TimeUnit.SECONDS);
}

Example source: allegro/hermes

private AdminClient brokerAdminClient(String bootstrapServers) {
  Properties props = new Properties();
  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  return AdminClient.create(props);
}

Example source: com.opentable.components/otj-kafka

private AdminClient makeAdminClient(final String brokerList) {
  final Properties props = new Properties();
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  return AdminClient.create(props);
}

Example source: sitewhere/sitewhere

@Override
public void start(ILifecycleProgressMonitor monitor) throws SiteWhereException {
getLogger().info(
  "Producer connecting to Kafka: " + getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
getLogger().info("Will be producing messages for: " + getTargetTopicName());
this.producer = new KafkaProducer<String, byte[]>(buildConfiguration());
this.kafkaAdmin = AdminClient.create(buildAdminConfiguration());
waitForKafkaAvailable();
}
