org.apache.kafka.clients.admin.AdminClient.deleteTopics()方法的使用及代码示例

x33g5p2x  于2022-01-15 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(155)

本文整理了Java中org.apache.kafka.clients.admin.AdminClient.deleteTopics()方法的一些代码示例,展示了AdminClient.deleteTopics()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。AdminClient.deleteTopics()方法的具体详情如下:
包路径:org.apache.kafka.clients.admin.AdminClient
类名称:AdminClient
方法名:deleteTopics

AdminClient.deleteTopics介绍

[英]This is a convenience method for # AdminClient#deleteTopics(Collection,DeleteTopicsOptions)with default options. See the overload for more details. This operation is supported by brokers with version 0.10.1.0 or higher.
[中]这是使用默认选项的#AdminClient#deleteTopics(集合、DeleteTopicsOptions)的一种方便方法。有关更多详细信息,请参阅重载。版本为0.10.1.0或更高版本的代理支持此操作。

代码示例

代码示例来源:origin: apache/kafka

/**
 * This is a convenience method for #{@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
 * with default options. See the overload for more details.
 *
 * This operation is supported by brokers with version 0.10.1.0 or higher.
 *
 * @param topics            The topic names to delete.
 * @return                  The DeleteTopicsResult.
 */
public DeleteTopicsResult deleteTopics(Collection<String> topics) {
  return deleteTopics(topics, new DeleteTopicsOptions());
}

代码示例来源:origin: apache/flume

public void deleteTopic(String topicName) {
 getAdminClient().deleteTopics(Collections.singletonList(topicName));
}

代码示例来源:origin: apache/flink

private void tryDelete(AdminClient adminClient, String topic)
    throws Exception {
  try {
    adminClient.deleteTopics(Collections.singleton(topic)).all().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
  } catch (TimeoutException e) {
    LOG.info("Did not receive delete topic response within %d seconds. Checking if it succeeded",
      DELETE_TIMEOUT_SECONDS);
    if (adminClient.listTopics().names().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS).contains(topic)) {
      throw new Exception("Topic still exists after timeout");
    }
  }
}

代码示例来源:origin: apache/kafka

@Test
public void testDeleteTopics() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
        new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
    KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
        new DeleteTopicsOptions()).all();
    future.get();
    env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
        new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.TOPIC_DELETION_DISABLED)));
    future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
        new DeleteTopicsOptions()).all();
    TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
    env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
        new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)));
    future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
        new DeleteTopicsOptions()).all();
    TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
  }
}

代码示例来源:origin: apache/kafka

@Test
public void testInvalidTopicNames() throws Exception {
  try (AdminClientUnitTestEnv env = mockClientEnv()) {
    env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
    List<String> sillyTopicNames = asList("", null);
    Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    Map<String, KafkaFuture<TopicDescription>> describeFutures =
        env.adminClient().describeTopics(sillyTopicNames).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
    List<NewTopic> newTopics = new ArrayList<>();
    for (String sillyTopicName : sillyTopicNames) {
      newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
    }
    Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
    for (String sillyTopicName : sillyTopicNames) {
      TestUtils.assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
    }
    assertEquals(0, env.kafkaClient().inFlightRequestCount());
  }
}

代码示例来源:origin: org.apache.kafka/kafka_2.12

public void doDelete(final List<String> topicsToDelete,
           final AdminClient adminClient) {
  boolean hasDeleteErrors = false;
  final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
  final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
  for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
    try {
      entry.getValue().get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.err.println("ERROR: deleting topic " + entry.getKey());
      e.printStackTrace(System.err);
      hasDeleteErrors = true;
    }
  }
  if (hasDeleteErrors) {
    throw new RuntimeException("Encountered an error deleting one or more topics");
  }
}

代码示例来源:origin: org.apache.kafka/kafka_2.11

public void doDelete(final List<String> topicsToDelete,
           final AdminClient adminClient) {
  boolean hasDeleteErrors = false;
  final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
  final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
  for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
    try {
      entry.getValue().get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.err.println("ERROR: deleting topic " + entry.getKey());
      e.printStackTrace(System.err);
      hasDeleteErrors = true;
    }
  }
  if (hasDeleteErrors) {
    throw new RuntimeException("Encountered an error deleting one or more topics");
  }
}

代码示例来源:origin: org.apache.kafka/kafka

public void doDelete(final List<String> topicsToDelete,
           final AdminClient adminClient) {
  boolean hasDeleteErrors = false;
  final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
  final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
  for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
    try {
      entry.getValue().get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.err.println("ERROR: deleting topic " + entry.getKey());
      e.printStackTrace(System.err);
      hasDeleteErrors = true;
    }
  }
  if (hasDeleteErrors) {
    throw new RuntimeException("Encountered an error deleting one or more topics");
  }
}

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka_2.11

public void doDelete(final List<String> topicsToDelete,
           final AdminClient adminClient) {
  boolean hasDeleteErrors = false;
  final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
  final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
  for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
    try {
      entry.getValue().get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.err.println("ERROR: deleting topic " + entry.getKey());
      e.printStackTrace(System.err);
      hasDeleteErrors = true;
    }
  }
  if (hasDeleteErrors) {
    throw new RuntimeException("Encountered an error deleting one or more topics");
  }
}

代码示例来源:origin: nbogojevic/kafka-operator

public void deleteTopic(String name) {
 if (topics().contains(name)) {
  log.warn("Deleting topic. name: {}", name);
  DeleteTopicsResult result = adminClient.deleteTopics(singleton(name));
  try {
   result.all().get();
   log.warn("Deleted topic. name: {}, result: {}", name, result);
   deletedTopics.inc(result.values().size());
  } catch (InterruptedException | ExecutionException  e) {
   log.error("Exception occured during topic deletion. name: {}", name, e);
  }
 }
}

代码示例来源:origin: strimzi/strimzi-kafka-operator

/**
 * Delete a topic via the Kafka AdminClient API, calling the given handler
 * (in a different thread) with the result.
 */
@Override
public void deleteTopic(TopicName topicName, Handler<AsyncResult<Void>> handler) {
  LOGGER.debug("Deleting topic {}", topicName);
  KafkaFuture<Void> future = adminClient.deleteTopics(
      Collections.singleton(topicName.toString())).values().get(topicName.toString());
  queueWork(new UniWork<>("deleteTopic", future, handler));
}

代码示例来源:origin: apache/samza

@Override
public boolean clearStream(StreamSpec streamSpec) {
 LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
 String topicName = streamSpec.getPhysicalName();
 try {
  DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableSet.of(topicName));
  deleteTopicsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 } catch (Exception e) {
  LOG.error("Failed to delete topic {} with exception {}.", topicName, e);
  return false;
 }
 return true;
}

代码示例来源:origin: com.obsidiandynamics.jackdaw/jackdaw-core

final DeleteTopicsResult result = admin.deleteTopics(topics, 
                           new DeleteTopicsOptions().timeoutMs(timeoutMillis));
awaitFutures(timeoutMillis, result.values().values());

代码示例来源:origin: org.onap.dmaap.messagerouter.msgrtr/msgrtr

@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
  log.info("Deleting topic: " + topic);
  ZkClient zkClient = null;
  try {
    log.info("Loading zookeeper client for topic deletion.");
        // topic creation. (Otherwise, the topic is only partially created
    // in ZK.)
    
    
    fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
    log.info("Zookeeper client loaded successfully. Deleting topic.");
    
  } catch (Exception e) {
    log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
    throw new ConfigDbException(e);
  }  finally {
    log.info("Closing zookeeper connection.");
    if (zkClient != null)
      zkClient.close();
  }
  // throw new UnsupportedOperationException ( "We can't programmatically
  // delete Kafka topics yet." );
}

相关文章