This article collects Java code examples for the org.apache.kafka.clients.admin.AdminClient.deleteTopics()
method, showing how AdminClient.deleteTopics() is used in practice. The examples are extracted
from selected open-source projects hosted on GitHub, Stack Overflow, Maven, and similar platforms,
so they should serve as useful references.

Method details:
Package: org.apache.kafka.clients.admin.AdminClient
Class: AdminClient
Method: deleteTopics
Description: This is a convenience method for AdminClient#deleteTopics(Collection, DeleteTopicsOptions) with default options. See the overload for more details. This operation is supported by brokers with version 0.10.1.0 or higher.
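The contract described above can be modeled without a broker: KafkaFuture behaves much like java.util.concurrent.CompletableFuture, and DeleteTopicsResult exposes both a per-topic future map (values()) and a combined future (all()) that fails if any single deletion fails. A minimal stdlib-only sketch of that contract follows; the class and topic names are illustrative, not part of the Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stdlib-only model of DeleteTopicsResult's contract: values() exposes one
// future per topic, all() completes only when every deletion succeeded.
public class DeleteTopicsResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    public DeleteTopicsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    public Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    public CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> m = new LinkedHashMap<>();
        m.put("ok-topic", CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("TOPIC_DELETION_DISABLED"));
        m.put("bad-topic", failing);

        DeleteTopicsResultSketch result = new DeleteTopicsResultSketch(m);
        // One failed deletion makes the combined future fail, while the
        // per-topic future for "ok-topic" still completes normally.
        System.out.println("ok-topic done: " + result.values().get("ok-topic").isDone());
        System.out.println("all failed: " + result.all().isCompletedExceptionally());
    }
}
```

This mirrors why the examples below sometimes call all() (fail fast on any error) and sometimes iterate values() (handle each topic individually).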
Example source: apache/kafka

/**
 * This is a convenience method for {@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
 * with default options. See the overload for more details.
 *
 * This operation is supported by brokers with version 0.10.1.0 or higher.
 *
 * @param topics The topic names to delete.
 * @return The DeleteTopicsResult.
 */
public DeleteTopicsResult deleteTopics(Collection<String> topics) {
    return deleteTopics(topics, new DeleteTopicsOptions());
}
Example source: apache/flume

public void deleteTopic(String topicName) {
    getAdminClient().deleteTopics(Collections.singletonList(topicName));
}
Example source: apache/flink

private void tryDelete(AdminClient adminClient, String topic) throws Exception {
    try {
        adminClient.deleteTopics(Collections.singleton(topic))
            .all().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        // SLF4J uses {} placeholders, not printf-style %d.
        LOG.info("Did not receive delete topic response within {} seconds. Checking if it succeeded",
            DELETE_TIMEOUT_SECONDS);
        if (adminClient.listTopics().names().get(DELETE_TIMEOUT_SECONDS, TimeUnit.SECONDS).contains(topic)) {
            throw new Exception("Topic still exists after timeout");
        }
    }
}
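The Flink snippet above illustrates a useful pattern: treat a delete timeout as inconclusive, and fall back to checking whether the topic is actually gone before declaring failure. A stdlib-only sketch of that pattern, with a CompletableFuture standing in for KafkaFuture and a plain Set standing in for the broker's topic listing (both stand-ins are assumptions, not Kafka API):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TryDeleteSketch {
    /**
     * Waits up to timeoutSeconds for deleteFuture; on timeout, consults the
     * current topic listing to decide whether the delete actually succeeded.
     */
    static void tryDelete(CompletableFuture<Void> deleteFuture,
                          Set<String> currentTopics,
                          String topic,
                          long timeoutSeconds) throws Exception {
        try {
            deleteFuture.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // No response in time: the delete may still have gone through,
            // so check the listing before declaring failure.
            if (currentTopics.contains(topic)) {
                throw new Exception("Topic still exists after timeout");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        // Times out, but the topic is gone from the listing: treated as success.
        tryDelete(neverCompletes, Set.of("other-topic"), "my-topic", 1);
        System.out.println("timed out but topic absent: ok");
    }
}
```

The design point is that a timeout on the client side does not mean the broker failed; deletion is asynchronous, so the listing is the source of truth.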
Example source: apache/kafka

@Test
public void testDeleteTopics() throws Exception {
    try (AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

        env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
            new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
        KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
            new DeleteTopicsOptions()).all();
        future.get();

        env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
            new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.TOPIC_DELETION_DISABLED)));
        future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
            new DeleteTopicsOptions()).all();
        TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);

        env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
            new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
            new DeleteTopicsOptions()).all();
        TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
    }
}
Example source: apache/kafka

@Test
public void testInvalidTopicNames() throws Exception {
    try (AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

        List<String> sillyTopicNames = asList("", null);
        Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());

        Map<String, KafkaFuture<TopicDescription>> describeFutures =
            env.adminClient().describeTopics(sillyTopicNames).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());

        List<NewTopic> newTopics = new ArrayList<>();
        for (String sillyTopicName : sillyTopicNames) {
            newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
        }
        Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
        for (String sillyTopicName : sillyTopicNames) {
            TestUtils.assertFutureError(createFutures.get(sillyTopicName), InvalidTopicException.class);
        }
        assertEquals(0, env.kafkaClient().inFlightRequestCount());
    }
}
Example source: org.apache.kafka/kafka_2.12 (the identical snippet also ships in the kafka_2.11, kafka, and ServiceMix kafka artifacts)

public void doDelete(final List<String> topicsToDelete,
                     final AdminClient adminClient) {
    boolean hasDeleteErrors = false;
    final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
    final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
    for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
        try {
            entry.getValue().get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            System.err.println("ERROR: deleting topic " + entry.getKey());
            e.printStackTrace(System.err);
            hasDeleteErrors = true;
        }
    }
    if (hasDeleteErrors) {
        throw new RuntimeException("Encountered an error deleting one or more topics");
    }
}
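The helper above aggregates per-topic failures: it waits on each future in turn, records any error, and throws a single exception at the end instead of stopping at the first failure. A stdlib-only sketch of that aggregation, again with CompletableFuture standing in for KafkaFuture (the class and method names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DeleteAggregationSketch {
    /**
     * Waits on every per-topic future and throws once at the end if any
     * deletion failed, so every topic gets an attempt before reporting.
     */
    static void awaitAll(Map<String, CompletableFuture<Void>> results) {
        boolean hasDeleteErrors = false;
        for (Map.Entry<String, CompletableFuture<Void>> entry : results.entrySet()) {
            try {
                entry.getValue().get(30, TimeUnit.SECONDS);
            } catch (Exception e) {
                System.err.println("ERROR: deleting topic " + entry.getKey());
                hasDeleteErrors = true;
            }
        }
        if (hasDeleteErrors) {
            throw new RuntimeException("Encountered an error deleting one or more topics");
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        results.put("a", CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("deletion disabled"));
        results.put("b", failed);
        try {
            awaitAll(results);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Compare this with calling all().get(): the aggregation loop reports every failing topic, while all() surfaces only the first failure.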
Example source: nbogojevic/kafka-operator

public void deleteTopic(String name) {
    if (topics().contains(name)) {
        log.warn("Deleting topic. name: {}", name);
        DeleteTopicsResult result = adminClient.deleteTopics(singleton(name));
        try {
            result.all().get();
            log.warn("Deleted topic. name: {}, result: {}", name, result);
            deletedTopics.inc(result.values().size());
        } catch (InterruptedException | ExecutionException e) {
            log.error("Exception occurred during topic deletion. name: {}", name, e);
        }
    }
}
Example source: strimzi/strimzi-kafka-operator

/**
 * Delete a topic via the Kafka AdminClient API, calling the given handler
 * (in a different thread) with the result.
 */
@Override
public void deleteTopic(TopicName topicName, Handler<AsyncResult<Void>> handler) {
    LOGGER.debug("Deleting topic {}", topicName);
    KafkaFuture<Void> future = adminClient.deleteTopics(
        Collections.singleton(topicName.toString())).values().get(topicName.toString());
    queueWork(new UniWork<>("deleteTopic", future, handler));
}
Example source: apache/samza

@Override
public boolean clearStream(StreamSpec streamSpec) {
    LOG.info("Deleting Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
    String topicName = streamSpec.getPhysicalName();
    try {
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableSet.of(topicName));
        deleteTopicsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        // Pass the exception as the final argument so SLF4J logs the stack trace.
        LOG.error("Failed to delete topic {}.", topicName, e);
        return false;
    }
    return true;
}
Example source: com.obsidiandynamics.jackdaw/jackdaw-core

final DeleteTopicsResult result = admin.deleteTopics(topics,
    new DeleteTopicsOptions().timeoutMs(timeoutMillis));
awaitFutures(timeoutMillis, result.values().values());
Example source: org.onap.dmaap.messagerouter.msgrtr/msgrtr

@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
    log.info("Deleting topic: " + topic);
    try {
        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
    } catch (Exception e) {
        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
        throw new ConfigDbException(e);
    }
    // Dead ZooKeeper scaffolding removed: the original declared a zkClient
    // that was never assigned, so its logging and cleanup never ran.
}