如何在spring cloud kafka活页夹中将偏移量重置为开始

efzxgjgh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(445)

我正在使用springcloudkafka活页夹将数据读取到kstream。在阅读某个主题的数据时,我需要从头开始阅读。
我试图设置Kafka偏移重置和启动偏移属性。但是,找不到任何参考资料。
你能帮我提供任何示例application.yaml来重置偏移量吗,这样我就可以从头开始使用主题中的消息了
添加我使用过的application.yaml:

spring.cloud.stream.bindings.input:
  destination: input-topic1
  consumer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.bindings.output:
  destination: output-topic
  producer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.bindings.beginningInput:
  destination: beginning-topic
  consumer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
  producer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.beginningInput:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    resetOffsets: true
    startOffset: earliest
spring.cloud.stream.kafka.streams.binder:
  brokers: 127.0.0.1
  zkNodes: 127.0.0.1
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 1000
bejyjqdl

bejyjqdl1#

resetOffsets 是坏的。它在2.0.0.0版本中恢复。
在这里公关。

相关问题