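This article collects code examples for the Java method org.apache.kafka.clients.admin.AdminClient.create() and shows how AdminClient.create() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of open-source projects to serve as a practical reference. Details of the AdminClient.create() method: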
Package: org.apache.kafka.clients.admin
Class name: AdminClient
Method name: create
Description: Creates a new AdminClient with the given configuration.
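Before the project excerpts, here is a minimal sketch of the configuration that create() receives. It uses only the JDK, so it runs without kafka-clients on the classpath. The class name AdminConfigSketch, the method buildConfig, the client id "admin-sketch", and the 30-second timeout are assumptions made for illustration. The three keys are the standard setting names behind the AdminClientConfig constants noted in the comments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: assembles the configuration map that would be
// handed to AdminClient.create(Map<String, Object>).
class AdminConfigSketch {

    // Builds a minimal configuration. The values are illustrative assumptions.
    static Map<String, Object> buildConfig(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        // AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
        config.put("bootstrap.servers", bootstrapServers);
        // AdminClientConfig.CLIENT_ID_CONFIG
        config.put("client.id", "admin-sketch");
        // AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG
        config.put("request.timeout.ms", 30000);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = buildConfig("localhost:9092");
        // With kafka-clients on the classpath, the map would be passed on as:
        //   try (AdminClient admin = AdminClient.create(config)) { ... }
        System.out.println(config.get("bootstrap.servers"));
    }
}
```

AdminClient is closeable, so most of the excerpts that follow create it in a try-with-resources block or close it explicitly once the work is done.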
Code example source: openzipkin/brave
@Override public AdminClient getAdminClient(Map<String, Object> config) {
    return AdminClient.create(config);
}
Code example source: confluentinc/ksql
/**
 * Create a Kafka topic with the given parameters.
 *
 * @param topic       The name of the topic.
 * @param partitions  The number of partitions for this topic.
 * @param replication The replication factor for (partitions of) this topic.
 * @param topicConfig Additional topic-level configuration settings.
 */
void createTopic(final String topic,
                 final int partitions,
                 final int replication,
                 final Map<String, String> topicConfig) {
    log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
        topic, partitions, replication, topicConfig);
    final ImmutableMap<String, Object> props = ImmutableMap.of(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList(),
        AdminClientConfig.RETRIES_CONFIG, 5);
    try (AdminClient adminClient = AdminClient.create(props)) {
        final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
        newTopic.configs(topicConfig);
        try {
            final CreateTopicsResult result = adminClient.createTopics(ImmutableList.of(newTopic));
            result.all().get();
        } catch (final Exception e) {
            throw new RuntimeException("Failed to create topic:" + topic, e);
        }
    }
}
Code example source: linkedin/cruise-control
private AdminClient getAdminClient(String bootstrapServer) {
    Properties props = new Properties();
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    return AdminClient.create(props);
}
Code example source: apache/flink
@Override
public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
    LOG.info("Creating topic {}", topic);
    try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
        NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
        adminClient.createTopics(Collections.singleton(topicObj)).all().get();
    } catch (Exception e) {
        e.printStackTrace();
        fail("Create test topic : " + topic + " failed, " + e.getMessage());
    }
}
Code example source: apache/flume
private AdminClient getAdminClient() {
    if (adminClient == null) {
        Properties adminClientProps = createAdminClientProperties();
        adminClient = AdminClient.create(adminClientProps);
    }
    return adminClient;
}
Code example source: debezium/debezium
@Override
public void initializeStorage() {
    super.initializeStorage();
    try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
        // Find default replication factor
        Config brokerConfig = getKafkaBrokerConfig(admin);
        final short replicationFactor = Short.parseShort(brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value());
        // Create topic
        final NewTopic topic = new NewTopic(topicName, (short)1, replicationFactor);
        topic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(Long.MAX_VALUE), "retention.bytes", "-1"));
        admin.createTopics(Collections.singleton(topic));
        logger.info("Database history topic '{}' created", topic);
    }
    catch (Exception e) {
        throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
    }
}
Code example source: apache/flink
@Override
public void deleteTestTopic(String topic) {
    LOG.info("Deleting topic {}", topic);
    try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
        tryDelete(adminClient, topic);
    } catch (Exception e) {
        e.printStackTrace();
        fail(String.format("Delete test topic : %s failed, %s", topic, e.getMessage()));
    }
}
Code example source: apache/kafka
@Test
public void testAdminClientWithInvalidCredentials() {
    Map<String, Object> props = new HashMap<>(saslClientConfigs);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
    try (AdminClient client = AdminClient.create(props)) {
        DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
        result.all().get();
        fail("Expected an authentication error!");
    } catch (Exception e) {
        assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
            e.getCause() instanceof SaslAuthenticationException);
    }
}
Code example source: apache/storm
public void setUp() throws Exception {
    // setup ZK
    zookeeper = new TestingServer(true);
    // setup Broker
    kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zookeeper.getConnectString());
    brokerProps.setProperty("broker.id", "0");
    brokerProps.setProperty("log.dirs", kafkaDir.getPath());
    brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    MockTime mock = new MockTime();
    kafkaServer = TestUtils.createServer(config, mock);
    // setup default Producer
    createProducer();
    kafkaAdminClient = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT));
}
Code example source: apache/flink
@Override
public int getLeaderToShutDown(String topic) throws Exception {
    // try-with-resources closes the client so its network threads are not leaked
    try (AdminClient client = AdminClient.create(getStandardProperties())) {
        TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
        return result.partitions().get(0).leader().id();
    }
}
Code example source: spring-projects/spring-kafka
/**
 * Create an {@link AdminClient}; invoke the callback and reliably close the admin.
 * @param callback the callback.
 */
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
    Map<String, Object> adminConfigs = new HashMap<>();
    adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
    AdminClient admin = null;
    try {
        admin = AdminClient.create(adminConfigs);
        callback.accept(admin);
    }
    finally {
        if (admin != null) {
            admin.close(this.adminTimeout, TimeUnit.SECONDS);
        }
    }
}
Code example source: spring-projects/spring-kafka (excerpt, truncated in the original)
AdminClient adminClient = null;
try {
    adminClient = AdminClient.create(this.config);
Code example source: org.apache.kafka/kafka (the same snippet also appears in kafka_2.12 and kafka_2.11)
kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
validateNoActiveConsumers(groupId, kafkaAdminClient);
Code example source: spring-projects/spring-kafka
@Test
public void alreadyExists() throws Exception {
    AtomicReference<Method> addTopics = new AtomicReference<>();
    AtomicReference<Method> modifyTopics = new AtomicReference<>();
    ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> {
        m.setAccessible(true);
        if (m.getName().equals("addTopics")) {
            addTopics.set(m);
        }
        else if (m.getName().equals("modifyTopics")) {
            modifyTopics.set(m);
        }
    }, m -> {
        return m.getName().endsWith("Topics");
    });
    try (AdminClient adminClient = AdminClient.create(this.admin.getConfig())) {
        addTopics.get().invoke(this.admin, adminClient, Collections.singletonList(this.topic1));
        modifyTopics.get().invoke(this.admin, adminClient, Collections.singletonMap(
            this.topic1.name(), NewPartitions.increaseTo(this.topic1.numPartitions())));
    }
}
Code example source: spring-projects/spring-kafka
@Test
public void testAddTopics() throws Exception {
    AdminClient adminClient = AdminClient.create(this.admin.getConfig());
    DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
    topics.all().get();
    new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", 2);
    new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", 3);
    this.admin.initialize();
    topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
    Map<String, TopicDescription> results = topics.all().get();
    results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
    adminClient.close(10, TimeUnit.SECONDS);
}
Code example source: allegro/hermes
private AdminClient brokerAdminClient(String bootstrapServers) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return AdminClient.create(props);
}
Code example source: com.opentable.components/otj-kafka
private AdminClient makeAdminClient(final String brokerList) {
    final Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    return AdminClient.create(props);
}
Code example source: sitewhere/sitewhere
@Override
public void start(ILifecycleProgressMonitor monitor) throws SiteWhereException {
    getLogger().info(
        "Producer connecting to Kafka: " + getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
    getLogger().info("Will be producing messages for: " + getTargetTopicName());
    this.producer = new KafkaProducer<String, byte[]>(buildConfiguration());
    this.kafkaAdmin = AdminClient.create(buildAdminConfiguration());
    waitForKafkaAvailable();
}