Usage and code examples of the org.apache.kafka.clients.admin.AdminClient.close() method

Reposted by x33g5p2x on 2022-01-15 under Other

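This article collects code examples of the Java method org.apache.kafka.clients.admin.AdminClient.close() and shows how AdminClient.close() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from a selection of projects, and are intended as reference material. Details of the AdminClient.close() method:
Fully qualified class: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: close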

About AdminClient.close

Close the AdminClient and release all associated resources. See AdminClient#close(long, TimeUnit).

Code examples

Code example source: apache/kafka

/**
 * Close the AdminClient and release all associated resources.
 *
 * See {@link AdminClient#close(long, TimeUnit)}
 */
@Override
public void close() {
  close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}

Code example source: apache/kafka

/**
 * Close the AdminClient and release all associated resources.
 *
 * The close operation has a grace period during which current operations will be allowed to
 * complete, specified by the given duration and time unit.
 * New operations will not be accepted during the grace period.  Once the grace period is over,
 * all operations that have not yet been completed will be aborted with a TimeoutException.
 *
 * @param duration  The duration to use for the wait time.
 * @param unit      The time unit to use for the wait time.
 * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
 */
@Deprecated
public void close(long duration, TimeUnit unit) {
  close(Duration.ofMillis(unit.toMillis(duration)));
}
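
The two overloads above reduce every timeout to a Duration expressed in milliseconds. As a small sketch of that arithmetic using only the JDK (the class name CloseTimeoutDemo and the helper toDuration are hypothetical and not part of Kafka), the following shows what the conversion yields for a 10-second timeout and for the Long.MAX_VALUE value that the no-argument close() passes along:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: mirrors the conversion in close(long, TimeUnit).
final class CloseTimeoutDemo {

    // Same expression as the deprecated overload: normalize to milliseconds, then wrap in a Duration.
    static Duration toDuration(long duration, TimeUnit unit) {
        return Duration.ofMillis(unit.toMillis(duration));
    }

    public static void main(String[] args) {
        // A 10-second timeout becomes a 10-second Duration.
        System.out.println(toDuration(10, TimeUnit.SECONDS));

        // The no-argument close() passes Long.MAX_VALUE milliseconds,
        // so in practice it waits for pending operations without a meaningful deadline.
        Duration unbounded = toDuration(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        System.out.println(unbounded.toMillis() == Long.MAX_VALUE);

        // TimeUnit.toMillis saturates at Long.MAX_VALUE instead of overflowing,
        // so very large values in coarser units do not wrap around.
        Duration saturated = toDuration(Long.MAX_VALUE, TimeUnit.SECONDS);
        System.out.println(saturated.toMillis() == Long.MAX_VALUE);
    }
}
```

Callers that want a bounded shutdown would pass an explicit, small timeout rather than rely on the no-argument overload.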

Code example source: apache/flume

public void tearDown() {
 logger.info("Shutting down the Kafka Consumer.");
 if (consumer != null) {
  consumer.close();
 }
 if (adminClient != null) {
  adminClient.close();
  adminClient = null;
 }
 try {
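  Thread.sleep(3 * 1000);   // pause before stopping the server (original note: "ensure that
  // the server is fully started before proceeding with tests")
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt();   // restore the interrupt flag instead of swallowing it
 }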
 if (kafkaServer != null) {
  logger.info("Shutting down the kafka Server.");
  kafkaServer.stop();
 }
 logger.info("Completed the tearDown phase.");
}

Code example source: apache/storm

public void tearDown() throws Exception {
  kafkaAdminClient.close();
  closeProducer();
  kafkaServer.shutdown();
  kafkaDir.close();
  zookeeper.close();
}

Code example source: spring-projects/spring-kafka

/**
 * Create an {@link AdminClient}; invoke the callback and reliably close the admin.
 * @param callback the callback.
 */
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
  Map<String, Object> adminConfigs = new HashMap<>();
  adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
  AdminClient admin = null;
  try {
    admin = AdminClient.create(adminConfigs);
    callback.accept(admin);
  }
  finally {
    if (admin != null) {
      admin.close(this.adminTimeout, TimeUnit.SECONDS);
    }
  }
}

Code example source: spring-projects/spring-kafka

adminClient.close(this.closeTimeout, TimeUnit.SECONDS);

Code example source: org.apache.kafka/connect-runtime

@Override
public void close() {
  admin.close();
}

Code example source: com.obsidiandynamics.jackdaw/jackdaw-core

public void close(long duration, TimeUnit unit) {
 admin.close(duration, unit);
}

Code example source: variflight/feeyo-redisproxy

public void close() {
  adminClient.close();
}

Code example source: spring-projects/spring-kafka

@Test
public void testAddTopics() throws Exception {
  AdminClient adminClient = AdminClient.create(this.admin.getConfig());
  DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  topics.all().get();
  new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2);
  new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3);
  this.admin.initialize();
  topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
  Map<String, TopicDescription> results = topics.all().get();
  results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
  adminClient.close(10, TimeUnit.SECONDS);
}

Code example source: org.nuxeo.lib.stream/nuxeo-stream

@Override
public void close() {
  adminClient.close(ADMIN_CLIENT_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
  log.debug("Closed.");
}

Code example source: io.zipkin.reporter2/zipkin-sender-kafka11

@Override public synchronized void close() {
 if (closeCalled) return;
 KafkaProducer<byte[], byte[]>  producer = this.producer;
 if (producer != null) producer.close();
 AdminClient adminClient = this.adminClient;
 if (adminClient != null) adminClient.close(1, TimeUnit.SECONDS);
 closeCalled = true;
}

Code example source: SourceLabOrg/kafka-webview

/**
 * Close out the Client.
 */
public void close() {
  adminClient.close();
  consumerClient.close();
}

Code example source: com.opentable.components/otj-kafka

@Override
public void close() {
  try {
    consumer.close();
  } finally {
    adminClient.close();
  }
}
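
The try/finally above guarantees that the admin client is closed even when closing the consumer throws. Because AdminClient implements AutoCloseable, a similar guarantee can be obtained with try-with-resources. The sketch below uses a hypothetical stand-in class (FakeClient, not a Kafka type) so that it runs without a broker; it records the order in which resources are closed and shows that a failure in one close() does not skip the other.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderDemo {

    // Hypothetical stand-in for a Kafka client; it records its name when closed.
    static final class FakeClient implements AutoCloseable {
        private final String name;
        private final List<String> log;
        private final boolean failOnClose;

        FakeClient(String name, List<String> log, boolean failOnClose) {
            this.name = name;
            this.log = log;
            this.failOnClose = failOnClose;
        }

        @Override
        public void close() {
            log.add(name);
            if (failOnClose) {
                throw new IllegalStateException(name + " failed to close");
            }
        }
    }

    // Opens an "admin" and a "consumer" stand-in and returns the order in which they were closed.
    static List<String> closeBoth(boolean consumerFails) {
        List<String> log = new ArrayList<>();
        // Resources are closed in reverse declaration order: consumer first, then admin.
        try (FakeClient admin = new FakeClient("admin", log, false);
             FakeClient consumer = new FakeClient("consumer", log, consumerFails)) {
            // Work with both clients here.
        } catch (IllegalStateException e) {
            // Reached only after every resource has had its close() invoked.
            log.add("caught: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(closeBoth(false));
        System.out.println(closeBoth(true));
    }
}
```

Declaring the admin client first keeps the same order as the original snippet: the consumer is closed before the admin client, and the admin client is still closed if the consumer's close() fails.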

Code example source: io.zipkin.zipkin2/zipkin-collector-kafka

@Override
public void close() {
 kafkaWorkers.close();
 if (adminClient != null) adminClient.close(1, TimeUnit.SECONDS);
}

Code example source: amient/kafka-metrics

@Override
public void shutdown() {
  try {
    super.shutdown();
  } finally {
    admin.close();
  }
}

Code example source: org.springframework.kafka/spring-kafka-test

/**
 * Create an {@link AdminClient}; invoke the callback and reliably close the admin.
 * @param callback the callback.
 */
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
  Map<String, Object> adminConfigs = new HashMap<>();
  adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
  AdminClient admin = null;
  try {
    admin = AdminClient.create(adminConfigs);
    callback.accept(admin);
  }
  finally {
    if (admin != null) {
      admin.close(this.adminTimeout, TimeUnit.SECONDS);
    }
  }
}

Code example source: openmessaging/openmessaging-benchmark

@Override
public void close() throws Exception {
  producer.close();
  for (BenchmarkConsumer consumer : consumers) {
    consumer.close();
  }
  admin.close();
}

Code example source: io.github.jeqo.zipkin/zipkin-storage-kafka

void doClose() {
 try {
  if (adminClient != null) adminClient.close(1, TimeUnit.SECONDS);
  if (producer != null) {
   producer.flush();
   producer.close(1, TimeUnit.SECONDS);
  }
  if (processStreams != null) processStreams.close(Duration.ofSeconds(1));
  if (indexStreams != null) indexStreams.close(Duration.ofSeconds(1));
 } catch (Exception | Error e) {
  LOG.warn("error closing client {}", e.getMessage(), e);
 }
}

Code example source: rayokota/kafka-graphs

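public static void createTopic(String topic, int numPartitions, short replicationFactor, Properties props) {
  NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
  // AdminClient is AutoCloseable; try-with-resources closes it even if createTopics throws.
  try (AdminClient adminClient = AdminClient.create(props)) {
    // createTopics is asynchronous; the returned result is not awaited here,
    // so a failed topic creation goes unreported.
    adminClient.createTopics(Collections.singletonList(newTopic));
  }
}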
