Loading a CSV file into HDFS with Flume (spool directory as source)

eulz3vhy posted on 2021-06-04 in Hadoop

I am trying to load a CSV file (6 MB) into HDFS using Flume, with spooldir as the source and HDFS as the sink. Here is my configuration file:


# Initialize agent's source, channel and sink

agent.sources = TwitterExampleDir
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists

agent.sources.TwitterExampleDir.type = spooldir
agent.sources.TwitterExampleDir.spoolDir = /usr/local/word_count_input

# Setting the channel to memory

agent.channels.memoryChannel.type = memory

# Max number of events stored in the memory channel

agent.channels.memoryChannel.capacity = 10000

# agent.channels.memoryChannel.batchSize = 15000

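# Max number of events per transaction (the key is case-sensitive; the value must not exceed capacity)

agent.channels.memoryChannel.transactionCapacity = 1000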

# Setting the sink to HDFS

agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path = hdfs://192.168.220.128:8000/spool5
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable

agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time

agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# rollover file based on maximum size of 10 MB (hdfs.rollSize is in bytes)

agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
agent.sinks.flumeHDFS.hdfs.rollSize = 10485760
agent.sinks.flumeHDFS.hdfs.batchSize = 100


# time-based rollover (hdfs.rollInterval, in seconds) is disabled with 0; 60 would roll every minute

# agent.sinks.flumeHDFS.hdfs.rollInterval = 0

# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel

agent.sources.TwitterExampleDir.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel
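
A configuration like this can be sanity-checked before the agent is started. The following is only a rough sketch, not part of Flume: `lint`, `parse_properties` and the `SAMPLE` fragment are hypothetical names made up for illustration. It assumes that Flume property names are case-sensitive, that a memory channel rejects a `transactionCapacity` larger than its `capacity`, and that both default to 100 when unset.

```python
# Minimal lint for a Flume-style properties file (illustrative sketch only).

# Case-sensitive memory channel property names (a small subset).
KNOWN_CHANNEL_KEYS = {"type", "capacity", "transactionCapacity"}


def parse_properties(text):
    """Return (settings, duplicates) parsed from 'key = value' lines."""
    settings, duplicates = {}, []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if key in settings:
            duplicates.append(key)
        settings[key] = value  # as in java.util.Properties, the last value wins
    return settings, duplicates


def lint(text, agent="agent"):
    """Return a list of human-readable problems found in the config text."""
    settings, duplicates = parse_properties(text)
    problems = [f"duplicate key: {key}" for key in duplicates]

    # Group 'agent.channels.<name>.<prop>' entries by channel name.
    prefix = f"{agent}.channels."
    channels = {}
    for key, value in settings.items():
        if key.startswith(prefix):
            name, _, prop = key[len(prefix):].partition(".")
            if prop:
                channels.setdefault(name, {})[prop] = value

    canonical = {k.lower(): k for k in KNOWN_CHANNEL_KEYS}
    for name, props in channels.items():
        intended = {}
        for prop, value in props.items():
            fixed = canonical.get(prop.lower())
            if fixed and fixed != prop:
                problems.append(f"{name}: '{prop}' should be spelled '{fixed}'")
            intended[fixed or prop] = value
        # Assumed defaults: 100 for both settings.
        capacity = int(intended.get("capacity", 100))
        tx = int(intended.get("transactionCapacity", 100))
        if tx > capacity:
            problems.append(
                f"{name}: transactionCapacity {tx} exceeds capacity {capacity}"
            )
    return problems


# Made-up fragment used only to exercise the checks.
SAMPLE = """\
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactioncapacity = 1000000
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollCount = 0
"""

if __name__ == "__main__":
    for problem in lint(SAMPLE):
        print(problem)
```

A check like this only catches configuration slips; it says nothing about classpath problems.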

With this configuration I get the following errors, and I don't know why:

2015-02-05 09:01:01,036 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.sink.hdfs.HDFSEventSink (HDFSEventSink.java:466) - process failed
java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:274)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:266)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:722)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:719)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2015-02-05 09:01:01,062 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.SinkRunner (SinkRunner.java:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:470)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:274)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:266)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:722)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:719)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more

Can anyone help me solve this problem?

4ioopgfo1#

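I was using the Hortonworks Sandbox v2.2. After a long debugging session I found a conflict between the Spark version I had installed manually (v1.2) and the Hortonworks Sandbox libraries, so I switched to Cloudera QuickStart 5.3.0, and now everything works.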
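
For anyone who wants to look for such a conflict before switching distributions: the stack trace fails inside `FileSystem.loadFileSystems`, where Hadoop discovers `FileSystem` implementations through `META-INF/services/org.apache.hadoop.fs.FileSystem` entries in the jars on the classpath. This exception is commonly reported when jars from different Hadoop releases end up on the same classpath, which would match the conflict described above. Below is a rough sketch for spotting that; the function names and the directories passed on the command line are assumptions for illustration, not part of Flume, Hadoop or Spark.

```python
# Sketch: look for mixed Hadoop jar versions in one or more lib directories.
import re
import sys
import zipfile
from collections import defaultdict
from pathlib import Path

SERVICE_FILE = "META-INF/services/org.apache.hadoop.fs.FileSystem"
# Matches names such as hadoop-common-2.6.0.jar or hadoop-core-1.2.1.jar.
JAR_NAME = re.compile(r"^(hadoop-[a-z\-]+?)-(\d[\w.\-]*)\.jar$")


def hadoop_versions(lib_dirs):
    """Map each version string to the hadoop-*.jar files that carry it."""
    versions = defaultdict(list)
    for lib_dir in lib_dirs:
        for jar in sorted(Path(lib_dir).glob("hadoop-*.jar")):
            match = JAR_NAME.match(jar.name)
            if match:
                versions[match.group(2)].append(str(jar))
    return dict(versions)


def filesystem_providers(lib_dirs):
    """Map each jar to the FileSystem classes it registers for discovery."""
    providers = {}
    for lib_dir in lib_dirs:
        for jar in sorted(Path(lib_dir).glob("*.jar")):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if SERVICE_FILE not in archive.namelist():
                        continue
                    text = archive.read(SERVICE_FILE).decode("utf-8")
            except zipfile.BadZipFile:
                continue  # not a readable jar; skip it
            classes = [
                line.strip()
                for line in text.splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            providers[str(jar)] = classes
    return providers


if __name__ == "__main__":
    dirs = sys.argv[1:] or ["."]
    found = hadoop_versions(dirs)
    for version, jars in sorted(found.items()):
        print(version, *jars, sep="\n  ")
    if len(found) > 1:
        print("More than one Hadoop version found; the classpath may be mixed.")
    for jar, classes in filesystem_providers(dirs).items():
        print(jar, *classes, sep="\n  ")
```

It would be run against whichever directories actually feed the Flume agent's classpath (for example the Flume `lib` directory and the Hadoop and Spark library directories on that machine). More than one version in the output is a hint, not proof, of the kind of conflict described in this answer.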
