执行重新平衡时重复读取kafka事务消息

svgewumm  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(208)

我有一个消费者主题ctopic(数据由外部代理编写)和一个生产者ptopic。这个流是消费者从ctopic读取消息,处理它并写入生产者主题。代码如下

tranxId = "O8XXD";
cache = new HashMap()

RestartSource.onFailuresWithBackoff(minBackoff = 2.seconds, maxBackoff = 600.seconds, randomFactor = 0.4)
{ () =>

  Transactional.source(consumerProperties, Subscriptions.topics("CTOPIC"))

    .mapAsync(1)(myMessage => {

        upsertDLQ(myMessage) // insert/update myMessge into DLQ and with the  count of retries

        EvaluateMyMessage(myMessage)  //calls google API  via n/w call for e.g.
            .recoverWith{

                case exception:NetworkException =>{
                    //log  the error and do a backoff-retry via the above RestartSource block

                    if (DLQ_retry_count(myMessage)> 4{
                        Future(ProducerMessage.passThrough[String, String, PartitionOffset](myMessage.partitionOffset))
                        }
                    else {
                        Future failed exception

                    }
                }
            }
    })
    .via(Transactional.flow(producerProperties, tranxId))  //write to  "PTOPIC"
}.runWith(Sink.foreach(res => log.info(s" Response:" +s" ${res.passThrough}")))

[info] a.k.i.TransactionalSourceLogic - [0cd3d] Starting. StageActor                                     
Actor[akka://application/system/Materializers/StreamSupervisor-0/$$b#987621478]

''consumerconfig值:

auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
fetch.max.wait.ms = 500
group.id = mygroup-1
group.instance.id = null
heartbeat.interval.ms = 3000
isolation.level = read_committed
max.poll.interval.ms = 300000
max.poll.records = 500
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100

''producerconfig值:

acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
transaction.timeout.ms = 60000
transactional.id = O8XXD

消息m1、m2、m3、m4依次出现,当第二条消息m2到达时出现网络错误,将发生重新定位。
我的经验是m2应该重试4次,但我看到m3,m4也立即进来,它像m2,m3,m4都试图得到处理。这个过程持续了大约3-4分钟,在这段时间之后它被提交(m2,m3和m4如预期的那样在dlq中)。
但是,当m2还没有提交时,我怎么才能停止阅读m3、m4呢?因为重新平衡,m2被代码读取了大约7-8次。请帮忙

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题