Usage and code examples of the org.springframework.kafka.test.utils.KafkaTestUtils.consumerProps() method

x33g5p2x, reposted on 2022-01-23 under Other

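This article collects code examples for the Java method org.springframework.kafka.test.utils.KafkaTestUtils.consumerProps() and shows how KafkaTestUtils.consumerProps() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the KafkaTestUtils.consumerProps() method are as follows:
Package path: org.springframework.kafka.test.utils.KafkaTestUtils
Class name: KafkaTestUtils
Method name: consumerProps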

Introduction to KafkaTestUtils.consumerProps

Set up test properties for a consumer.
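
To make the description above concrete, here is a minimal JDK-only sketch of the kind of map such a helper hands back and how callers then adjust it. It is not the library's implementation: `ConsumerPropsSketch`, `sketchConsumerProps`, and the broker address `localhost:9092` are hypothetical names chosen for illustration. The parameters mirror the `(brokers, group, autoCommit)` overload called in the examples below, and the `<Integer, String>` deserializers follow the Javadoc. The exact set of entries the real method returns may differ between spring-kafka versions.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Hypothetical stand-in for KafkaTestUtils.consumerProps(brokers, group, autoCommit).
    // The keys are the standard Kafka consumer property names; the real method
    // may set additional entries depending on the spring-kafka version.
    static Map<String, Object> sketchConsumerProps(String brokers, String group, String autoCommit) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autoCommit);
        // <Integer, String> consumer, as stated in the Javadoc
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = sketchConsumerProps("localhost:9092", "testGroup", "false");
        // The returned map is mutable, so tests usually override or add settings
        // before passing it to a consumer factory.
        props.put("auto.offset.reset", "earliest");
        System.out.println(props);
    }
}
```

The point of returning a plain mutable map is that each test can start from the same baseline and change only what it needs, which is the pattern most of the examples below follow.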

Code examples

Code example source: spring-projects/spring-kafka

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
  return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

Code example source: spring-projects/spring-kafka

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link org.springframework.kafka.test.rule.KafkaEmbedded} instance.
 * @return the properties.
 * @deprecated since 2.2 in favor of {@link #consumerProps(String, String, EmbeddedKafkaBroker)}
 */
@SuppressWarnings("deprecation")
@Deprecated
public static Map<String, Object> consumerProps(String group, String autoCommit,
    org.springframework.kafka.test.rule.KafkaEmbedded embeddedKafka) {
  return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup",
      "false");
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
  consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
  consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedFooProvider.class);
  consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class.getName());
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public DefaultKafkaConsumerFactory<Integer, String> cf() {
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("serverSide", "false", embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return new DefaultKafkaConsumerFactory<>(consumerProps);
}

Code example source: spring-projects/spring-kafka

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return consumerProps;
}

Code example source: spring-projects/spring-kafka

@Bean
public ConsumerFactory<String, String> cf() {
  Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
  props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
  props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName());
  return new DefaultKafkaConsumerFactory<>(props);
}

Code example source: spring-projects/spring-kafka

@Test
public void testSeekAutoCommitDefault() throws Exception {
  Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
  // Remove the explicit setting so the test exercises the default, which is expected to be true.
  props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
  testSeekGuts(props, topic15, true);
}

Code example source: spring-projects/spring-kafka

@Test
public void testSeekAutoCommit() throws Exception {
  Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka);
  testSeekGuts(props, topic12, true);
}

Code example source: spring-projects/spring-kafka

@Test
public void testSeek() throws Exception {
  Map<String, Object> props = KafkaTestUtils.consumerProps("test11", "false", embeddedKafka);
  testSeekGuts(props, topic11, false);
}

Code example source: spring-projects/spring-kafka

@BeforeClass
public static void setUp() throws Exception {
  Map<String, Object> consumerProps = KafkaTestUtils
      .consumerProps("KafkaTemplatetests" + UUID.randomUUID().toString(), "false", embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
  consumer = cf.createConsumer();
  embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC);
}

Code example source: spring-projects/spring-kafka

private <K, V> Consumer<K, V> consumer(String topic, Serde<K> keySerde, Serde<V> valueSerde) throws Exception {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka);
  consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
  DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
      new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), valueSerde.deserializer());
  Consumer<K, V> consumer = kafkaConsumerFactory.createConsumer();
  this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
  return consumer;
}

Code example source: spring-projects/spring-kafka

private Consumer<String, String> createConsumer() {
  Map<String, Object> consumerProps =
      KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka);
  consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
  DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
      new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
  return kafkaConsumerFactory.createConsumer();
}
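
Several of the examples above pass a random UUID as the group id, either alone or appended to a fixed prefix. A fresh consumer group has no committed offsets, so where it starts reading is governed by `auto.offset.reset` rather than by state left over from an earlier test. The following JDK-only sketch isolates that idea; the class `UniqueGroupId` and the helper `uniqueGroupId` are hypothetical names, not part of spring-kafka.

```java
import java.util.UUID;

public class UniqueGroupId {

    // Hypothetical helper: appends a random UUID so every call yields a distinct group id.
    static String uniqueGroupId(String prefix) {
        return prefix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = uniqueGroupId("KafkaTemplatetests");
        String second = uniqueGroupId("KafkaTemplatetests");
        // Two calls with the same prefix still produce different group ids.
        System.out.println(first);
        System.out.println(second);
    }
}
```

In a test, the resulting string would be passed as the first argument of `KafkaTestUtils.consumerProps(...)`, as the surrounding examples do.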

Code example source: spring-projects/spring-kafka

@Test
public void testLateStartedConsumer() throws Exception {
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
  this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);
  Producer<String, Object> producer = new KafkaProducer<>(KafkaTestUtils.producerProps(this.broker));
  producer.send(new ProducerRecord<>(TEST_EMBEDDED, "foo"));
  producer.close();
  KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);
  consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps);
  this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
  KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);
  consumer.close();
  consumer2.close();
}

Code example source: spring-projects/spring-kafka

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionInitialOffset topic)
    throws Exception {
  ContainerProperties containerProperties = new ContainerProperties(topic);
  Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName.getMethodName(), "false",
      embeddedKafka);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
  KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
      containerProperties);
  container.setBeanName(this.testName.getMethodName());
  ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
  template.setSharedReplyTopic(true);
  template.start();
  assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);
  assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.topic());
  return template;
}

Code example source: spring-projects/spring-kafka

@Test
public void testMissingTopicCMLC() throws Exception {
  Map<String, Object> props = KafkaTestUtils.consumerProps("missing1", "true", embeddedKafka);
  DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
  ContainerProperties containerProps = new ContainerProperties("notexisting");
  containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
  ConcurrentMessageListenerContainer<Integer, String> container =
      new ConcurrentMessageListenerContainer<>(cf, containerProps);
  container.setBeanName("testMissing1");
  try {
    container.start();
    fail("Expected exception");
  }
  catch (IllegalStateException e) {
    assertThat(e.getMessage()).contains("missingTopicsFatal");
  }
}

Code example source: spring-projects/spring-kafka

@Bean
public ConsumerFactory<String, String> cfWithExplicitDeserializers() {
  Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka());
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return new DefaultKafkaConsumerFactory<>(props,
      new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()).keyDeserializer(true),
      new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()));
}
