Flume not writing from Kafka topic to HDFS

Posted by 0x6upsns on 2021-05-29 in Hadoop

I am trying to read from a Kafka topic and store the events in HDFS via a Flume sink. The input data is JSON. Below is my configuration file:


# components name

a1.sources  = source1
a1.channels = channel1
a1.sinks = sink1

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = server1:port,server2:port,server3:port
a1.sources.source1.kafka.bootstrap.servers = server1:port,server2:port,server3:port
a1.sources.source1.kafka.topics = TOPIC
a1.sources.source1.kafka.consumer.group.id = flume
a1.sources.source1.channels = channel1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1
a1.channels.channel1.transactionCapacity = 1

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = /path/kafka/
a1.sinks.sink1.hdfs.rollInterval = 1
a1.sinks.sink1.hdfs.rollSize = 1
a1.sinks.sink1.hdfs.rollCount = 1
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.hdfs.idleTimeout = 10
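
For reference: the "Kafka topic must be specified." error in the log below suggests the installed Flume build could be an older one (pre-1.7) that only recognizes the old Kafka source property names rather than the kafka.* keys used above. A minimal sketch of that older style, as an assumption to try rather than a confirmed fix:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
# old-style (Flume <= 1.6) keys: zookeeperConnect/topic/groupId instead of kafka.bootstrap.servers/kafka.topics/kafka.consumer.group.id
a1.sources.source1.zookeeperConnect = server1:port,server2:port,server3:port
a1.sources.source1.topic = TOPIC
a1.sources.source1.groupId = flume

Newer builds (Flume 1.7+) use the kafka.bootstrap.servers / kafka.topics keys already shown in the config above.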

This is the flume-ng command:

flume-ng agent --conf-file /path/kafka_hdfs_sink.conf --conf /path/flume/conf/ --name a1;

Here is the log after all dependencies are loaded:

18/01/30 12:15:24 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/01/30 12:15:24 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/fk010191/kafka/kafka_hdfs_sink.conf
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: a1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Processing:sink1
18/01/30 12:15:24 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/01/30 12:15:24 INFO node.AbstractConfigurationProvider: Creating channels
18/01/30 12:15:24 INFO channel.DefaultChannelFactory: Creating instance of channel channel1 type memory
18/01/30 12:15:24 INFO node.AbstractConfigurationProvider: Created channel channel1
18/01/30 12:15:24 INFO source.DefaultSourceFactory: Creating instance of source source1, type org.apache.flume.source.kafka.KafkaSource
18/01/30 12:15:24 ERROR node.AbstractConfigurationProvider: Source source1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: Kafka topic must be specified.
        at org.apache.flume.source.kafka.KafkaSource.doConfigure(KafkaSource.java:183)
        at org.apache.flume.source.BasicSourceSemantics.configure(BasicSourceSemantics.java:65)
        at org.apache.flume.source.AbstractPollableSource.configure(AbstractPollableSource.java:63)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
18/01/30 12:15:24 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
18/01/30 12:15:24 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
18/01/30 12:15:24 INFO node.AbstractConfigurationProvider: Channel channel1 connected to [sink1]
18/01/30 12:15:24 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4c601ec4 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
18/01/30 12:15:24 INFO node.Application: Starting Channel channel1
18/01/30 12:15:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
18/01/30 12:15:24 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
18/01/30 12:15:24 INFO node.Application: Starting Sink sink1
18/01/30 12:15:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
18/01/30 12:15:24 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started

When I stop the agent after a while, I see the following, but nothing has been written to HDFS:

^C18/01/30 12:21:16 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 23
18/01/30 12:21:16 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider stopping
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 stopped
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.start.time == 1517336124877
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.stop.time == 1517336476542
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.batch.complete == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.batch.empty == 46
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.batch.underflow == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.connection.closed.count == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.connection.creation.count == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.connection.failed.count == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.event.drain.attempt == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: sink1. sink.event.drain.sucess == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 stopped
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.start.time == 1517336124874
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.stop.time == 1517336476543
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.capacity == 1
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.current.size == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.event.put.attempt == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.event.put.success == 0
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.event.take.attempt == 46
18/01/30 12:21:16 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: channel1. channel.event.take.success == 0

When I run bin/kafka-console-consumer.sh --bootstrap-server server1:port,server2:port,server3:port --topic TOPIC --from-beginning, I can see some data in the topic, but Flume does not write anything to HDFS. Any help is much appreciated.
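
One way to check whether the Flume agent's consumer is actually attached to the topic is to describe its consumer group (the config above uses the group id flume). A hedged sketch, assuming the group offsets are stored broker-side; older Kafka releases may also require the --new-consumer flag:

bin/kafka-consumer-groups.sh --bootstrap-server server1:port,server2:port,server3:port --describe --group flume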
