kafka代理无法通过kafka.javaapi.producer.producer.send()批处理api使用消息

67up9zun  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(192)

我编写了一个应用程序,它将存储在cassandra中的事件数据流化并推送到kafka broker(版本kafka 2.9.2-0.8.1.1),但是在kafka broker关闭的情况下,读取事件会继续缓存在cassandra的缓存存储中,并在对send()的单个调用中重试。如果kafka代理长时间处于关闭状态,则缓存大小将快速增加。
我遇到过这样的情况,应用程序试图使用以下方法向kafka broker发送超过75000条消息:

class kafka.javaapi.producer.Producer<K,V> {
...
    public void send(List<KeyedMessage<K,V>> messages);
...
}

但是,它失败了,只有以下例外:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at com.tfsc.ilabs.chronos.app.gen.event.pipeline.impl.KafkaEventProcessor.sendTuplesToKafka(KafkaEventProcessor.java:64)
        at com.tfsc.ilabs.chronos.app.gen.event.pipeline.processing.loader.impl.GenericTupleProcessingLoader.loadTuples(GenericTupleProcessingLoader.java:22)
        at com.tfsc.ilabs.chronos.processing.transactionmanager.impl.TransactionManager$TransactionManagerLoaderJob.run(TransactionManager.java:159)
        at com.tfsc.ilabs.core.trackingservice.TrackableRunnable.run(TrackableRunnable.java:21)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

另一台缓存中消息较少的主机能够在没有任何问题的情况下写入kafka代理,因此不存在安装或网络问题。
有人能解释一下kafka代理在一次调用上述api函数时可以处理的消息数量方面是否有限制,并为这个问题提出一些解决方案吗。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题