Apache NiFi ExecuteScript:通过映射文件替换 Json 值的 Groovy 脚本

2024-04-28

我正在 Groovy 脚本上使用 Apache NiFi 0.5.1,以将传入的 Json 值替换为映射文件中包含的值。映射文件如下所示(它是一个简单的 .txt):

Header1;Header2;Header3
 A;some text;A2

我从以下几方面入手:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
        { inputStream, outputStream -> 

            def content = """ 
{ 
  "field1": "A"
  "field2": "A", 
  "field3": "A" 

}""" 

            def slurped = new JsonSlurper().parseText(content) 
            def builder = new JsonBuilder(slurped) 
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
        } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

第一步工作得很好,尽管它是硬编码的并且远非理想。我最初的想法是使用 ReplaceTextWithMapping 来执行替换,但是它不适用于复杂的映射文件(例如多列)。我想更进一步,但我不知道如何去做。首先,我不想传递整个编码的 JSON,而是读取传入的流文件。在 NiFi 中这怎么可能呢?在作为 ExecuteScript 的一部分运行脚本之前,我已通过 UpdateAttribute 输出了一个包含内容的 .Json 文件,其中 filename = myResultingJSON.json。此外,我知道如何使用 Groovy 加载 .txt 文件(String mappingContent= new File('/path/to/file').getText('UTF-8'),但是如何使用加载的文件来执行替换,以便生成的 JSON 看起来像这样:

{ 
  "field1": "A"
  "field2": "some text", 
  "field3": "A2" 
}

感谢您的帮助,

I.

EDIT:

对脚本的第一次修改允许我从 InputStream 中读取:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            builder.content.field1 = "A"
            builder.content.field2 = "some text" 
            builder.content.field3 = "A2" 
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

然后,我开始使用 ConfigSlurper 测试该方法,并在将逻辑注入 Groovy ExecuteScript 之前编写一个通用类:

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3": "A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)

        println "This is the content of my builder " + builder

        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()

        println "This is the content of my config " + config

        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }

}

我返回了 groovy.lang.MissinPropertyException,这是因为映射并不那么简单。所有字段/属性(从 field1 到 field3)都具有相同的值(例如)进入 InpuStream,这意味着每次 field2 具有该值时,您都可以确定它对于其他两个属性有效。但是,我无法拥有映射“field2”:“someText”的映射字段,因为实际映射是由映射文件中的第一个值驱动的。这里有一个例子:

{ 
      "field1": "A"
      "field2": "A", 
      "field3": "A" 

 }

在我的映射文件中,我有:

A;some text;A2

但是,如果您愿意,field1 需要映射到 A(文件中的第一个值)或保持不变。 Field2 需要映射到最后一列 (A2) 中的值,最后 Field3 需要映射到中间列中的“某些文本”。

你能帮忙吗?这是我可以使用 Groovy 和 ExecuteScript 实现的目标吗?如果需要,我可以将配置文件分成两个。

另外,我快速浏览了另一个选项(PutDistributedMapCache),我不确定我是否理解如何将键值对加载到分布式地图缓存中。看起来您需要有一个 DistributedMapCacheClient,我不确定这是否容易实现。

谢谢你!

EDIT 2:

其他一些进展,我现在可以进行映射工作,但不确定为什么在读取属性文件的第二行时失败:

"A" someText
"A2" anotherText

class TestLoadingMappings {

    static void main(String[] args) {

        def content = '''
         {"field2":"A",
         "field3":"A"
         }
         '''

        println "This is the content of the JSON file" + content

        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)

        println "This is the content of my builder " + builder

        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"

        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()

        config.each { k, v ->
            if (builder.content.field2) {

                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {

                builder.content.field3 = config[k]
            }

            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}

我返回的是:This is my builder {"field2":"someText","field3":"someText"}

知道为什么吗?

太感谢了

编辑3(从下面移动)

我写了以下内容:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper

    class TestLoadingMappings {

        static void main(String[] args) {

            def content =
            '''
            {"field2":"A",
             "field3":"A"
            }
            '''
            def slurper = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurper)

            println "This is the content of my builder " + builder

            def propertiesFile = new File('D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten()

            conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}

但是,我必须更改映射文件的结构,如下所示:

"field1"="substitutionText"
"field2"="substitutionText2"

然后,我将 ConfigSlurper“合并”到 ExecuteScript 脚本中,如下所示:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

flowFile = session.write(flowFile,
        { inputStream, outputStream ->

            def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)
            outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))

            def propertiesFile = new File(''D:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def conf = new ConfigSlurper().parse(props).flatten();

            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
            }
            outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
        } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

问题似乎是这样一个事实:我无法使用类似于在 TestLoadingMappings 中创建的内容来真正复制原始映射文件中的逻辑。正如我之前的评论/编辑中提到的,映射应该以这种方式工作:

field2 = 如果 A 则替换为“某些文本”

field3 = 如果 A 则替换为 A2

...

field2 = B 然后替换为“一些其他文本”

field3 = B 然后替换为 B2

和儿子。

简而言之,映射由 InputStream 中的传入值(不同)驱动,该值根据 JSON 属性有条件地映射到不同的值。您能否推荐一种更好的方法来通过 Groovy/ExecuteScript 实现此映射?我可以灵活地修改映射文件,您能找到一种方法来更改它以实现所需的映射吗?

Thanks


我有一些关于如何读取包含 JSON 的流文件的示例:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

上面的结构是正确的;基本上,您可以在闭包中使用“inputStream”变量来读取传入的流文件内容。如果您想一次性读取所有内容(对于 JSON,您可能需要这样做),您可以使用 IOUtils.toString() 后跟 JsonSlurper,如上面链接中的示例所示。

对于您的映射文件,特别是如果您的 JSON 是“平面”的,您可以有一个 Java 属性文件,将字段名称映射到新值:

field2=一些文本

字段3=A2

查看ConfigSlurper http://docs.groovy-lang.org/latest/html/gapi/groovy/util/ConfigSlurper.html用于读取属性文件。

一旦您读取了传入的 JSON 文件并读取了映射文件,您就可以使用数组表示法而不是直接成员表示法来获取 JSON 的各个字段。假设我将属性读入 ConfigSlurper,并且我想用属性文件中的属性覆盖输入 JSON 中的任何现有属性(在示例中称为“json”)。这可能如下所示:

config.parse(props).flatten().each { k,v ->
  if(json[k]) {
    json[k] = v
  }
}

然后你可以继续你的outputStream.write()。

您还可以通过以下方式将其加载到分布式缓存中,而不是从文件中读取映射PutDistributedMapCache https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.PutDistributedMapCache/index.html处理器。您可以从 ExecuteScript 中的 DistributedCacheMapServer 读取数据,我在这里有一个示例:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

如果您的映射很复杂,您可能需要使用 TransformJSON 处理器,该处理器将在 NiFi 的下一版本(0.7.0)中提供。相关的 Jira 案例在这里:

https://issues.apache.org/jira/browse/NIFI-361 https://issues.apache.org/jira/browse/NIFI-361

EDIT:

针对您的编辑,我没有意识到您对各种值有多个规则。在这种情况下,属性文件可能不是表示映射的最佳方式。相反,你可以使用 JSON:

{
  "field2": {
         "A": "some text",
         "B": "some other text"
       },
  "field3": {
         "A": "A2",
         "B": "B2"
       }
}

然后您可以使用 JSONSlurper 读取映射文件。以下是使用上述映射文件的示例:

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

import java.nio.charset.StandardCharsets

def flowFile = session.get();
if (flowFile == null) {
    return;
}

def mappingJson = new File('/Users/mburgess/mappings.json').text

flowFile = session.write(flowFile, { inputStream, outputStream ->

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def inJson = new JsonSlurper().parseText(content)
    def mappings = new JsonSlurper().parseText(mappingJson)

    inJson.each {k,v -> 
        inJson[k] = mappings[k][v]
    }
    outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache NiFi ExecuteScript:通过映射文件替换 Json 值的 Groovy 脚本 的相关文章

  • 使用 jquery 迭代 JSON 对象

    为什么以下不起作用 在循环内 当 myJSON 为空或不为空时 它永远不会打印 url each parseJSON myJSON function key value alert value url 对于这个 JSON 结构 host f
  • (可选)根据运行时值序列化属性

    从根本上讲 我想根据序列化时的值包含或省略生成的 Json 中的属性 更具体地说 我有一个类型 它知道是否已为其分配了值 并且我只想序列化该类型的属性 如果有 has是分配给它的东西 所以我需要在运行时检查该值 我试图让我的 API 更容易
  • 詹金斯:${BUILD_LOG, maxLines, escapeHtml} 不起作用

    我正在尝试使用 BUILD LOG maxLines escapeHtml 如下所述 如何从 BUILD LOG 变量中获取最后 20 行 https stackoverflow com questions 16089096 how can
  • 有没有办法将 JSON 模式转换为 XSD? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我目前正在寻找一种将 JSON 架构转换为 XSD 或 XML 架构的方法 我没有找到任何关于这个主题
  • Fabric js-在保存和加载回画布时缺少添加附加属性的扩展 toObject 方法

    我创建了一个带有矩形和文本的织物组 最后 我使用以下代码将自定义属性 名称 添加到组类中 我使用 JSON stringify canvas 将画布数据序列化为 JSON 并将 Json 字符串发送到 java 最后将 Json 字符串保存
  • JAX-RS 多态 POST 请求:我应该如何编写 JSON?

    我在尝试用 JAX RS 解决这个问题时遇到了麻烦 我相信它与编组 解组过程有关 我认为我对此不太了解 并且我想重新创建这个 发帖的 REST 端点是 rest register 所以我的服务定义如下 ApplicationPath res
  • 将单引号括起来的数组转换为数组

    深度嵌套在 JSON 对象中我有属性value actions name InviteUser type button value Brian Timoney email protected cdn cgi l email protecti
  • 如何在 Groovy 中的 JSON Converter 方法中保留字母大小写?

    我正在尝试将 groovy 对象解析为 JSON 属性名称不遵循正确的驼峰式大小写形式 class Client String Name Date Birthdate 当我使用这个时 Client client new Client Nam
  • 等待 JavaScript 中 Json 调用完成

    我正在使用下面的json调用在我的 javascript 方法中 function go123 var cityName var temp getJSON https abc in api city callback args functi
  • 如何使用循环构建 json 对象?

    我正在尝试循环遍历多个项目 并创建一个 json 对象 每个循环都应该是对象上的一个新项目 但我在执行此操作时遇到了一些问题 似乎只添加了一组项目 而不是多个项目 这是我的代码 jsonObj rows each function inde
  • 如何使用 fs.copyTpl 忽略 Yeoman 中的文件

    我怎样才能忽略文件 我想排除任何子目录中以 开头的所有文件 我使用这两种方法没有成功 this fs copyTpl this templatePath basicFiles this destinationPath answers ign
  • 如何从 jQuery $.getJSON() 请求获取原始 JSON 响应?

    如何从 jQuery 获取原始 JSON 响应 getJSON 要求 我只想打印原始响应alert 我的浏览器中的对话 从 jQuery 1 5 开始jqXHR http api jquery com jQuery ajax jqXHR对象
  • 使用 jquery 迭代 json 数组

    已经有一些帖子和我遇到了类似的问题 如何使用 PHP 中的 jQuery AJAX 调用迭代 JSON 数组 https stackoverflow com questions 6472338 how do i iterate over a
  • 从 postgres 表中提取 json 数组给出错误:无法从标量中提取元素

    通过使用jsonb array elements 提取出来的函数jsonb来自 Postgres 的数据数组 它给出了错误 无法从标量中提取元素 我认为这是因为NULL在返回调用中 添加了NULL检查状况但不工作 任何帮助表示赞赏 sele
  • 无法读取解析推送通知包数据

    我尝试使用 Parse 推送通知服务发送自定义数据 但从 Bundle 中提取时总是返回 null 值 自定义广播接收器 Override public void onReceive Context context Intent inten
  • Rails 4 - 将地址保存为数据库中的一列

    我是 Rails 新手 正在开发一个简单的应用程序 我的 ERD 中有一个名为 Client 的模型 并且希望保存每个客户的地址 我最初的想法是将地址保存为单独的字段 即 rails g model Client address first
  • sqlalchemy 的 row_to_json 语法

    我想弄清楚如何将 Postgres 9 2 row to json 与 SqlAlchemy 一起使用 但是我无法想出任何有效的语法 details foo row q select Foo where Foo bar id Bar id
  • MySQL JSON 存储与两个表

    与使用单独的元表相比 使用 JSON 在表中存储数据有什么好处吗 这是原始架构 Users Table UserId Username Etc 5 John Avatar Table Id UserId ImageName ImageTyp
  • 如何将 JSON 字符串转换为图像?

    我有一个将图像转换为 JSON 数组的应用程序 并将其保存到 blob 字段中 function getImage String var memorystream TMemoryStream jsonArray TJSONArray beg
  • 如何在gradle中复制文件?

    我正在尝试将战争档案部署到 Tomcat 中 这是我编写的构建脚本 apply plugin war task deploy dependsOn war copy from build libs into E apache tomcat 8

随机推荐