Unable to deserialize data using Apache Avro

Posted by rggaifut on 2021-06-07 in Kafka

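I have a Spring Boot application that sends data to and receives data from a Kafka broker, and I use Apache Avro as the serde. So far I have generated the classes with the Maven plugin. The schema is very simple: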

{"namespace": "com.domain",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number",  "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

However, I get a very strange exception:
org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.domain.User] to [com.domain.User] for GenericMessage [payload={"name": "a", "favorite_number": 4, "favorite_color": "test6"}, headers={kafka_offset=0, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=batchqueue}]
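
The message names the same class on both sides. In plain Java, that is what a type mismatch looks like when two Class objects share a fully qualified name but were defined by different class loaders. Whether that is what happens in this application is only an assumption (a restart-style class loader, such as the one Spring Boot DevTools installs, would be one way to get there). The sketch below reproduces the effect in isolation using only the JDK; IsolatingLoader and the nested User class are hypothetical stand-ins, not code from the post.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderIdentityDemo {

    /** Stand-in for the generated com.domain.User class (hypothetical). */
    public static class User {}

    /** Hypothetical loader that defines one class itself instead of delegating to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded != null) {
                    return loaded;
                }
                // Re-read the bytecode and define a second, independent copy of the class.
                String resource = name.replace('.', '/') + ".class";
                try (InputStream in = getParent().getResourceAsStream(resource)) {
                    if (in == null) {
                        throw new ClassNotFoundException(name);
                    }
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (IOException e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
        }
    }

    /** Loads a second copy of User through an IsolatingLoader. */
    public static Class<?> loadIsolatedUser() {
        try {
            ClassLoader parent = ClassLoaderIdentityDemo.class.getClassLoader();
            String name = User.class.getName();
            return new IsolatingLoader(parent, name).loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> original = User.class;
        Class<?> copy = loadIsolatedUser();
        Object instance = copy.getDeclaredConstructor().newInstance();

        System.out.println("same name:   " + original.getName().equals(copy.getName()));
        System.out.println("same class:  " + (original == copy));
        System.out.println("instance of: " + original.isInstance(instance));
    }
}
```

If the two copies really are distinct, the names match while the identity and instance checks fail, so a conversion from one User to the other is rejected even though the message prints identical type names. Comparing `payload.getClass().getClassLoader()` with `User.class.getClassLoader()` in the listener would be one way to check whether this applies here.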
Here are my configuration classes:

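// ConsumerConfig.java
// User, AvroDeserializer and Consumer are the application's own classes (imports omitted).
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration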
public class ConsumerConfig {
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "avro");

        return props;
    }

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new AvroDeserializer<>(User.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Consumer receiver() {
        return new Consumer();
    }
}

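// ProducerConfig.java (separate file)
// User, AvroSerializer and Producer are the application's own classes (imports omitted).
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfig {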
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, User> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Producer sender() {
        return new Producer();
    }

}

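Any idea why this happens?
Update: I am not using a dependency that provides the AvroSerializer class; I am using classes copied from this tutorial.
As for the producer class: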

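// Producer.java (separate file)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class Producer {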
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void send(User data) {
        LOGGER.info("sending job data='{}'", data.toString());
        kafkaTemplate.send(topicname, data);
    }
}
