如何测试使用avro和合流模式注册表的springcloudstreamkafka流应用程序?

n3ipq98p  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(434)

我在弄清楚如何测试使用avro作为消息格式和(合流)模式注册表的springcloudstreamkafka流应用程序时遇到了困难。
配置可以如下所示:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug

笔记:
它正在使用本机序列化/反序列化。
测试框架:JUnit5
我想关于kafka代理,我应该使用一个嵌入式kafkabroker bean,但是正如您所看到的,它还依赖于一个应该以某种方式进行模拟的模式注册表。怎样?

woobm2wo

woobm2wo1#

解决这个问题确实很痛苦,但最后我还是通过流畅的Kafka流测试成功了:
额外依赖项:

testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("com.bakdata.fluent-kafka-streams-tests:schema-registry-mock-junit5:2.0.0")

关键是将必要的配置设置为系统属性。为此,我创建了一个单独的测试配置类:

@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    @PostConstruct
    fun init() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
        schemaRegistryMock.start()
        System.setProperty("spring.cloud.stream.schema-registry-client.endpoint", schemaRegistryMock.url)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
    }

    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock {
        return schemaRegistryMock
    }

    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}

最后是test类,您现在可以在其中生成和使用avro消息,并使用kstream处理它们并利用模拟模式注册表:

@EmbeddedKafka
@SpringBootTest(properties = [
    "spring.profiles.active=local",
    "schema-registry.user=",
    "schema-registry.password=",
    "spring.cloud.stream.bindings.event.destination=event",
    "spring.cloud.stream.bindings.event.producer.useNativeEncoding=true",
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde"])
class MyApplicationTests {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    @Test
    fun `should process events`() {
        val senderProps = KafkaTestUtils.producerProps(embeddedKafka)
        senderProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps["schema.registry.url"] = schemaRegistryMock.url
        val pf = DefaultKafkaProducerFactory<Int, String>(senderProps)
        try {
            val template = KafkaTemplate(pf, true)
            template.defaultTopic = "event"
            ...

    }

相关问题