本文整理了 Java 中 org.springframework.kafka.test.utils.KafkaTestUtils 类的一些代码示例,展示了 KafkaTestUtils 类的具体用法。这些代码示例主要来源于 Github/Stackoverflow/Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KafkaTestUtils 类的具体详情如下:
包路径:org.springframework.kafka.test.utils.KafkaTestUtils
类名称:KafkaTestUtils
[英]Kafka testing utilities.
[中]Kafka 测试工具类。
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Build the test configuration for an {@code <Integer, String>} consumer, using
 * the broker addresses reported by the supplied embedded broker.
 * @param group the consumer group id.
 * @param autoCommit the auto commit setting.
 * @param embeddedKafka the {@link EmbeddedKafkaBroker} that supplies the broker addresses.
 * @return the consumer properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
    String brokerAddresses = embeddedKafka.getBrokersAsString();
    return consumerProps(brokerAddresses, group, autoCommit);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Publishes one record to the embedded topic, reads it with a first consumer, then
 * creates a second consumer in a different group after the record was sent and
 * checks that it also receives the record (both consumers reset to "earliest").
 * The producer and both consumers are opened in try-with-resources so that they are
 * closed even when one of the single-record assertions fails.
 */
@Test
public void testLateStartedConsumer() throws Exception {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps)) {
        this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);

        // Close the producer right after sending, as the original flow did.
        try (Producer<String, Object> producer = new KafkaProducer<>(KafkaTestUtils.producerProps(this.broker))) {
            producer.send(new ProducerRecord<>(TEST_EMBEDDED, "foo"));
        }
        KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);

        // Second consumer uses a different group id and is created only after the record was published.
        Map<String, Object> lateConsumerProps =
                KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
        lateConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (Consumer<Integer, String> consumer2 = new KafkaConsumer<>(lateConsumerProps)) {
            this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
            KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);
        }
    }
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Typed variant of {@link #getPropertyValue(Object, String)}: resolves the property
 * and, when the result is not {@code null}, verifies that it is assignable to the
 * requested type before returning it.
 * @param root the object.
 * @param propertyPath the path.
 * @param type the type the resolved value is expected to be assignable to.
 * @param <T> the type.
 * @return the field value, or {@code null} when the resolved value is {@code null}.
 * @see #getPropertyValue(Object, String)
 */
@SuppressWarnings("unchecked")
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
    Object resolved = getPropertyValue(root, propertyPath);
    if (resolved == null) {
        // Nothing to type-check; null is returned as-is.
        return null;
    }
    Assert.isAssignable(type, resolved.getClass());
    return (T) resolved;
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testJsonSerDeTypeMappings() throws Exception {
this.logger.info("Start JSON3");
Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:" + Foo.class.getName() + ",bar:" + Bar.class.getName());
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void withListener() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
pf.destroy();
cpl.onError(records.get(0), new RuntimeException("x"));
代码示例来源:origin: spring-projects/spring-kafka
public void testAutoCommit() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
assertThat(container.getAssignedPartitions()).hasSize(2);
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
assertThat(threadName).contains("-C-");
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
"containers", List.class);
assertThat(containers).hasSize(2);
for (int i = 0; i < 2; i++) {
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
.size()).isEqualTo(0);
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testLocalTransaction() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
return null;
});
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
ConsumerRecord<String, String> record = iterator.next();
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("foo"), value("bar")));
if (!iterator.hasNext()) {
records = KafkaTestUtils.getRecords(consumer);
iterator = records.iterator();
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
consumer.close();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
pf.destroy();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(0);
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Expose the producer properties built from the embedded Kafka broker as a bean.
 * @return the producer properties.
 */
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> configs = KafkaTestUtils.producerProps(embeddedKafka);
    return configs;
}
代码示例来源:origin: spring-projects/spring-kafka
container.setBeanName("testSeek" + topic);
container.start();
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.autoCommit", Boolean.class))
.isEqualTo(autoCommit);
Consumer<?, ?> consumer = spyOnConsumer(container);
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Poll the consumer, expecting a single record for the specified topic, by
 * delegating to {@link #getSingleRecord(Consumer, String, long)} with a fixed
 * timeout of 60000.
 * @param consumer the consumer.
 * @param topic the topic.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 * @see #getSingleRecord(Consumer, String, long)
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
    final long defaultTimeout = 60000; // NOSONAR magic #
    return getSingleRecord(consumer, topic, defaultTimeout);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Poll the consumer for records by delegating to {@link #getRecords(Consumer, long)}
 * with a fixed timeout value of 60000.
 * @param consumer the consumer.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the records.
 * @see #getRecords(Consumer, long)
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
    final long defaultTimeout = 60000; // NOSONAR magic #
    return getRecords(consumer, defaultTimeout);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Build the test configuration for an {@code <Integer, String>} producer, using the
 * broker addresses reported by the supplied embedded Kafka rule.
 * @param embeddedKafka a {@link org.springframework.kafka.test.rule.KafkaEmbedded} instance.
 * @return the producer properties.
 * @deprecated since 2.2 in favor of {@link #producerProps(EmbeddedKafkaBroker)}
 */
@SuppressWarnings("deprecation")
@Deprecated
public static Map<String, Object> producerProps(org.springframework.kafka.test.rule.KafkaEmbedded embeddedKafka) {
    String brokerAddresses = embeddedKafka.getBrokersAsString();
    return senderProps(brokerAddresses);
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testJsonSerDeConfiguredType() throws Exception {
this.logger.info("Start JSON1");
Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class);
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Sends one record through a template with a {@link ProducerListener} installed and
 * waits (up to 10 seconds) for the listener's success callback to fire.
 * The producer factory is destroyed in a {@code finally} block so that it is
 * released even when the latch assertion or the drain step fails.
 */
@Test
public void withProducerRecordListener() throws Exception {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(INT_KEY_TOPIC);
        final CountDownLatch latch = new CountDownLatch(1);
        template.setProducerListener(new ProducerListener<Integer, String>() {

            @Override
            public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata recordMetadata) {
                latch.countDown();
            }

        });
        template.sendDefault("foo");
        template.flush();
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        // Drain the topic: consume the record that was just sent.
        KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
    }
    finally {
        pf.destroy();
    }
}
代码示例来源:origin: spring-projects/spring-kafka
@SuppressWarnings("unchecked")
@Test
public void testContainerTxProducerIsNotCached() throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
pfTx.setTransactionIdPrefix("fooTx.");
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache2Group", "false", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
future.get();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(0);
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testGlobalTransaction() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGlobalTx", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
return null;
});
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
ConsumerRecord<String, String> record = iterator.next();
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("foo"), value("bar")));
if (!iterator.hasNext()) {
records = KafkaTestUtils.getRecords(consumer);
iterator = records.iterator();
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
consumer.close();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
pf.destroy();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(0);
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Producer factory bean configured with the properties derived from the embedded
 * Kafka broker.
 * @return the producer factory.
 */
@Bean
public DefaultKafkaProducerFactory<Integer, String> pf() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Sends two messages to "annotated25" and checks the replies: the "foo" payload is
 * expected to produce "FOO" on "annotated25reply1", and the {@code null} payload is
 * expected to produce "BAR" on "annotated25reply2".
 * The reply consumer is opened in try-with-resources so that it is closed even when
 * one of the assertions fails.
 */
@Test
public void testMultiReplyTo() throws Exception {
    // Copy the shared consumer configuration and give this test its own group id.
    Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
    ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    try (Consumer<Integer, String> consumer = cf.createConsumer()) {
        embeddedKafka.consumeFromEmbeddedTopics(consumer, "annotated25reply1", "annotated25reply2");

        template.send("annotated25", 0, 1, "foo");
        template.flush();
        ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated25reply1");
        assertThat(reply.value()).isEqualTo("FOO");

        template.send("annotated25", 0, 1, null);
        reply = KafkaTestUtils.getSingleRecord(consumer, "annotated25reply2");
        assertThat(reply.value()).isEqualTo("BAR");
    }
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Poll the consumer, expecting a single record for the specified topic.
 * <p>Exactly one record must be returned by the poll, and that record must belong
 * to {@code topic}; both conditions are reported as assertion failures.
 * @param consumer the consumer.
 * @param topic the topic.
 * @param timeout max time in milliseconds to wait for records; passed to
 * {@link #getRecords(Consumer, long)}.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the record.
 * @throws AssertionError if exactly one record is not received, or if the received
 * record is not for the specified topic.
 * @since 2.0
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
    ConsumerRecords<K, V> received = getRecords(consumer, timeout);
    assertThat(received.count()).as("Incorrect results returned").isEqualTo(1);
    // The single record may have arrived on another subscribed topic; fail with a
    // descriptive assertion instead of a bare NoSuchElementException from next().
    Iterable<ConsumerRecord<K, V>> forTopic = received.records(topic);
    assertThat(forTopic.iterator().hasNext()).as("No records found for topic '%s'", topic).isTrue();
    return forTopic.iterator().next();
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Build the test configuration for an {@code <Integer, String>} producer, using the
 * broker addresses reported by the supplied embedded broker.
 * @param embeddedKafka the {@link EmbeddedKafkaBroker} that supplies the broker addresses.
 * @return the producer properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) {
    String brokerAddresses = embeddedKafka.getBrokersAsString();
    return senderProps(brokerAddresses);
}
内容来源于网络,如有侵权,请联系作者删除!