groovy 用表达式在NiFi中读取文件

ecfdbz9o  于 7个月前  发布在  其他
关注(0)|答案(1)|浏览(66)

我有一个由电子邮件触发的NiFi流。问题是ListFile和GetFile处理器不是从外部启动的。

我需要什么:

我有一些文件:

context variable #{folder_to_read} = /home/input
#{folder_to_read}/MyBranches_2023-10-22_225510.csv
#{folder_to_read}/MyAccounts_2023-10-22_225510.csv
#{folder_to_read}/MyOrders/USAOrders_2023-10-22_215510.csv
#{folder_to_read}/MyOrders/EUAOrders_2023-10-22_215610.csv
...

字符串
我需要:

  • 接收信件并启动流文件(由ConsumeEWS处理器完成)
  • 在输入文件夹中通过MyBranches_$(now():format('yyyy-MM_dd'))_*.csv等模式检查并读取文件
  • 操作文件中的数据(文件)。

问题是我找不到如何在一个管道中完成它。ListFile和GetFile处理器不能通过电子邮件发送,fenchfile不接受常规模式。
你能分享一下如何在NiFi中做到这一点吗?也许可以通过ExecuteScript处理器+ python/groovy来做到这一点?

p4tfgftt

p4tfgftt1#

下面的答案提供了groovy,只要问题所有者也接受这种语言。
我假设传入的flowfile看起来像这样:

#some comment
filename1.ext
filename2.ext2
subfolder/filename3.ext3

字符串
使用GroovyExecuteScript处理器,添加base_path参数,指向一个文件夹,其中包含您要读取的文件,并将脚本主体设置为:

def ff = session.get()
if(!ff) return

//read lines from incoming file and filter comments and empty lines
def lines = ff.read().withReader("UTF-8"){r-> r.readLines()}.findAll{s-> s && !s.startsWith('#')}

def outFiles = []
lines.each{s->
    def ffOut = ff.clone(false) //clone all attributes, but not content
    ffOut.filename = s
    new File("${base_path}/${s}").withInputStream{rawIn->
        ffOut.write{rawOut-> rawOut << rawIn} // import content from file
    }
    outFiles.add(ffOut)
}

ff.remove() //drop current flowfile
REL_SUCCESS << outFiles //transfer to success new file list

相关问题