如何使用groovy或python来解码NiFi中的动态json数组值?

dauxcl2d  于 2022-09-21  发布在  Python
关注(0)|答案(2)|浏览(167)

我想解码NiFi json内容中动态大小数组的Base64编码字符串值。

我尝试使用EvaluateJsonPathUpdateAttributes,但数组大小是动态的,我无法像for loop一样获取所有元素的索引变量。

然后我也尝试使用更新记录。这对我来说是非常完美的。但在我的NIFI版本中,并没有包含NIFI录制路径的Base64Decode功能......

所以我想尝试使用脚本处理器。

如何使用流数据编写groovy/python脚本。

下面是输入和输出JSON示例。

输入JSON:

{
  "A": [
       {
         "CC" : "Encoded string",
         "DD" : "any string"
       },
       {
         "CC" : "Encoded string",
         "DD" : "any string"
       }
     ]
  "B": "any string"
}

输出JSON:

{
  "A": [
       {
         "CC" : "Decoded string",
         "DD" : "any string"
       },
       {
         "CC" : "Decoded string",
         "DD" : "any string"
       }
     ]
  "B": "any string"
}
wh6knrhe

wh6knrhe1#

在NiFi中,您可以通过以下代码使用ExecuteGroovyScrip

def flowFile = session.get()
if (flowFile != null) {

    def inputFlow = flowFile.read().getText("UTF-8").readLines()

    def search = inputFlow.findAll { it.contains('"CC" :') }
    def searchStr = search.toString()
    flowFile = session.putAttribute(flowFile, 'search', searchStr)
    session.transfer(flowFile, REL_SUCCESS)
}

现在你有了一个属性搜索,行为CC,或者你也可以在这个代码中添加Base64解码

享受

iyfamqjs

iyfamqjs2#

我参考他们的答案来解决我的问题,如下所示。

我必须解析文本,因为解码后的字符串也是Json字符串。

如果我不解析已解码的字符串,结果将包含“\”字符,因为它只是字符串,而不是Json字符串

谢谢大家!

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.json.JsonOutput; 
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile, { inputStream, outputStream ->

    def content = inputStream.getText("UTF-8")

    def data = new JsonSlurper().parseText(content)    

    data.A.each{obj ->

         def decodedString = new String(obj["CC"].decodeBase64())
         def decodedJsonObject = new JsonSlurper().parseText(decodedString)
         obj["CC"] = decodedJsonObject
    }

    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

相关问题