我正在 Groovy 脚本上使用 Apache NiFi 0.5.1,以将传入的 Json 值替换为映射文件中包含的值。映射文件如下所示(它是一个简单的 .txt):
Header1;Header2;Header3
A;some text;A2
我从以下几方面入手:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = """
{
"field1": "A"
"field2": "A",
"field3": "A"
}"""
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
第一步工作得很好,尽管它是硬编码的并且远非理想。我最初的想法是使用 ReplaceTextWithMapping 来执行替换,但是它不适用于复杂的映射文件(例如多列)。我想更进一步,但我不知道如何去做。首先,我不想传递整个编码的 JSON,而是读取传入的流文件。在 NiFi 中这怎么可能呢?在作为 ExecuteScript 的一部分运行脚本之前,我已通过 UpdateAttribute 输出了一个包含内容的 .Json 文件,其中 filename = myResultingJSON.json。此外,我知道如何使用 Groovy 加载 .txt 文件(String mappingContent= new File('/path/to/file').getText('UTF-8'
),但是如何使用加载的文件来执行替换,以便生成的 JSON 看起来像这样:
{
"field1": "A"
"field2": "some text",
"field3": "A2"
}
感谢您的帮助,
I.
EDIT:
对脚本的第一次修改允许我从 InputStream 中读取:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
然后,我开始使用 ConfigSlurper 测试该方法,并在将逻辑注入 Groovy ExecuteScript 之前编写一个通用类:
class TestLoadingMappings {
static void main(String[] args) {
def content = '''
{"field2":"A",
"field3": "A"
}
'''
println "This is the content of the JSON file" + content
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
println "This is the content of my builder " + builder
def propertiesFile = new File("D:\\myFile.txt")
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def config = new ConfigSlurper().parse(props).flatten()
println "This is the content of my config " + config
config.each { k, v ->
if (builder[k]) {
builder[k] = v
}
}
println(builder.toPrettyString())
}
}
我返回了 groovy.lang.MissinPropertyException,这是因为映射并不那么简单。所有字段/属性(从 field1 到 field3)都具有相同的值(例如)进入 InpuStream,这意味着每次 field2 具有该值时,您都可以确定它对于其他两个属性有效。但是,我无法拥有映射“field2”:“someText”的映射字段,因为实际映射是由映射文件中的第一个值驱动的。这里有一个例子:
{
"field1": "A"
"field2": "A",
"field3": "A"
}
在我的映射文件中,我有:
A;some text;A2
但是,如果您愿意,field1 需要映射到 A(文件中的第一个值)或保持不变。 Field2 需要映射到最后一列 (A2) 中的值,最后 Field3 需要映射到中间列中的“某些文本”。
你能帮忙吗?这是我可以使用 Groovy 和 ExecuteScript 实现的目标吗?如果需要,我可以将配置文件分成两个。
另外,我快速浏览了另一个选项(PutDistributedMapCache),我不确定我是否理解如何将键值对加载到分布式地图缓存中。看起来您需要有一个 DistributedMapCacheClient,我不确定这是否容易实现。
谢谢你!
EDIT 2:
其他一些进展,我现在可以进行映射工作,但不确定为什么在读取属性文件的第二行时失败:
"A" someText
"A2" anotherText
class TestLoadingMappings {
static void main(String[] args) {
def content = '''
{"field2":"A",
"field3":"A"
}
'''
println "This is the content of the JSON file" + content
def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)
println "This is the content of my builder " + builder
assert builder.content.field2 == "A"
assert builder.content.field3 == "A"
def propertiesFile = new File('D:\\myTest.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
println "This is the content of the properties " + props
def config = new ConfigSlurper().parse(props).flatten()
config.each { k, v ->
if (builder.content.field2) {
builder.content.field2 = config[k]
}
if (builder.content.field3) {
builder.content.field3 = config[k]
}
println(builder.toPrettyString())
println "This is my builder " + builder
}
}
}
我返回的是:This is my builder {"field2":"someText","field3":"someText"}
知道为什么吗?
太感谢了
编辑3(从下面移动)
我写了以下内容:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
class TestLoadingMappings {
static void main(String[] args) {
def content =
'''
{"field2":"A",
"field3":"A"
}
'''
def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)
println "This is the content of my builder " + builder
def propertiesFile = new File('D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten()
conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
println("This prints the resulting JSON :" + builder.toPrettyString())
}
}
}
但是,我必须更改映射文件的结构,如下所示:
"field1"="substitutionText"
"field2"="substitutionText2"
然后,我将 ConfigSlurper“合并”到 ExecuteScript 脚本中,如下所示:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
def propertiesFile = new File(''D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten();
conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
}
outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
问题似乎是这样一个事实:我无法使用类似于在 TestLoadingMappings 中创建的内容来真正复制原始映射文件中的逻辑。正如我之前的评论/编辑中提到的,映射应该以这种方式工作:
field2 = 如果 A 则替换为“某些文本”
field3 = 如果 A 则替换为 A2
...
field2 = B 然后替换为“一些其他文本”
field3 = B 然后替换为 B2
和儿子。
简而言之,映射由 InputStream 中的传入值(不同)驱动,该值根据 JSON 属性有条件地映射到不同的值。您能否推荐一种更好的方法来通过 Groovy/ExecuteScript 实现此映射?我可以灵活地修改映射文件,您能找到一种方法来更改它以实现所需的映射吗?
Thanks