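This article collects code examples for the Java method org.apache.kafka.clients.admin.AdminClient.describeCluster() and shows how AdminClient.describeCluster() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of AdminClient.describeCluster() are as follows:
Package path: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: describeCluster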
Description: Get information about the nodes in the cluster, using the default options. This is a convenience method for AdminClient#describeCluster(DescribeClusterOptions) with default options. See the overload for more details.
Code example source: apache/kafka
/**
 * Get information about the nodes in the cluster, using the default options.
 *
 * This is a convenience method for {@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
 * See the overload for more details.
 *
 * @return The DescribeClusterResult.
 */
public DescribeClusterResult describeCluster() {
    return describeCluster(new DescribeClusterOptions());
}
Code example source: debezium/debezium
private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
    final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
    if (nodes.isEmpty()) {
        throw new ConnectException("No brokers available to obtain default settings");
    }
    String nodeId = nodes.iterator().next().idString();
    Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId));
    final Map<ConfigResource, Config> configs = admin.describeConfigs(resources).all().get(
            KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS
    );
    if (configs.isEmpty()) {
        throw new ConnectException("No configs have been received");
    }
    return configs.values().iterator().next();
}
Code example source: confluentinc/ksql
private static String getKafkaClusterId(final ServiceContext serviceContext) {
    try {
        return serviceContext.getAdminClient().describeCluster().clusterId().get();
    } catch (final UnsupportedVersionException e) {
        throw new KsqlException(
            "The Kafka brokers are incompatible with KSQL. "
            + "KSQL requires broker versions >= 0.10.1.x"
        );
    } catch (final Exception e) {
        throw new KsqlException("Failed to get Kafka cluster information", e);
    }
}
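A note on handling failures from the result futures: `get()` reports a failed operation by throwing `ExecutionException`, so code that wants to react to one specific failure type typically needs to look at `getCause()`. The sketch below illustrates that unwrapping with the JDK only. It is not the KSQL implementation: `CompletableFuture` stands in for the Kafka result future, `UnsupportedOperationException` stands in for `UnsupportedVersionException`, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical JDK-only sketch; CompletableFuture stands in for the Kafka result future.
class CauseUnwrapSketch {

    // Waits for the cluster id and maps failures to a short description.
    static String describe(Future<String> clusterIdFuture) {
        try {
            return "clusterId=" + clusterIdFuture.get();
        } catch (ExecutionException e) {
            // The real failure is the cause, not the ExecutionException wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof UnsupportedOperationException) {
                return "unsupported: " + cause.getMessage();
            }
            return "failed: " + cause;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new UnsupportedOperationException("broker too old"));
        System.out.println(describe(failed));
        System.out.println(describe(CompletableFuture.completedFuture("abc123")));
    }
}
```

With the real client, the same shape would check the cause against the Kafka exception type of interest before falling back to a generic error.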
Code example source: strimzi/strimzi-kafka-operator
/** Use the AdminClient to get a comma-separated list of the broker ids in the Kafka cluster */
private String brokerList() throws InterruptedException, ExecutionException {
    StringBuilder sb = new StringBuilder();
    for (Node node: adminClient.describeCluster().nodes().get()) {
        if (sb.length() != 0) {
            sb.append(",");
        }
        sb.append(node.id());
    }
    return sb.toString();
}
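The comma-separated list can also be built with the Streams API instead of a manual `StringBuilder` loop. The following is a minimal sketch under the assumption that the broker ids are already available as plain integers (in the example above they would come from `node.id()`); the class and method names are hypothetical and not part of any of the quoted projects.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: joins broker ids into a comma-separated string.
class BrokerIdJoinSketch {

    // Converts each id to text and joins the results with commas.
    static String join(List<Integer> brokerIds) {
        return brokerIds.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(join(List.of(0, 1, 2)));
    }
}
```

`Collectors.joining` handles the separator placement, so the empty-buffer check from the loop version is no longer needed.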
Code example source: nbogojevic/kafka-operator
private int numberOfBrokers() {
    try {
        return adminClient.describeCluster().nodes().get().size();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException("Unable to get number of brokers.", e);
    }
}
Code example source: salesforce/kafka-junit
/**
 * Describe nodes within Kafka cluster.
 * @return Collection of nodes within the Kafka cluster.
 */
public List<Node> describeClusterNodes() {
    // Create admin client
    try (final AdminClient adminClient = getAdminClient()) {
        final DescribeClusterResult describeClusterResult = adminClient.describeCluster();
        return new ArrayList<>(describeClusterResult.nodes().get());
    } catch (final InterruptedException | ExecutionException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
Code example source: org.apache.kafka/connect-runtime
static String lookupKafkaClusterId(AdminClient adminClient) {
    log.debug("Looking up Kafka cluster ID");
    try {
        KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
        if (clusterIdFuture == null) {
            log.info("Kafka cluster version is too old to return cluster ID");
            return null;
        }
        log.debug("Fetching Kafka cluster ID");
        String kafkaClusterId = clusterIdFuture.get();
        log.info("Kafka cluster ID: {}", kafkaClusterId);
        return kafkaClusterId;
    } catch (InterruptedException e) {
        throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
    } catch (ExecutionException e) {
        throw new ConnectException("Failed to connect to and describe Kafka cluster. "
            + "Check worker's broker connection and security properties.", e);
    }
}
Code example source: io.zipkin.zipkin2/zipkin-collector-kafka
@Override
public CheckResult check() {
    try {
        CheckResult failure = kafkaWorkers.failure.get(); // check the kafka workers didn't quit
        if (failure != null) return failure;
        KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
        maybeClusterId.get(1, TimeUnit.SECONDS);
        return CheckResult.OK;
    } catch (Exception e) {
        return CheckResult.failed(e);
    }
}
Code example source: com.opentable.components/otj-kafka
private void waitForCoordinator() throws InterruptedException {
    retry("coordinator available", () -> {
        admin.describeCluster().controller().get(10, TimeUnit.SECONDS);
    });
}
Code example source: io.github.jeqo.zipkin/zipkin-storage-kafka
@Override
public CheckResult check() {
    try {
        KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
        maybeClusterId.get(1, TimeUnit.SECONDS);
        return CheckResult.OK;
    } catch (Exception e) {
        return CheckResult.failed(e);
    }
}
Code example source: SourceLabOrg/kafka-webview
/**
 * Get information about brokers within the cluster.
 */
public NodeList getClusterNodes() {
    final List<NodeDetails> nodeDetails = new ArrayList<>();
    try {
        final Collection<Node> nodes = adminClient.describeCluster().nodes().get();
        for (final Node node: nodes) {
            nodeDetails.add(new NodeDetails(node.id(), node.host(), node.port(), node.rack()));
        }
        return new NodeList(nodeDetails);
    } catch (InterruptedException | ExecutionException e) {
        // TODO Handle
        throw new RuntimeException(e.getMessage(), e);
    }
}
Code example source: io.zipkin.reporter2/zipkin-sender-kafka11
/** Ensures there are no problems reading metadata about the topic. */
@Override public CheckResult check() {
    try {
        KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
        maybeClusterId.get(1, TimeUnit.SECONDS);
        return CheckResult.OK;
    } catch (Exception e) {
        return CheckResult.failed(e);
    }
}
Code example source: variflight/feeyo-redisproxy
/**
 * Get Kafka cluster configuration information.
 */
public Collection<Node> getClusterNodes() {
    try {
        DescribeClusterOptions dco = new DescribeClusterOptions();
        dco.timeoutMs(5 * 1000);
        DescribeClusterResult dcr = adminClient.describeCluster(dco);
        return dcr.nodes().get();
    } catch (Exception e) {
        return null;
    }
}
Code example source: org.nuxeo.lib.stream/nuxeo-stream
@SuppressWarnings("TryFinallyCanBeTryWithResources")
public static boolean kafkaDetected() {
    AdminClient client = AdminClient.create(getDefaultAdminProperties());
    try {
        client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
        return true;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StreamRuntimeException(e);
    } catch (ExecutionException e) {
        throw new StreamRuntimeException(e);
    } catch (TimeoutException e) {
        return false;
    } finally {
        // cannot use try with resource because of timeout
        client.close(0, TimeUnit.SECONDS);
    }
}
Code example source: amient/kafka-metrics
int controllerId = admin.describeCluster().controller().get(pollingIntervalSeconds, TimeUnit.SECONDS).id();
if (brokerId == controllerId) {
    final Map<TopicPartition, Long> logEndOffsets = new HashMap<>();
Code example source: rayokota/kafka-graphs
@Override
public Mono<Health> health() {
    Health.Builder builder = new Health.Builder();
    Properties properties = new Properties();
    properties.put("bootstrap.servers", props.getBootstrapServers());
    try (AdminClient adminClient = AdminClient.create(properties)) {
        DescribeClusterResult result = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(3000));
        builder.withDetail("clusterId", result.clusterId().get());
        builder.up();
    } catch (Exception e) {
        builder.down();
    }
    return Mono.just(builder.build());
}
Code example source: gwenshap/kafka-streams-stockstats
DescribeClusterResult dcr = ac.describeCluster();
int clusterSize = dcr.nodes().get().size();
Code example source: com.obsidiandynamics.jackdaw/jackdaw-core
/**
 * Describes the cluster, blocking until all operations have completed or a timeout occurs.
 *
 * @param timeoutMillis The timeout to wait for.
 * @return The resulting {@link DescribeClusterOutcome}.
 * @throws ExecutionException If an unexpected error occurred.
 * @throws InterruptedException If the thread was interrupted.
 * @throws TimeoutException If a timeout occurred.
 */
public DescribeClusterOutcome describeCluster(int timeoutMillis) throws ExecutionException, TimeoutException, InterruptedException {
    return runWithRetry(() -> {
        final DescribeClusterResult result = admin.describeCluster(new DescribeClusterOptions().timeoutMs((int) timeoutMillis));
        awaitFutures(timeoutMillis, result.nodes(), result.controller(), result.clusterId());
        return new DescribeClusterOutcome(result.nodes().get(), result.controller().get(), result.clusterId().get());
    });
}
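The `awaitFutures` helper used above is not shown in the snippet. As an illustration only, one way to wait for several futures under a single shared deadline is sketched below with JDK types; it is not the jackdaw implementation. `CompletableFuture` stands in for the Kafka result futures, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only sketch of waiting on several futures with one overall timeout.
class SharedDeadlineSketch {

    // Returns true if every future completes before the shared deadline, false on timeout.
    static boolean completesWithin(long timeoutMillis, Future<?>... futures) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (Future<?> future : futures) {
                // Each wait only gets whatever time is left of the overall budget.
                long remainingNanos = Math.max(0L, deadline - System.nanoTime());
                future.get(remainingNanos, TimeUnit.NANOSECONDS);
            }
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("A future failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(100,
                CompletableFuture.completedFuture("cluster-id"),
                CompletableFuture.completedFuture(3)));
        System.out.println(completesWithin(50, new CompletableFuture<String>()));
    }
}
```

Tracking one deadline rather than passing the full timeout to each `get` keeps the total wait bounded by the requested timeout regardless of how many futures are awaited.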