spring kafka-遇到“magic v0不支持记录头”错误

q43xntqr  于 2021-06-04  发布在  Flume
关注(0)|答案(3)|浏览(622)

我正在运行一个spring启动应用程序,已经开始工作了 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE') 我尝试使用以下版本来反对cloudera安装: Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50 我的kafkaproducerconfig类非常简单:

@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;

@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;

@Value("${spring.kafka.producer.client-id}")
private String producerClientId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

    return props;
}

@Bean
public ProducerFactory<String, Pdid> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
    KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());

    kafkaTemplate.setDefaultTopic(this.defaultTopicName);

    return kafkaTemplate;
}

@PostConstruct
public void postConstruct() {
    LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
    LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}

}
启动应用程序时,我会收到:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e 然后,我发送一个有效载荷。它出现在Kafka工具的主题。但是,在Kafka一侧的日志中,当试图接收数据时,我收到:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

我从producer应用程序方面尝试了以下几点:
降级到SpringKafka2.0.4。我本来希望Kafka版本的0.11.0能帮助解决这个问题,但没有效果。
已验证所有节点的版本都相同。据我的管理员说,他们是。
与我的管理员核实,我们没有混合安装。又一次,我被告知我们没有。
基于一个类似的堆栈溢出问题,我回到了2.1.5版本,并尝试将jsonserializer.add\ type\ info\ headers设置为false。我想也许它会删除日志引用的头。同样,不通过,同样的错误被记录。
我希望我遗漏了一些明显的东西。我是否需要打开/关闭其他头文件来帮助解决任何人都知道的magicv0问题?
我们有其他的应用程序可以在同一个环境中成功地编写其他主题,但是它们是手工制作必要的springbean的较老的应用程序。此外,这些应用程序还使用了更老的客户机(0.8.2.2),并且它们对producer值使用了stringserializer,而不是json。我需要我的数据是在json,我真的不想降级到0.8.2.2当我们在一个系统,应该支持0.11.x。

fcwjkofz

fcwjkofz1#

这个问题的解决方案是两方面的结合:
我需要加上 JsonSerializer.ADD_TYPE_INFO_HEADERS to false ,就像加里·罗素建议的那样。
在将配置放入应用程序之前,我需要刷新已放入主题的所有记录。以前的记录有标题,这会腐 eclipse flume使用者。

doinxwow

doinxwow2#

我已经将我的应用程序升级到springboot2x,并且我与kafka客户机依赖关系有一些兼容性问题(参见springboot和springkafka兼容性矩阵),所以我也必须升级它。另一方面,我有一个旧的代理(kafka0.10)在服务器上运行,然后我无法向它发送消息。我也意识到,即使设置 JsonSerializer.ADD_TYPE_INFO_HEADERS 为false,Kafka制作人在内部设置了标题,由于魔术是根据Kafka的版本(在 RecordBatch ),在这种情况下没有办法不陷入 MemoryRecordsBuilder.appendWithOffset : if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); . 最后,解决这个问题的唯一方法是升级我的kafka服务器。

qyyhg6bp

qyyhg6bp3#

但它们是手工制作必要的springbean的较老的应用程序。
访问org.apache.kafka.common.record.filerecords。向下转换(文件记录)。java:245)
我不熟悉kafka代理的内部结构,但是“听起来”这些主题是用一个旧的代理创建的,它们的格式不支持头,而不是代理版本本身(提示:downconvert)。
你和一个干净的经纪人试过吗?
只要您不尝试使用代理不支持的功能,1.0.x客户端就可以与较旧的代理对话(回到0.10.2.x iirc)。您的代理是0.11(它确实支持头文件)这一事实进一步表明,问题出在主题记录格式上。
我已经成功地测试了up/down broker/client/topic版本,没有问题,只要您使用公共特性子集。
jsonserializer.add\ type\ info\ headers为false。
这将阻止框架设置任何头文件;您需要显示生产者代码(以及所有配置)。
您还可以添加 ProducerInterceptor 并检查
ProducerRecord headers 中的属性 onSend() 方法,以确定在输出消息中设置标头的内容。
如果您使用的是spring消息( template.setn(Message<?> m) ,默认情况下将Map标头)。使用raw template.send() 方法不会设置任何头(除非您发送 ProducerRecord 已填充标头。

相关问题