超过内存事务容量后,flume不会恢复

dbf7pr2w  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(279)

我正在创建一个flume代理的概念验证,它将缓冲事件,并在接收器不可用时停止使用源中的事件。只有当接收器再次可用时,才应处理缓冲事件,然后源重新启动消耗。
为此,我创建了一个简单的代理,它从spooldir读取数据并写入文件。为了模拟接收器服务已关闭,我更改了文件权限,以便flume无法对其进行写入。然后我启动flume,一些事件被缓冲在内存通道中,当通道容量达到预期值时,它停止消耗事件。一旦文件变得可写,接收器就可以处理事件并恢复flume。但是,只有在不超过事务容量时,这种方法才有效。一旦超过事务容量,flume将永远不会恢复,并继续写入以下错误:

2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - 
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to 
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, 
capacity 4 full, consider committing more frequently, increasing capacity, or 
increasing thread count
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
    ... 3 more

一旦内存中缓冲的事件数超过事务容量(4),就会发生此错误。我不明白为什么,因为fileout的batchsize是1,所以它应该逐个删除事件。
这是我正在使用的配置:

agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout

agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1

agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4

agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1

我已经用不同的通道容量和事务容量值(例如,3和3)测试了这个配置,但是没有发现通道容量已满并且flume能够恢复的情况。

2cmtqfgy

2cmtqfgy1#

在flume邮件列表上有人告诉我可能是这个bug影响了我的概念验证。这个错误意味着批处理大小是100,即使在配置中指定了不同的批处理大小。我重新运行了测试,将source和sink batchsize设置为100,内存通道transactioncapacity设置为100,其容量设置为300。有了这些值,概念证明就完全按照预期工作了。

相关问题