在flume中将csv文件转换为json

fquxozlt  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(754)

我正在尝试将一个csv文件从flume传递到kafka。我可以通过使用下面的配置文件直接传递文件,将整个文件从flume传递到kafka。


# Name the components on this agent

   a1.sources = r1
   a1.sinks = k1
   a1.channels = c1

  # Describe the source
  a1.sources.r1.type = exec
  a1.sources.r1.command = cat /User/Desktop/logFile.csv

   # Describe the sink
   a1.sinks.k1.type  = org.apache.flume.sink.kafka.KafkaSink
   a1.sinks.k1.topic = kafkaTopic
   a1.sinks.k1.brokerList = localhost:9092
   a1.sinks.sink1.batchSize = 20

   # Use a channel which buffers events in memory
   a1.channels.c1.type = memory
   a1.channels.c1.capacity = 10000
   a1.channels.c1.transactionCapacity = 10000

   # Bind the source and sink to the channel
   a1.sources.r1.channels = c1
   a1.sinks.k1.channel = c1

但我希望它在传递给kafka进行进一步处理之前转换成json格式。有人能告诉我如何将文件从csv格式转换成json格式吗。
谢谢!!

kr98yfug

kr98yfug1#

我想你需要自己写一个拦截器。
从实现拦截器接口开始
从flume事件体读取csv。
解析并编写json
把它贴回事件主体
例子:https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/

相关问题