如何检查avro模式注册表使用情况

kmpatx3s  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(337)

我正在通过avro4s使用avro。这是我对消费者/生产者的配置

def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
    system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withProperty("auto.create.topics.enable", "true")

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(
      system,
      new StringDeserializer,
      new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("schema.registry.url", "http://localhost:8081")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

我怀疑这些寄存器是否被使用过。当我的应用程序运行时,在模式注册表日志中是静默的。
如何检查我的应用程序是否正在使用注册表?
如果不是-怎么解决?

ecfsfe2w

ecfsfe2w1#

您使用了错误的类,因此您的属性可能会出错
你真的需要使用 KafkaAvroSerializer 为制片人准备的

new StringSerializer,
new ByteArraySerializer)

以及 KafkaAvroDeserializer 对于这里的消费者

new StringDeserializer,
new ByteArrayDeserializer)

试着改变 String, Array[Byte]GenericRecord 或者你用avro4s做的箱子

相关问题