org.apache.flume.channelexception:无法将批处理放在所需的通道:filechannel ch\ u文件上,原因是:java.util.concurrentmodificationexception

7dl7o3gd  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(397)

我的Flume系统有一个错误,为什么以及如何修复它??????
org.apache.flume.channelexception:无法将批处理放在所需的通道:filechannel ch\ u文件上,原因是:java.util.concurrentmodificationexception!

2016-02-24 17:42:54,715 ERROR org.apache.flume.source.AvroSource: Avro source src_avro_c1: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel ch_file { dataDirs: [/data01/flume/data] }
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:386)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
    at org.apache.avro.ipc.Responder.respond(Responder.java:151)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:787)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:555)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
    at org.apache.flume.channel.file.Put.writeProtos(Put.java:82)
    at org.apache.flume.channel.file.TransactionEventRecord.toByteBuffer(TransactionEventRecord.java:174)
    at org.apache.flume.channel.file.Log.put(Log.java:622)
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:469)
    at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
    at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
    ... 30 more
z9gpfhce

z9gpfhce1#

是的,我修复了这个问题,首先我设置了3个回复通道,如下所示,但不幸的是,我更改了kafka sink事件头,从channel:ch_mem_kafka,它造成了这个问题,但我也不能理解,为什么会造成事件从channel:ch_file 要修改吗?


# The channel can be defined as follows.

collector1.sources.src_avro_c1.channels = ch_file ch_mem ch_mem_kafka
collector1.sources.src_avro_c1.selector.type = replicating

相关问题