org.springframework.kafka.test.utils.KafkaTestUtils类的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(14.9k)|赞(0)|评价(0)|浏览(104)

本文整理了Java中org.springframework.kafka.test.utils.KafkaTestUtils类的一些代码示例,展示了KafkaTestUtils类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KafkaTestUtils类的具体详情如下:
包路径:org.springframework.kafka.test.utils.KafkaTestUtils
类名称:KafkaTestUtils

KafkaTestUtils介绍

[英]Kafka testing utilities.
[中]Kafka 测试工具类。

代码示例

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Build the test configuration for an {@code <Integer, String>} consumer, taking the
 * broker addresses from the supplied embedded broker.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
  String brokers = embeddedKafka.getBrokersAsString();
  return consumerProps(brokers, group, autoCommit);
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Publishes one record to {@code TEST_EMBEDDED}, reads it with a first consumer, then
 * starts a second consumer in a different group (with {@code auto.offset.reset=earliest})
 * and expects it to receive the same single record.
 * <p>Clients are managed with try-with-resources so they are closed even when a poll or
 * an assertion fails.
 * @throws Exception if the test fails unexpectedly.
 */
@Test
public void testLateStartedConsumer() throws Exception {
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps)) {
    this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);
    // Close the producer right after sending, as before, but also on failure.
    try (Producer<String, Object> producer = new KafkaProducer<>(KafkaTestUtils.producerProps(this.broker))) {
      producer.send(new ProducerRecord<>(TEST_EMBEDDED, "foo"));
    }
    KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);
    // Second consumer uses a different group id and subscribes only after the record was sent.
    consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    try (Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps)) {
      this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
      KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);
    }
  }
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * A typed version of {@link #getPropertyValue(Object, String)}: resolves the value and,
 * when it is not {@code null}, verifies that it is assignable to the requested type
 * before casting.
 * @param root the object.
 * @param propertyPath the path.
 * @param type the type to cast the object to.
 * @param <T> the type.
 * @return the field value.
 * @see #getPropertyValue(Object, String)
 */
@SuppressWarnings("unchecked")
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
  Object resolved = getPropertyValue(root, propertyPath);
  if (resolved == null) {
    // Nothing to type-check; null is returned as-is.
    return null;
  }
  Assert.isAssignable(type, resolved.getClass());
  return (T) resolved;
}

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - the remainder of this test body is not shown.
@Test
public void testJsonSerDeTypeMappings() throws Exception {
  this.logger.info("Start JSON3");
  // Consumer side: group "testJson", auto commit "false", JSON value deserializer
  // with the trusted-packages setting "*".
  Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
  // Producer side: JSON value serializer with type mappings "foo" -> Foo and "bar" -> Bar.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  senderProps.put(JsonSerializer.TYPE_MAPPINGS, "foo:" + Foo.class.getName() + ",bar:" + Bar.class.getName());

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - statements are elided between the lines shown; the
// declarations of `consumer`, `cpl` and `records` are not visible here.
@Test
public void withListener() throws Exception {
  // Build a template on top of a producer factory configured for the embedded broker.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
  KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
  // Poll, expecting a single record on INT_KEY_TOPIC.
  KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
  pf.destroy();
  // Invoke the error callback directly with the first record and a dummy exception.
  cpl.onError(records.get(0), new RuntimeException("x"));

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - statements are elided between the lines shown (the
// declarations of `container` and `threadName` are not visible) and the body is not closed.
public void testAutoCommit() throws Exception {
  this.logger.info("Start auto");
  // Consumer factory for group "test1" with auto commit "true".
  Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka);
  DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
  ContainerProperties containerProps = new ContainerProperties(topic1);
  assertThat(container.getAssignedPartitions()).hasSize(2);
  // Producer side: template built from the embedded broker's producer properties.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
  KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
    assertThat(threadName).contains("-C-");
  // Look up the "containers" property of the container and expect two entries.
  List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
      "containers", List.class);
  assertThat(containers).hasSize(2);
  for (int i = 0; i < 2; i++) {
    // Each entry's "listenerConsumer.acks" collection is expected to be empty.
    assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
        .size()).isEqualTo(0);

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - statements are elided between the lines shown (the
// creation of `consumer`, the opening of the callback that ends with `return null;`,
// and the closing braces of the `if` block and the method are not visible).
@Test
public void testLocalTransaction() throws Exception {
  // Producer factory and template with retries set to 1 and STRING_KEY_TOPIC as default topic.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
  DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
  KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
  template.setDefaultTopic(STRING_KEY_TOPIC);
  // Consumer factory for group "testLocalTx", auto commit "false", reading from the earliest offset.
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    return null;
  });
  // The first polled record is expected to be key "foo" / value "bar".
  ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
  Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
  ConsumerRecord<String, String> record = iterator.next();
  assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("foo"), value("bar")));
  // If the first poll held no further record, poll again.
  if (!iterator.hasNext()) {
    records = KafkaTestUtils.getRecords(consumer);
    iterator = records.iterator();
  assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
  consumer.close();
  // The factory's "cache" queue is expected to hold one entry before destroy() and none after.
  assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
  pf.destroy();
  assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(0);

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Expose the producer properties derived from the embedded broker as a bean.
 * @return the producer configuration map.
 */
@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> configs = KafkaTestUtils.producerProps(embeddedKafka);
  return configs;
}

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: fragment of a larger test method - the declarations of `container`, `topic`
// and `autoCommit` are not visible here.
container.setBeanName("testSeek" + topic);
container.start();
// The container's "listenerConsumer.autoCommit" property must match the expected flag.
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.autoCommit", Boolean.class))
    .isEqualTo(autoCommit);
Consumer<?, ?> consumer = spyOnConsumer(container);
// Wait until the container has been assigned the broker's partitions-per-topic count.
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
// Producer side: template built from the embedded broker's producer properties.
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Poll the consumer, expecting a single record for the specified topic, using a
 * default timeout of 60000 milliseconds.
 * @param consumer the consumer.
 * @param topic the topic.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 * @see #getSingleRecord(Consumer, String, long)
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
  final long defaultTimeoutMillis = 60000; // NOSONAR magic #
  return getSingleRecord(consumer, topic, defaultTimeoutMillis);
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Poll the consumer for records, using a default timeout value of 60000.
 * @param consumer the consumer.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the records.
 * @see #getRecords(Consumer, long)
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
  final long defaultTimeout = 60000; // NOSONAR magic #
  return getRecords(consumer, defaultTimeout);
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Build the test configuration for an {@code <Integer, String>} producer, taking the
 * broker addresses from the supplied embedded Kafka rule.
 * @param embeddedKafka a {@link org.springframework.kafka.test.rule.KafkaEmbedded} instance.
 * @return the properties.
 * @deprecated since 2.2 in favor of {@link #producerProps(EmbeddedKafkaBroker)}
 */
@SuppressWarnings("deprecation")
@Deprecated
public static Map<String, Object> producerProps(org.springframework.kafka.test.rule.KafkaEmbedded embeddedKafka) {
  String brokers = embeddedKafka.getBrokersAsString();
  return senderProps(brokers);
}

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - the remainder of this test body is not shown.
@Test
public void testJsonSerDeConfiguredType() throws Exception {
  this.logger.info("Start JSON1");
  // Consumer side: group "testJson", auto commit "false", JSON value deserializer
  // with Foo configured as the default value type.
  Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class);
  // Producer side: JSON value serializer with the add-type-info-headers setting turned off.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  senderProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Sends one record through a template that has a {@link ProducerListener} installed and
 * expects the listener's success callback to fire within 10 seconds.
 * <p>The producer factory is destroyed in a {@code finally} block so that it is released
 * even when an assertion fails.
 * @throws Exception if the test fails unexpectedly.
 */
@Test
public void withProducerRecordListener() throws Exception {
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
  try {
    KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
    template.setDefaultTopic(INT_KEY_TOPIC);
    final CountDownLatch latch = new CountDownLatch(1);
    template.setProducerListener(new ProducerListener<Integer, String>() {
      @Override
      public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata recordMetadata) {
        latch.countDown();
      }
    });
    template.sendDefault("foo");
    template.flush();
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    //Drain the topic
    KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
  }
  finally {
    pf.destroy();
  }
}

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - statements are elided between the lines shown (the
// declarations of `pfTx`, `template` and `latch` are not visible) and the body is not closed.
@SuppressWarnings("unchecked")
@Test
public void testContainerTxProducerIsNotCached() throws Exception {
  // Producer factory with retries set to 1.
  Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
  producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
  DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
  // A second factory, `pfTx`, is given the transaction id prefix "fooTx.".
  pfTx.setTransactionIdPrefix("fooTx.");
  KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
  // Consumer factory for group "txCache2Group", auto commit "false", reading from the earliest offset.
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache2Group", "false", this.embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    // Send "foo" to topic "txCache2" and block until the send completes.
    ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
    future.get();
    // Both factories' "cache" queues are expected to be empty.
    assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
    assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
    assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(0);

代码示例来源:origin: spring-projects/spring-kafka

// NOTE: truncated excerpt - statements are elided between the lines shown (the
// creation of `consumer`, the opening of the callback that ends with `return null;`,
// and the closing braces of the `if` block and the method are not visible).
@Test
public void testGlobalTransaction() throws Exception {
  // Producer factory and template with retries set to 1 and STRING_KEY_TOPIC as default topic.
  Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
  senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
  DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
  KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
  template.setDefaultTopic(STRING_KEY_TOPIC);
  // Consumer factory for group "testGlobalTx", auto commit "false", reading from the earliest offset.
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGlobalTx", "false", embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    return null;
  });
  // The first polled record is expected to be key "foo" / value "bar".
  ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
  Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
  ConsumerRecord<String, String> record = iterator.next();
  assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("foo"), value("bar")));
  // If the first poll held no further record, poll again.
  if (!iterator.hasNext()) {
    records = KafkaTestUtils.getRecords(consumer);
    iterator = records.iterator();
  assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
  consumer.close();
  // The factory's "cache" queue is expected to hold one entry before destroy() and none after.
  assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
  pf.destroy();
  assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(0);

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Producer factory bean configured with the embedded broker's producer properties.
 * @return the producer factory.
 */
@Bean
public DefaultKafkaProducerFactory<Integer, String> pf() {
  return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Sends two records to {@code annotated25} and expects a reply of {@code "FOO"} on
 * {@code annotated25reply1} for the value {@code "foo"}, and a reply of {@code "BAR"} on
 * {@code annotated25reply2} for a {@code null} value.
 * <p>The reply consumer is managed with try-with-resources so it is closed even when a
 * poll or an assertion fails.
 * @throws Exception if the test fails unexpectedly.
 */
@Test
public void testMultiReplyTo() throws Exception {
  Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
  ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
  try (Consumer<Integer, String> consumer = cf.createConsumer()) {
    embeddedKafka.consumeFromEmbeddedTopics(consumer, "annotated25reply1", "annotated25reply2");
    template.send("annotated25", 0, 1, "foo");
    template.flush();
    ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated25reply1");
    assertThat(reply.value()).isEqualTo("FOO");
    template.send("annotated25", 0, 1, null);
    reply = KafkaTestUtils.getSingleRecord(consumer, "annotated25reply2");
    assertThat(reply.value()).isEqualTo("BAR");
  }
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * <p>The poll must return exactly one record in total, and that record must belong to
 * {@code topic}; otherwise an assertion failure is raised.
 * @param consumer the consumer.
 * @param topic the topic.
 * @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(long)}.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 * @throws AssertionError if the single received record does not belong to {@code topic}.
 * @since 2.0
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
  ConsumerRecords<K, V> received = getRecords(consumer, timeout);
  assertThat(received.count()).as("Incorrect results returned", received.count()).isEqualTo(1);
  Iterable<ConsumerRecord<K, V>> forTopic = received.records(topic);
  // The count above covers every topic in the poll result, so the one record may belong
  // to a different topic; fail with a clear assertion instead of letting
  // iterator().next() throw NoSuchElementException.
  assertThat(forTopic.iterator().hasNext()).as("No record received for topic %s", topic).isTrue();
  return forTopic.iterator().next();
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Build the test configuration for an {@code <Integer, String>} producer, taking the
 * broker addresses from the supplied embedded broker.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) {
  String brokers = embeddedKafka.getBrokersAsString();
  return senderProps(brokers);
}

相关文章