Flume BlobHandler and HDFS sink: error "Connection reset by peer"

bq8i3lrv · posted 2021-06-04 in Flume

I have a configuration that posts gzip data to a Flume server backed by an HDFS sink, using this curl command:

curl -v -s --trace-ascii http_trace.log --data-binary @amina.sh.gz -H "Content-Type: text/xml" -H "Content-Encoding: gzip" -X POST http://localhost:9043

When I run this command I get this error:

== Info: About to connect() to localhost port 9043 (#0)
== Info:   Trying ::1...
== Info: Connected to localhost (::1) port 9043 (#0)
=> Send header, 143 bytes (0x8f)
0000: POST / HTTP/1.1
0011: User-Agent: curl/7.29.0
002a: Host: localhost:9043
0040: Accept: */*
004d: Content-Type: text
0061: Content-Encoding: gzip
0079: Content-Length: 89
008d:
=> Send data, 89 bytes (0x59)
0000: ....GU.W..1.1.1.1_1.%.1..@...{.b.0..M.....).s1FQ.1....*......q.3
0040: ...5.V ...5......q..[]...
== Info: upload completely sent off: 89 out of 89 bytes
== Info: Recv failure: Connection reset by peer
== Info: Closing connection 0
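One low-level thing worth ruling out before digging into Flume is the payload itself. A sketch that builds a stand-in gzip file (the real file in the question is amina.sh.gz; the name and contents here are illustrative) and verifies its integrity before posting:

```shell
# Build a stand-in gzip payload and check it decompresses cleanly.
printf '<event>hello</event>' | gzip > payload.gz
gzip -t payload.gz && echo "payload OK"

# Then post it exactly as in the question:
# curl -v --data-binary @payload.gz -H "Content-Type: text/xml" \
#      -H "Content-Encoding: gzip" -X POST http://localhost:9043
```

If `gzip -t` fails here, the reset is unrelated to the server side.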
My flume.conf looks like this:
a1.sources = r1 r2 r3
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = com.baeai.logm.flume.sources.LogManagementThriftSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port=5143
a1.sources.r1.interceptors = ECEFExtractionInterceptor
a1.sources.r1.interceptors.ECEFExtractionInterceptor.type=com.baeai.logm.flume.interceptors.DataExtractionInterceptor$Builder
a1.sources.r1.interceptors.ECEFExtractionInterceptor.dateFormat=yyyyMMdd
a1.sources.r1.interceptors.ECEFExtractionInterceptor.headers.customerName=customerId
a1.sources.r1.interceptors.ECEFExtractionInterceptor.headers.ID=eventId
a1.sources.r1.interceptors.ECEFExtractionInterceptor.headers.receiptDate=agentReceiptTime
a1.sources.r1.interceptors.ECEFExtractionInterceptor.headertypes.receiptDate=date
a1.sources.r1.interceptors.ECEFExtractionInterceptor.name=i1

a1.sources.r2.type = http
a1.sources.r2.bind = 0.0.0.0
a1.sources.r2.port=80
a1.sources.r2.interceptors = ECEFExtractionInterceptor
a1.sources.r2.interceptors.ECEFExtractionInterceptor.type=com.baeai.logm.flume.interceptors.DataExtractionInterceptor$Builder
a1.sources.r2.interceptors.ECEFExtractionInterceptor.dateFormat=yyyyMMdd
a1.sources.r2.interceptors.ECEFExtractionInterceptor.headers.customerName=customerId
a1.sources.r2.interceptors.ECEFExtractionInterceptor.headers.ID=eventId
a1.sources.r2.interceptors.ECEFExtractionInterceptor.headers.receiptDate=agentReceiptTime
a1.sources.r2.interceptors.ECEFExtractionInterceptor.headertypes.receiptDate=date
a1.sources.r2.interceptors.ECEFExtractionInterceptor.name=i1

a1.sources.r3.type     = http
a1.sources.r3.handler     = org.apache.flume.sink.solr.morphline.BlobHandler
a1.sources.r3.port     = 9043
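For comparison, the r3 stanza is the only source without an explicit bind, and the morphline BlobHandler buffers the entire request body into a single event. A sketch of the same source with an explicit bind and the handler's optional size cap (the maxBlobLength value is illustrative; the handler's default is around 100 MB):

```properties
a1.sources.r3.type    = http
a1.sources.r3.bind    = 0.0.0.0
a1.sources.r3.port    = 9043
a1.sources.r3.handler = org.apache.flume.sink.solr.morphline.BlobHandler
# Optional: cap the size of a single blob event, in bytes (illustrative value).
a1.sources.r3.handler.maxBlobLength = 104857600
```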

a1.sinks.k1.type = com.baeai.logm.flume.sinks.ElasticSearchSink
a1.sinks.k1.serverNodeList=per7-lm-els01:9300,per7-lm-els02:9300,per7-lm-els03:9300
a1.sinks.k1.indexFormat=%{customerName}-%{receiptDate}
a1.sinks.k1.idHeaderKey=ID
a1.sinks.k1.batchSize=2000
a1.sinks.k1.esConfig.cluster.name=log-management-01
a1.sinks.k1.esConfig.client.transport.ping_timeout=15s
a1.sinks.k1.typeInformation=/opt/flume/conf/types.conf
a1.sinks.k1.isInIntegration=true

# Describe the sink

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://lm01acq03.test.usa.net:9000/user/logmgt
a1.sinks.k2.hdfs.filePrefix = log-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
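A side note on the round* settings: in the Flume HDFS sink they only affect time escape sequences (%Y, %m, %H, ...) in hdfs.path / hdfs.filePrefix, and the path above contains none, so the rounding has no visible effect as configured. A sketch of a time-bucketed variant (the directory layout is illustrative):

```properties
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://lm01acq03.test.usa.net:9000/user/logmgt/%Y%m%d/%H%M
a1.sinks.k2.hdfs.filePrefix = log-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
# Needed when events carry no "timestamp" header (e.g. raw blobs from r3);
# otherwise the time escapes above throw at runtime.
a1.sinks.k2.hdfs.useLocalTimeStamp = true
```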

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList=per7-lm-flu01:9092,per7-lm-flu02:9092,per7-lm-flu03:9092
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.zookeeperConnect=per7-lm-flu01:2181,per7-lm-flu02:2181,per7-lm-flu03:2181
a1.channels.c1.topic=logm
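brokerList, topic and zookeeperConnect are the pre-1.7 Kafka channel property names. If this runs on Flume 1.7 or later, a sketch of the equivalent stanza using the newer names (assuming the same brokers and topic; capacity/transactionCapacity are memory-channel properties and, as far as I know, ignored by the Kafka channel):

```properties
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = per7-lm-flu01:9092,per7-lm-flu02:9092,per7-lm-flu03:9092
a1.channels.c1.kafka.topic = logm
```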

a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
