Why does an optional Flume channel cause problems for a non-optional Flume channel?

0md85ypi, posted on 2021-06-04 in Flume

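I have what seems to be a simple Flume configuration that is giving me a lot of problems. Let me describe the problem first and then list the configuration files.
I have 3 servers: server1, server2, server3.
server1: netcat source / syslogtcp source (I tested this with both netcat without acks and syslogtcp), 2 memory channels, 2 avro sinks (one per channel), replicating selector, with the second memory channel optional
server2, server3: avro source, memory channel, Kafka sink
In my simulation, server2 simulates "production", so it must not lose any data, while server3 simulates "development", where data loss is fine. My assumption was that using 2 channels and 2 sinks would decouple the two servers from each other, so that if server3 went down it would not affect server2 (especially with the optional configuration option!). However, that is not the case. When I run my simulation and kill server3 with Ctrl-C, I see a slowdown on server2, and the output from server2 to the Kafka sink slows to a crawl. When I restart the Flume agent on server3, everything goes back to normal.
I did not expect this behavior. What I expected is that, because I have two channels and two sinks, if one channel and/or sink goes down, the other channel and/or sink should have no problem. Is this a limitation of Flume? Is it a limitation of my source, sinks, or channels? Is there a way to run Flume with a single agent in which multiple channels and sinks are decoupled from each other? I really do not want to set up multiple Flume agents on one machine, one per "environment" (production and development). My configuration files are attached so you can see in more detail what I did:
Server1 (first-tier agent)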


# Describe the top level configuration

agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2

# Describe/configure the source

agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false

# agent.sources.mySource.type = syslogtcp
# agent.sources.mySource.host = 0.0.0.0
# agent.sources.mySource.port = 7103
# agent.sources.mySource.eventSize = 150000

agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2

# Describe/configure the channel

agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200

# Avro Sink

agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666

agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666

Server2 ("prod") Flume agent


# Describe the top level configuration

agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

# Describe/configure the source

agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

# Describe/configure the interceptor

agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

# Describe/configure the channel

agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

# Describe/configure the sink

agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel

Server3 ("dev") Flume agent


# Describe the top level configuration

agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

# Describe/configure the source

agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

# Describe/configure the interceptor

agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

# Describe/configure the channel

agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

# Describe/configure the sink

agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel

Thanks for your help!

zzoitvuj (answer #1)

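I would look at tuning these configuration parameters, since they relate to the memory channel:
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
Maybe try doubling them first, then run the test again; you should see an improvement:
agent.channels.defaultChannel.capacity = 10000
agent.channels.defaultChannel.transactionCapacity = 400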
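Applied to the first-tier agent on server1, the same doubling might look like the fragment below. This is only a sketch: the channel names are taken from the server1 configuration in the question, and the numbers are simply the doubled values suggested here, not tuned figures. Flume property keys are case-sensitive, so `transactionCapacity` keeps its camel case.

```properties
# Sketch only: doubled memory-channel sizes for server1's two channels.
# Channel names come from the question; values are not tuned.
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 10000
agent.channels.defaultChannel1.transactionCapacity = 400

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 10000
agent.channels.defaultChannel2.transactionCapacity = 400
```

Keep in mind that a larger capacity only delays the point at which defaultChannel2 fills up if server3 stays down, so the test should run long enough to reach that point.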
It is also a good idea to watch the JVM of the Apache Flume instance during the test.
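One more setting that may be worth testing, offered as an assumption rather than a confirmed diagnosis of the slowdown: the memory channel has a `keep-alive` property (a timeout in seconds for adding or removing an event, 3 by default). If defaultChannel2 fills up while server3 is down, each put from the source could wait up to that timeout before giving up, and because a single source feeds both channels, that wait would also hold back writes to defaultChannel1. A sketch for server1, where the value 0 is an illustrative choice and not something the answer recommends:

```properties
# Assumption: a shorter put timeout on the optional channel limits how long
# the shared source waits when that channel is full. The value is illustrative.
agent.channels.defaultChannel2.keep-alive = 0
```

Compare throughput on server2 with and without this setting, with server3 stopped, before relying on it.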
