Usage and Code Examples of the org.apache.kafka.clients.admin.AdminClient.createTopics() Method

Reposted by x33g5p2x on 2022-01-15 under Other

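This article collects Java code examples for the org.apache.kafka.clients.admin.AdminClient.createTopics() method and shows how AdminClient.createTopics() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of the AdminClient.createTopics() method are as follows:
Package path: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: createTopics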

About AdminClient.createTopics

Create a batch of new topics with the default options. This is a convenience method for createTopics(Collection, CreateTopicsOptions) with default options. See the overload for more details. This operation is supported by brokers with version 0.10.1.0 or higher.

Code examples

Code example source: apache/kafka

/**
 * Create a batch of new topics with the default options.
 *
 * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.10.1.0 or higher.
 *
 * @param newTopics         The new topics to create.
 * @return                  The CreateTopicsResult.
 */
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
  return createTopics(newTopics, new CreateTopicsOptions());
}

Code example source: confluentinc/ksql

/**
 * Create a Kafka topic with the given parameters.
 *
 * @param topic       The name of the topic.
 * @param partitions  The number of partitions for this topic.
 * @param replication The replication factor for (partitions of) this topic.
 * @param topicConfig Additional topic-level configuration settings.
 */
void createTopic(final String topic,
  final int partitions,
  final int replication,
  final Map<String, String> topicConfig) {
 log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
   topic, partitions, replication, topicConfig);
 final ImmutableMap<String, Object> props = ImmutableMap.of(
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList(),
   AdminClientConfig.RETRIES_CONFIG, 5);
 try (AdminClient adminClient = AdminClient.create(props)) {
  final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
  newTopic.configs(topicConfig);
  try {
   final CreateTopicsResult result = adminClient.createTopics(ImmutableList.of(newTopic));
   result.all().get();
  } catch (final Exception e) {
   throw new RuntimeException("Failed to create topic:" + topic, e);
  }
 }
}

Code example source: apache/storm

public void createTopic(String topicName) throws Exception {
  kafkaAdminClient.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short)1)))
    .all()
    .get(30, TimeUnit.SECONDS);
}

Code example source: apache/flink

@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
  LOG.info("Creating topic {}", topic);
  try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
    NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
    adminClient.createTopics(Collections.singleton(topicObj)).all().get();
  } catch (Exception e) {
    e.printStackTrace();
    fail("Create test topic : " + topic + " failed, " + e.getMessage());
  }
}

Code example source: debezium/debezium

@Override
public void initializeStorage() {
  super.initializeStorage();
  try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
    // Find default replication factor
    Config brokerConfig = getKafkaBrokerConfig(admin);
    final short replicationFactor = Short.parseShort(brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value());
    // Create topic
    final NewTopic topic = new NewTopic(topicName, (short)1, replicationFactor);
    topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE), "retention.bytes", "-1"));
    admin.createTopics(Collections.singleton(topic));
    logger.info("Database history topic '{}' created", topic);
  }
  catch (Exception e) {
    throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
  }
}

Code example source: apache/flume

public void createTopics(List<String> topicNames, int numPartitions) {
 List<NewTopic> newTopics = new ArrayList<>();
 for (String topicName: topicNames) {
  NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
  newTopics.add(newTopic);
 }
 getAdminClient().createTopics(newTopics);
 //the following lines are a bit of black magic to ensure the topic is ready when we return
 DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
 try {
  dtr.all().get(10, TimeUnit.SECONDS);
 } catch (Exception e) {
  throw new RuntimeException("Error getting topic info", e);
 }
}

Code example source: apache/kafka

@Test
public void testCreateTopics() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
        new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
    KafkaFuture<Void> future = env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(10000)).all();
    future.get();
  }
}

Code example source: apache/kafka

/**
 * Test that the client properly times out when we don't receive any metadata.
 */
@Test
public void testTimeoutWithoutMetadata() throws Exception {
  try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
      newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
    KafkaFuture<Void> future = env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(1000)).all();
    TestUtils.assertFutureError(future, TimeoutException.class);
  }
}

Code example source: apache/kafka

}, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
    new CreateTopicsOptions().timeoutMs(10000)).all();

Code example source: apache/kafka

@Test
public void testUnreachableBootstrapServer() throws Exception {
  // This tests the scenario in which the bootstrap server is unreachable for a short while,
  // which prevents AdminClient from being able to send the initial metadata request
  Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
  try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
    Cluster discoveredCluster = mockCluster(0);
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
    env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
        new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
            1, Collections.emptyList()));
    env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
        new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
    KafkaFuture<Void> future = env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(10000)).all();
    future.get();
  }
}

Code example source: apache/kafka

/**
 * Test that we propagate exceptions encountered when fetching metadata.
 */
@Test
public void testPropagatedMetadataFetchException() throws Exception {
  Cluster cluster = mockCluster(0);
  try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
      newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
        AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
        TimeUnit.DAYS.toMillis(1));
    env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
    KafkaFuture<Void> future = env.adminClient().createTopics(
      Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
      new CreateTopicsOptions().timeoutMs(1000)).all();
    TestUtils.assertFutureError(future, SaslAuthenticationException.class);
  }
}

Code example source: apache/kafka

private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
  try {
    env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(10000)).all().get();

Code example source: linkedin/cruise-control

private Map<String, TopicDescription> createTopics() throws InterruptedException {
 AdminClient adminClient = getAdminClient(broker(0).getPlaintextAddr());
 adminClient.createTopics(Arrays.asList(new NewTopic(TOPIC0, 1, (short) 1),
                     new NewTopic(TOPIC1, 1, (short) 2)));
 // We need to use the admin clients to query the metadata from two different brokers to make sure that
 // both brokers have the latest metadata. Otherwise the Executor may get confused when it does not
 // see expected topics in the metadata.
 Map<String, TopicDescription> topicDescriptions0 = null;
 Map<String, TopicDescription> topicDescriptions1 = null;
 do {
  try (AdminClient adminClient0 = getAdminClient(broker(0).getPlaintextAddr());
    AdminClient adminClient1 = getAdminClient(broker(1).getPlaintextAddr())) {
   topicDescriptions0 = adminClient0.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
   topicDescriptions1 = adminClient1.describeTopics(Arrays.asList(TOPIC0, TOPIC1)).all().get();
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  } catch (ExecutionException ee) {
   // Let it go.
  }
 } while (topicDescriptions0 == null || topicDescriptions0.size() < 2
   || topicDescriptions1 == null || topicDescriptions1.size() < 2);
 return topicDescriptions0;
}
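
The cruise-control snippet above polls describeTopics in a do/while loop with no upper bound, so it would spin indefinitely if a topic never became visible. Below is a minimal sketch of the same wait with a deadline. It uses only the JDK: the BooleanSupplier stands in for the metadata check, and PollUntilReady, pollUntil, and its parameters are hypothetical names that belong to neither Kafka nor cruise-control.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of Kafka or cruise-control.
class PollUntilReady {
    /**
     * Calls the probe repeatedly until it returns true or the timeout elapses.
     * Returns true if the probe succeeded, false on timeout or interruption.
     */
    static boolean pollUntil(BooleanSupplier probe, long timeoutMs, long intervalMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (probe.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a test like the one above, the probe could wrap the two describeTopics calls and return false while either of them still throws or reports fewer topics than expected.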

Code example source: apache/kafka

@Test
public void testConnectionFailureOnMetadataUpdate() throws Exception {
  // This tests the scenario in which we successfully connect to the bootstrap server, but
  // the server disconnects before sending the full response
  Cluster cluster = mockBootstrapCluster();
  try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
    Cluster discoveredCluster = mockCluster(0);
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
    env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
        new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
            1, Collections.emptyList()));
    env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
        new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
    KafkaFuture<Void> future = env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(10000)).all();
    future.get();
  }
}

Code example source: apache/kafka

@Test
public void testInvalidTopicNames() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    List<String> sillyTopicNames = asList("", null);
    Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    Map<String, KafkaFuture<TopicDescription>> describeFutures =
        env.adminClient().describeTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    List<NewTopic> newTopics = new ArrayList<>();
    for (String sillyTopicName : sillyTopicNames) {
      newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
    }
    Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(createFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
  }
}

Code example source: apache/kafka

@Test
public void testCreateTopicsHandleNotControllerException() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
      Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
      env.cluster().nodeById(0));
    env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(),
      env.cluster().clusterResource().clusterId(),
      1,
      Collections.<MetadataResponse.TopicMetadata>emptyList()));
    env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
        Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
      env.cluster().nodeById(1));
    KafkaFuture<Void> future = env.adminClient().createTopics(
        Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
        new CreateTopicsOptions().timeoutMs(10000)).all();
    future.get();
  }
}

Code example source: spring-projects/spring-kafka

private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
  CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd);
  try {
    topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS);
  }
  catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.error("Interrupted while waiting for topic creation results", e);
  }
  catch (TimeoutException e) {
    throw new KafkaException("Timed out waiting for create topics results", e);
  }
  catch (ExecutionException e) {
    if (e.getCause() instanceof TopicExistsException) { // Possible race with another app instance
      logger.debug("Failed to create topics", e.getCause());
    }
    else {
      logger.error("Failed to create topics", e.getCause());
      throw new KafkaException("Failed to create topics", e.getCause()); // NOSONAR
    }
  }
}
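
The spring-kafka snippet above distinguishes several outcomes of waiting on the result: success, interruption, timeout, and a failure whose cause may be a benign "topic already exists". The sketch below isolates that classification using only the JDK. Future<Void> stands in for the future returned by all(), TopicAlreadyExists is a hypothetical stand-in for Kafka's TopicExistsException, and CreateOutcomeSketch, Outcome, and await are illustrative names that do not come from spring-kafka.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only; none of these names exist in Kafka or spring-kafka.
class CreateOutcomeSketch {
    /** Hypothetical stand-in for Kafka's TopicExistsException. */
    static class TopicAlreadyExists extends RuntimeException {
        TopicAlreadyExists(String message) {
            super(message);
        }
    }

    enum Outcome { CREATED, ALREADY_EXISTS, TIMED_OUT, INTERRUPTED, FAILED }

    /** Waits for the creation future and classifies how it ended. */
    static Outcome await(Future<Void> creation, long timeout, TimeUnit unit) {
        try {
            creation.get(timeout, unit);
            return Outcome.CREATED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The real failure is the cause; the wrapper only signals "the task failed".
            return (e.getCause() instanceof TopicAlreadyExists)
                    ? Outcome.ALREADY_EXISTS
                    : Outcome.FAILED;
        }
    }
}
```

Returning an enum instead of throwing is a design choice made for this sketch; the original snippet logs the benign case and rethrows the others as KafkaException.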

Code example source: spring-projects/spring-kafka

private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
  CreateTopicsResult createTopics = admin.createTopics(newTopics);
  try {
    createTopics.all().get(this.adminTimeout, TimeUnit.SECONDS);
  }
  catch (Exception e) {
    throw new KafkaException(e);
  }
}

Code example source: charithe/kafka-junit

public void createTopics(String... topics) throws ExecutionException, InterruptedException {
  Map<String, Object> adminConfigs = new HashMap<>();
  adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString());
  try(AdminClient admin = AdminClient.create(adminConfigs)) {
    List<NewTopic> newTopics = Stream.of(topics)
                     .map(t -> new NewTopic(t, numBroker, (short) numBroker))
                     .collect(Collectors.toList());
    CreateTopicsResult createTopics = admin.createTopics(newTopics);
    createTopics.all().get();
  }
}

Code example source: strimzi/strimzi-kafka-operator

/**
 * Create a new topic via the Kafka AdminClient API, calling the given handler
 * (in a different thread) with the result.
 */
@Override
public void createTopic(Topic topic, Handler<AsyncResult<Void>> handler) {
  NewTopic newTopic = TopicSerialization.toNewTopic(topic, null);
  LOGGER.debug("Creating topic {}", newTopic);
  KafkaFuture<Void> future = adminClient.createTopics(
      Collections.singleton(newTopic)).values().get(newTopic.name());
  queueWork(new UniWork<>("createTopic", future, handler));
}
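
The strimzi snippet above picks a single topic's future out of values() instead of waiting on all(). When a batch contains several topics, the same per-topic futures show which ones failed. Below is a minimal sketch under that assumption, using only the JDK: the map of Future<Void> stands in for the map returned by values(), and PerTopicResults and failures are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Illustrative only; not part of Kafka or strimzi.
class PerTopicResults {
    /**
     * Waits for every per-topic future and returns the failures keyed by topic name.
     * Topics that completed normally are absent from the returned map.
     */
    static Map<String, Throwable> failures(Map<String, Future<Void>> perTopic) {
        Map<String, Throwable> failed = new LinkedHashMap<>();
        for (Map.Entry<String, Future<Void>> entry : perTopic.entrySet()) {
            try {
                entry.getValue().get();
            } catch (ExecutionException e) {
                // Record the underlying cause rather than the wrapper.
                failed.put(entry.getKey(), e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag, record it for this topic, and stop waiting.
                Thread.currentThread().interrupt();
                failed.put(entry.getKey(), e);
                break;
            }
        }
        return failed;
    }
}
```

Waiting on all() would instead surface only a single failure for the whole batch, which is why inspecting the individual futures is useful when partial success is acceptable.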
