如何使用flume同时将数据写入kafka主题和hdfs

2fjabf4q  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(320)

我读了文本文件,想用flume把数据写入kafka主题和hdfs。你可以在下面看到我的配置。我在flume应用程序中实现了source,我从文本文件中读取并创建一个事件将其推送到hdfs,还想处理数据并将事件推送到kafka,但我不能同时编写这两个文件。
我的意思是我想创建另一个这样的事件,这将把被操纵的数据推到Kafka的另一个主题:

Event event = EventBuilder.withBody(newline.toString().getBytes("UTF-8"));
 pushEvent(event);

你能给我一些建议吗?

agent1.sources = source1 source2
agent1.sources.source1.type = com.intellimap.flume.source.FileSource
agent1.sources.source1.file.owner = myuser
agent1.sources.source1.input.dir = /data/inputdir
agent1.sources.source1.reader.count = 60
agent1.sources.source1.redis.ip=127.0.0.1
agent1.sources.source1.log.configuration=/appdata/myuser/FlumePlugin/log4j.properties
agent1.sources.source2.log.configuration=/appdata/myuser/FlumePlugin/log4j.properties
agent1.channels = channel2 channel3

agent1.channels.channel2.type = memory
agent1.channels.channel2.capacity = 10000000
agent1.channels.channel2.transactionCapactiy = 1000
agent1.channels.channel3.type = memory
agent1.channels.channel3.capacity = 1000000
agent1.channels.channel3.transactionCapactiy = 100

agent1.sources.source1.channels = channel2
agent1.sources.source2.channels = channel3

agent1.sinks = sink1 sink2
agent1.sources.source1.selector.type = multiplexing
agent1.sources.source1.selector.header =timestamp
agent1.sources.source1.selector.default = channel2
agent1.sources.source2.selector.type = multiplexing
agent1.sources.source2.selector.header =timestamp
agent1.sources.source2.selector.default = channel3

agent1.sinks.sink1.channel = channel2
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.kafka.topic = topic_x
agent1.sinks.sink1.kafka.bootstrap.servers = pappd03:9092,pappd04:9092,pappd05:9092
agent1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
agent1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

agent1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.source2.channels = channel3
agent1.sources.source2.batchSize = 5000
agent1.sources.source2.batchDurationMillis = 2000
agent1.sources.source2.kafka.bootstrap.servers = pappd03:9092, pappd04:9092, pappd05:9092
agent1.sources.source2.kafka.topics = topic_x
agent1.sources.source2.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent1.sources.source2.kafka.consumer.sasl.kerberos.service.name = kafka
agent1.sources.source2.kafka.consumer.group.id = intellimap

agent1.sinks.sink2.type = hdfs
agent1.sinks.sink2.channel = channel3
agent1.sinks.sink2.hdfs.useLocalTimeStamp = true
agent1.sinks.sink2.hdfs.path = /user/hive/warehouse/my.db/x/%Y%m%d%H
agent1.sinks.sink2.hdfs.filePrefix = xdr
agent1.sinks.sink2.hdfs.fileType = DataStream
agent1.sinks.sink2.hdfs.writeFormat = Text
agent1.sinks.sink2.hdfs.rollCount = 10000000
agent1.sinks.sink2.hdfs.rollSize = 1000
agent1.sinks.sink2.hdfs.batchSize = 100000
agent1.sinks.sink2.hdfs.codeC = snappy
agent1.sinks.sink2.hdfs.fileType = CompressedStream

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题