Usage and code examples of the org.apache.kafka.clients.admin.AdminClient.describeCluster() method


This article collects code examples of the Java method org.apache.kafka.clients.admin.AdminClient.describeCluster() and shows how AdminClient.describeCluster() is used in practice. The examples are drawn from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the AdminClient.describeCluster() method are as follows:
Package path: org.apache.kafka.clients.admin.AdminClient
Class name: AdminClient
Method name: describeCluster

Introduction to AdminClient.describeCluster

Get information about the nodes in the cluster, using the default options. This is a convenience method for AdminClient#describeCluster(DescribeClusterOptions) with default options. See the overload for more details.
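
Before turning to the collected examples, here is a minimal, self-contained sketch of the default-options call. The broker address localhost:9092 and the class and variable names are assumptions for illustration only, not taken from any of the projects below.

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class DescribeClusterExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Assumed broker address; replace with your own bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // AdminClient is AutoCloseable, so try-with-resources closes it cleanly.
        try (AdminClient adminClient = AdminClient.create(props)) {
            DescribeClusterResult result = adminClient.describeCluster();

            // Each accessor returns a KafkaFuture; get() blocks until the response arrives.
            String clusterId = result.clusterId().get();
            Node controller = result.controller().get();
            Collection<Node> nodes = result.nodes().get();

            System.out.println("Cluster ID:   " + clusterId);
            System.out.println("Controller:   " + controller.id());
            System.out.println("Broker count: " + nodes.size());
        }
    }
}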

Code examples

Code example source: apache/kafka

/**
 * Get information about the nodes in the cluster, using the default options.
 *
 * This is a convenience method for #{@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
 * See the overload for more details.
 *
 * @return                  The DescribeClusterResult.
 */
public DescribeClusterResult describeCluster() {
  return describeCluster(new DescribeClusterOptions());
}
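
Because the convenience method above simply delegates to the overload, a hedged sketch of calling describeCluster(DescribeClusterOptions) directly with an explicit timeout may be useful. The 10-second request timeout, the 15-second client-side wait, and the broker address are assumptions for illustration.

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.common.Node;

public class DescribeClusterWithOptionsExample {

    public static void main(String[] args)
            throws InterruptedException, ExecutionException, TimeoutException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Request-level timeout passed to the cluster call via the options object.
            DescribeClusterOptions options = new DescribeClusterOptions().timeoutMs(10_000);

            // Also bound the client-side wait so the future cannot block forever.
            Collection<Node> nodes = adminClient.describeCluster(options)
                    .nodes()
                    .get(15, TimeUnit.SECONDS);

            System.out.println("Brokers visible: " + nodes.size());
        }
    }
}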

Code example source: debezium/debezium

private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
    final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
    if (nodes.isEmpty()) {
      throw new ConnectException("No brokers available to obtain default settings");
    }

    String nodeId = nodes.iterator().next().idString();
    Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId));
    final Map<ConfigResource, Config> configs = admin.describeConfigs(resources).all().get(
        KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS
    );

    if (configs.isEmpty()) {
      throw new ConnectException("No configs have been received");
    }

    return configs.values().iterator().next();
  }

Code example source: confluentinc/ksql

private static String getKafkaClusterId(final ServiceContext serviceContext) {
 try {
  return serviceContext.getAdminClient().describeCluster().clusterId().get();
 } catch (final UnsupportedVersionException e) {
  throw new KsqlException(
    "The kafka brokers are incompatible. "
    + "KSQL requires broker versions >= 0.10.1.x"
  );
 } catch (final Exception e) {
  throw new KsqlException("Failed to get Kafka cluster information", e);
 }
}

Code example source: strimzi/strimzi-kafka-operator

/** Use the AdminClient to get a comma-separated list of the broker ids in the Kafka cluster */
private String brokerList() throws InterruptedException, ExecutionException {
  StringBuilder sb = new StringBuilder();
  for (Node node: adminClient.describeCluster().nodes().get()) {
    if (sb.length() != 0) {
      sb.append(",");
    }
    sb.append(node.id());
  }
  return sb.toString();
}

Code example source: nbogojevic/kafka-operator

private int numberOfBrokers() {
 try {
  return adminClient.describeCluster().nodes().get().size();
 } catch (InterruptedException | ExecutionException e) {
  throw new IllegalStateException("Unable to get number of brokers.", e);
 }
}

Code example source: salesforce/kafka-junit

/**
 * Describe nodes within Kafka cluster.
 * @return Collection of nodes within the Kafka cluster.
 */
public List<Node> describeClusterNodes() {
  // Create admin client
  try (final AdminClient adminClient = getAdminClient()) {
    final DescribeClusterResult describeClusterResult = adminClient.describeCluster();
    return new ArrayList<>(describeClusterResult.nodes().get());
  } catch (final InterruptedException | ExecutionException e) {
    throw new RuntimeException(e.getMessage(), e);
  }
}

Code example source: org.apache.kafka/connect-runtime

static String lookupKafkaClusterId(AdminClient adminClient) {
    log.debug("Looking up Kafka cluster ID");
    try {
      KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
      if (clusterIdFuture == null) {
        log.info("Kafka cluster version is too old to return cluster ID");
        return null;
      }
      log.debug("Fetching Kafka cluster ID");
      String kafkaClusterId = clusterIdFuture.get();
      log.info("Kafka cluster ID: {}", kafkaClusterId);
      return kafkaClusterId;
    } catch (InterruptedException e) {
      throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
    } catch (ExecutionException e) {
      throw new ConnectException("Failed to connect to and describe Kafka cluster. "
                    + "Check worker's broker connection and security properties.", e);
    }
  }

Code example source: io.zipkin.zipkin2/zipkin-collector-kafka

@Override
public CheckResult check() {
 try {
  CheckResult failure = kafkaWorkers.failure.get(); // check the kafka workers didn't quit
  if (failure != null) return failure;
  KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
  maybeClusterId.get(1, TimeUnit.SECONDS);
  return CheckResult.OK;
 } catch (Exception e) {
  return CheckResult.failed(e);
 }
}

Code example source: com.opentable.components/otj-kafka

private void waitForCoordinator() throws InterruptedException {
  retry("coordinator available", () -> {
    admin.describeCluster().controller().get(10, TimeUnit.SECONDS);
  });
}

Code example source: io.github.jeqo.zipkin/zipkin-storage-kafka

@Override
public CheckResult check() {
 try {
  KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
  maybeClusterId.get(1, TimeUnit.SECONDS);
  return CheckResult.OK;
 } catch (Exception e) {
  return CheckResult.failed(e);
 }
}

Code example source: SourceLabOrg/kafka-webview

/**
 * Get information about brokers within the cluster.
 */
public NodeList getClusterNodes() {
  final List<NodeDetails> nodeDetails = new ArrayList<>();
  try {
    final Collection<Node> nodes = adminClient.describeCluster().nodes().get();
    for (final Node node: nodes) {
      nodeDetails.add(new NodeDetails(node.id(), node.host(), node.port(), node.rack()));
    }
    return new NodeList(nodeDetails);
  } catch (InterruptedException | ExecutionException e) {
    // TODO Handle
    throw new RuntimeException(e.getMessage(), e);
  }
}

Code example source: io.zipkin.reporter2/zipkin-sender-kafka11

/** Ensures there are no problems reading metadata about the topic. */
@Override public CheckResult check() {
 try {
  KafkaFuture<String> maybeClusterId = getAdminClient().describeCluster().clusterId();
  maybeClusterId.get(1, TimeUnit.SECONDS);
  return CheckResult.OK;
 } catch (Exception e) {
  return CheckResult.failed(e);
 }
}

Code example source: variflight/feeyo-redisproxy

/**
 * Get information about the nodes in the Kafka cluster
 */
public Collection<Node> getClusterNodes() {
  try {
    DescribeClusterOptions dco = new DescribeClusterOptions();
    dco.timeoutMs(5 * 1000);
    DescribeClusterResult dcr = adminClient.describeCluster(dco);
    return dcr.nodes().get();
  } catch (Exception e) {
    return null;
  }
}

Code example source: org.nuxeo.lib.stream/nuxeo-stream

@SuppressWarnings("TryFinallyCanBeTryWithResources")
public static boolean kafkaDetected() {
  AdminClient client = AdminClient.create(getDefaultAdminProperties());
  try {
    client.describeCluster().nodes().get(5, TimeUnit.SECONDS);
    return true;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new StreamRuntimeException(e);
  } catch (ExecutionException e) {
    throw new StreamRuntimeException(e);
  } catch (TimeoutException e) {
    return false;
  } finally {
    // cannot use try with resource because of timeout
    client.close(0, TimeUnit.SECONDS);
  }
}

Code example source: amient/kafka-metrics

int controllerId = admin.describeCluster().controller().get(pollingIntervalSeconds, TimeUnit.SECONDS).id();
if (brokerId == controllerId) {
  final Map<TopicPartition, Long> logEndOffsets = new HashMap<>();

Code example source: rayokota/kafka-graphs

@Override
  public Mono<Health> health() {
    Health.Builder builder = new Health.Builder();
    Properties properties = new Properties();
    properties.put("bootstrap.servers", props.getBootstrapServers());
    try (AdminClient adminClient = AdminClient.create(properties)) {
      DescribeClusterResult result = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(3000));
      builder.withDetail("clusterId", result.clusterId().get());
      builder.up();
    } catch (Exception e) {
      builder.down();
    }
    return Mono.just(builder.build());
  }

Code example source: gwenshap/kafka-streams-stockstats

DescribeClusterResult dcr = ac.describeCluster();
int clusterSize = dcr.nodes().get().size();

Code example source: com.obsidiandynamics.jackdaw/jackdaw-core

/**
 *  Describes the cluster, blocking until all operations have completed or a timeout occurs.
 *  
 *  @param timeoutMillis The timeout to wait for.
 *  @return The resulting {@link DescribeClusterOutcome}.
 *  @throws ExecutionException If an unexpected error occurred.
 *  @throws InterruptedException If the thread was interrupted.
 *  @throws TimeoutException If a timeout occurred.
 */
public DescribeClusterOutcome describeCluster(int timeoutMillis) throws ExecutionException, TimeoutException, InterruptedException {
 return runWithRetry(() -> {
  final DescribeClusterResult result = admin.describeCluster(new DescribeClusterOptions().timeoutMs((int) timeoutMillis));
  awaitFutures(timeoutMillis, result.nodes(), result.controller(), result.clusterId());
  return new DescribeClusterOutcome(result.nodes().get(), result.controller().get(), result.clusterId().get());
 });
}
