我正在运行一个spring启动应用程序,已经开始工作了 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')
我尝试使用以下版本来反对cloudera安装: Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50
我的kafkaproducerconfig类非常简单:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
}
启动应用程序时,我会收到:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1 2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
然后,我发送一个有效载荷。它出现在Kafka工具的主题。但是,在Kafka一侧的日志中,当试图接收数据时,我收到:
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
我从producer应用程序方面尝试了以下几点:
降级到SpringKafka2.0.4。我本来希望Kafka版本的0.11.0能帮助解决这个问题,但没有效果。
已验证所有节点的版本都相同。据我的管理员说,他们是。
与我的管理员核实,我们没有混合安装。又一次,我被告知我们没有。
基于一个类似的堆栈溢出问题,我回到了2.1.5版本,并尝试将jsonserializer.add\ type\ info\ headers设置为false。我想也许它会删除日志引用的头。同样,不通过,同样的错误被记录。
我希望我遗漏了一些明显的东西。我是否需要打开/关闭其他头文件来帮助解决任何人都知道的magicv0问题?
我们有其他的应用程序可以在同一个环境中成功地编写其他主题,但是它们是手工制作必要的springbean的较老的应用程序。此外,这些应用程序还使用了更老的客户机(0.8.2.2),并且它们对producer值使用了stringserializer,而不是json。我需要我的数据是在json,我真的不想降级到0.8.2.2当我们在一个系统,应该支持0.11.x。
3条答案
按热度按时间fcwjkofz1#
这个问题的解决方案是两方面的结合:
我需要加上
JsonSerializer.ADD_TYPE_INFO_HEADERS to false
,就像加里·罗素建议的那样。在将配置放入应用程序之前,我需要刷新已放入主题的所有记录。以前的记录有标题,这会腐 eclipse flume使用者。
doinxwow2#
我已经将我的应用程序升级到springboot2x,并且我与kafka客户机依赖关系有一些兼容性问题(参见springboot和springkafka兼容性矩阵),所以我也必须升级它。另一方面,我有一个旧的代理(kafka0.10)在服务器上运行,然后我无法向它发送消息。我也意识到,即使设置
JsonSerializer.ADD_TYPE_INFO_HEADERS
为false,Kafka制作人在内部设置了标题,由于魔术是根据Kafka的版本(在RecordBatch
),在这种情况下没有办法不陷入MemoryRecordsBuilder.appendWithOffset
:if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
. 最后,解决这个问题的唯一方法是升级我的kafka服务器。qyyhg6bp3#
但它们是手工制作必要的springbean的较老的应用程序。
访问org.apache.kafka.common.record.filerecords。向下转换(文件记录)。java:245)
我不熟悉kafka代理的内部结构,但是“听起来”这些主题是用一个旧的代理创建的,它们的格式不支持头,而不是代理版本本身(提示:downconvert)。
你和一个干净的经纪人试过吗?
只要您不尝试使用代理不支持的功能,1.0.x客户端就可以与较旧的代理对话(回到0.10.2.x iirc)。您的代理是0.11(它确实支持头文件)这一事实进一步表明,问题出在主题记录格式上。
我已经成功地测试了up/down broker/client/topic版本,没有问题,只要您使用公共特性子集。
jsonserializer.add\ type\ info\ headers为false。
这将阻止框架设置任何头文件;您需要显示生产者代码(以及所有配置)。
您还可以添加
ProducerInterceptor
并检查ProducerRecord
headers
中的属性onSend()
方法,以确定在输出消息中设置标头的内容。如果您使用的是spring消息(
template.setn(Message<?> m)
,默认情况下将Map标头)。使用rawtemplate.send()
方法不会设置任何头(除非您发送ProducerRecord
已填充标头。