本文整理了Java中org.springframework.kafka.test.utils.KafkaTestUtils.consumerProps()
方法的一些代码示例,展示了KafkaTestUtils.consumerProps()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KafkaTestUtils.consumerProps()
方法的具体详情如下:
包路径:org.springframework.kafka.test.utils.KafkaTestUtils
类名称:KafkaTestUtils
方法名:consumerProps
[英]Set up test properties for an consumer.
[中]为使用者设置测试属性。
代码示例来源:origin: spring-projects/spring-kafka
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link org.springframework.kafka.test.rule.KafkaEmbedded} instance.
* @return the properties.
* @deprecated since 2.2 in favor of {@link #consumerProps(String, String, EmbeddedKafkaBroker)}
*/
@SuppressWarnings("deprecation")
@Deprecated
public static Map<String, Object> consumerProps(String group, String autoCommit,
org.springframework.kafka.test.rule.KafkaEmbedded embeddedKafka) {
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup",
"false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedFooProvider.class);
consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public DefaultKafkaConsumerFactory<Integer, String> cf() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("serverSide", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public ConsumerFactory<String, String> cf() {
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testSeekAutoCommitDefault() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka);
props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test true by default
testSeekGuts(props, topic15, true);
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testSeekAutoCommit() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka);
testSeekGuts(props, topic12, true);
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testSeek() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("test11", "false", embeddedKafka);
testSeekGuts(props, topic11, false);
}
代码示例来源:origin: spring-projects/spring-kafka
@BeforeClass
public static void setUp() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("KafkaTemplatetests" + UUID.randomUUID().toString(), "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC);
}
代码示例来源:origin: spring-projects/spring-kafka
private <K, V> Consumer<K, V> consumer(String topic, Serde<K> keySerde, Serde<V> valueSerde) throws Exception {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), valueSerde.deserializer());
Consumer<K, V> consumer = kafkaConsumerFactory.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
return consumer;
}
代码示例来源:origin: spring-projects/spring-kafka
private Consumer<String, String> createConsumer() {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.embeddedKafka);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
return kafkaConsumerFactory.createConsumer();
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testLateStartedConsumer() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);
Producer<String, Object> producer = new KafkaProducer<>(KafkaTestUtils.producerProps(this.broker));
producer.send(new ProducerRecord<>(TEST_EMBEDDED, "foo"));
producer.close();
KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);
consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);
consumer.close();
consumer2.close();
}
代码示例来源:origin: spring-projects/spring-kafka
public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionInitialOffset topic)
throws Exception {
ContainerProperties containerProperties = new ContainerProperties(topic);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName.getMethodName(), "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProperties);
container.setBeanName(this.testName.getMethodName());
ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
template.setSharedReplyTopic(true);
template.start();
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(1);
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic.topic());
return template;
}
代码示例来源:origin: spring-projects/spring-kafka
@Test
public void testMissingTopicCMLC() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("missing1", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("notexisting");
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testMissing1");
try {
container.start();
fail("Expected exception");
}
catch (IllegalStateException e) {
assertThat(e.getMessage()).contains("missingTopicsFatal");
}
}
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public ConsumerFactory<String, String> cfWithExplicitDeserializers() {
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g2", "false", embeddedKafka());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props,
new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()).keyDeserializer(true),
new ErrorHandlingDeserializer2<String>(new FailSometimesDeserializer()));
}
内容来源于网络,如有侵权,请联系作者删除!