Apache Flume: kafka.consumer.ConsumerTimeoutException

l2osamch · posted 2021-06-04 in Flume

I am trying to build a pipeline with Apache Flume: spooldir -> Kafka channel -> HDFS sink.
Events reach the Kafka topic without any problem, and I can see them with a Kafka consumer. But the Kafka channel cannot write files to HDFS through the sink. The error is:
Timed out while waiting for data to come from Kafka
Full log:
2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:19,127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 1ms
2016-02-26 18:25:21,129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 0ms
2016-02-26 18:25:21,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:327)] Timed out while waiting for data to come from Kafka
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
My Flume configuration is:


# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c2

# Describe/configure the source

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =  hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.channels.c2.type   = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181

# Bind the source and sink to the channel

a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2

With a memory channel instead of the Kafka channel, everything works fine.
Thanks in advance for any suggestions!


zmeyuzjn1#

ConsumerTimeoutException means that no new messages arrived within the configured interval; it does not mean that the connection to Kafka timed out.
From the Kafka documentation (http://kafka.apache.org/documentation.html):
consumer.timeout.ms  -1  Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
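
Since the exception is expected behaviour rather than a failure, one low-risk option is simply to stop logging it. A minimal sketch, assuming the stock conf/log4j.properties that ships with Flume (the message is emitted at DEBUG level, as the log above shows):

# conf/log4j.properties -- raise the Kafka channel logger above DEBUG so the
# benign "Timed out while waiting for data to come from Kafka" messages
# are no longer written to the log
log4j.logger.org.apache.flume.channel.kafka=INFO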


64jmpszr2#

Kafka's ConsumerConfig class has a "consumer.timeout.ms" configuration property, which Kafka defaults to -1. Any new Kafka consumer that wants different behaviour needs to override that property with a suitable value.
Here is the reference from the Kafka documentation:

consumer.timeout.ms     -1  
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.
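
For context, this is where the property lives for a plain (non-Flume) high-level consumer of this Kafka generation. A sketch only; the ZooKeeper hosts are taken from the question and the group id is a hypothetical placeholder:

# consumer.properties for a standalone 0.8.x-era high-level consumer (illustrative)
zookeeper.connect=zoo00:2181,zoo01:2181,zoo02:2181
group.id=my-test-group
# default -1: block forever; a positive value makes the consumer throw
# ConsumerTimeoutException after that many ms without a message
consumer.timeout.ms=5000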

When Flume creates the Kafka channel, it sets consumer.timeout.ms to 100, which you can see in the Flume logs at INFO level. That explains why we see so many of these consumer timeout exceptions.

level: INFO  Post-validation flume configuration contains configuration for agents: [agent]
level: INFO  Creating channels
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
level: INFO  Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
level: INFO  Group ID was not specified. Using flume as the group id.
level: INFO  {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume,
              zookeeper.connect=zookeeper:2181, consumer.timeout.ms=100, auto.commit.enable=false}
level: INFO  Created channel c1

Following the Flume User Guide's section on Kafka channel settings, I tried to override this value by specifying the following, but it did not seem to work:

agent.channels.c1.kafka.consumer.timeout.ms=5000

Also, we ran a load test that continuously pushed data through the channel, and the exception did not occur while the test was running.


wkftcu5l3#

I read Flume's source code and found that Flume reads the value for "consumer.timeout.ms" from the key "timeout".
So you can configure "consumer.timeout.ms" like this:
agent1.channels.kafka_channel.timeout=-1
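
Applied to the configuration from the question, that would look roughly like this (a sketch, assuming the Flume 1.6-era KafkaChannel, whose default for this key is 100 ms; -1 makes the take block until data arrives, while a larger positive value just makes the benign timeout rarer):

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic = flume_test_001
a1.channels.c2.zookeeperConnect = zoo00:2181,zoo01:2181,zoo02:2181
# "timeout" is the key this KafkaChannel maps onto consumer.timeout.ms
a1.channels.c2.timeout = 5000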
