在我的流程中了解 NIFI 中的通知和等待流程

2024-02-21

我是 NIFI 流程的新手,在我当前的工作中,我有通知和等待流程。有人可以帮助我理解这个流程吗

On what basis the Notify work. in my case we have 4 schema files process and 4 data files with respective those. the below details are notify properties.. enter image description here

Below are the Wait properties: enter image description here

***据我了解,等待进程寻找8个信号以继续处理下一级进程。但从技术上讲,这将如何运作还不确定。

如果有人提供有关此过程的详细技术级别解释,我们将不胜感激。


@Andy提到的博客对如何一起使用等待和通知给出了很好的解释:https://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/ https://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/

但是,如果您想要更深入的技术了解,我发现各个处理器的文档非常有用。

等待的描述 https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.Wait/index.html

将传入的 FlowFiles 路由到“等待”关系,直到匹配的释放信号从相应的通知处理器存储在分布式缓存中。当识别到匹配的释放信号时,等待的 FlowFile 被路由到“成功”关系,并具有从通知处理器产生释放信号的 FlowFile 复制的属性。然后从高速缓存中删除释放信号条目。如果等待的 FlowFiles 超过过期持续时间,它们将被路由到“过期”。如果您需要等待多个信号,请通过“目标信号计数”属性指定所需的信号数量。这对于将源 FlowFile 拆分为多个片段的处理器(例如 SplitText)特别有用。为了等待所有片段被处理,将“原始”关系连接到等待处理器,并将“分割”关系连接到相应的通知处理器。配置通知和等待处理器以使用“${fragment.identifier}”作为“释放信号标识符”的值,并在等待处理器中指定“${fragment.count}”作为“目标信号计数”的值当使用“等待”关系作为循环时,建议使用优先顺序(例如先进先出)。

通知说明 https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.Notify/index.html

在分布式缓存中缓存释放信号标识符,可选地与 FlowFile 的属性一起缓存。一旦发现缓存中的该信号,任何保存在相应等待处理器中的流文件都将被释放。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在我的流程中了解 NIFI 中的通知和等待流程 的相关文章

  • NiFi - 如何在 ExecuteStreamCommand 中引用 flowFile?

    我需要执行类似的操作 sed 1d simple tsv gt noHeader tsv 这将从我的大流文件 gt 1 GB 中删除第一行 问题是 我需要在我的流程文件上执行它 所以它是 sed 1d myFlowFile gt myFlo
  • 如何在NiFi中调用远程REST服务

    是否可以在NIFI处理器中使用xmlHttpRequest来调用远程休息服务 就我而言ExecuteScript处理器 使用Javascript 无法评估XMLHttpRequest 有没有类似的解决方案可以用来获取响应数据 var Out
  • Nifi:如何编写自定义处理器

    我想写nifi处理器 可以从hdfs目录读取xml文件 然后将其数据提取到flowfile属性中 如果有两个 nifi 处理器可以获取该文件并读取数据或向其中写入内容的情况我怎样才能进行文件锁定这样一次只有一个处理器可以使用它 您能给我推荐
  • PublishJMS 处理器无法将消息写入 IBM Websphere MQ

    我在通过 PublishJMS 处理器将消息发布到 IBM Websphere MQ 队列时遇到问题 配置PublishJMS处理器和JMS控制器服务 我已经验证MQ连接没有问题 我相信我需要在 PublishJMS 或控制器服务中设置一些
  • 如何将nif流文件从1.12版本迁移到1.16.3

    我有一个在 NiFi 1 12 0 中运行的数据流 此安装的相关属性如下 nifi sensitive props key nifi sensitive props key protected nifi sensitive props al
  • Nifi:nifi 中的线程

    我想知道线程在 nifi 中如何工作 我的意思是一个处理器有一个线程还是它们在一个主线程中 也许我想从处理器获取一个文件 然后我想更新它 如何防止多个处理器同时获取文件数据 除了使用 keep file false 操作 是否可以在执行脚本
  • PutSql - 日期格式错误

    我正在尝试从 Teradata 读取数据并将其填充到 Oracle 数据库 请在下面找到我的流程 ExecuteSQL gt SplitAvro gt ConvertAvroToJSON gt ConvertJSONToSQL gt Put
  • 删除NiFi中的空属性

    因为这个问题仍未解决 我有一个EvaluateJsonPath有时输出带有空字符串的属性的处理器 是否有直接的方法从流程文件中删除属性 我尝试使用UpdateAttributes处理器 但它只能根据匹配属性名称进行删除 我需要匹配属性的值
  • 如何在“ExecuteGroovyScript”处理器中使用“DBCPConnectionPoolLookup”?

    我想在 ExecuteGroovyScript 处理器中使用 DBCPConnectionPoolLookup 控制器服务 我设置 数据库名称 但我收到这个错误 这是 ExecuteGroovyScript 配置 I found someo
  • 动态 Jolt 规范用于处理 JSON 内的嵌套数组是否具有单个元素、多个元素或嵌套数组不存在

    动态 Jolt 规范用于处理 JSON 内的子数组是否包含单个元素 多个元素或子数组不存在 如果我们在 JSON 中嵌套数组有多个元素 那么我的 jolt 规范工作得很好 但是如果他们发送没有嵌套数组或有 1 个元素的嵌套数组的 JSON
  • 如何在Nifi getMongo查询字段中获取ISO字符串

    我正在尝试使用表达式语言使用以下查询在 Nifi getMongo 查询字段中生成 ISO 字符串 remindmeDate gte now format yyyy MM dd T HH mm ss SSS Z GMT lte now to
  • 如何清除NiFi队列?

    我们正在 NiFi 中创建一些流 在某些情况下可能会建立队列 但由于某种原因 流无法按预期工作 在一天结束时 我想清除队列并以某种方式实现自动化 问题是我们如何从后端删除队列 我们有什么办法可以实现这一目标吗 除了 Bryan 提到的明确的
  • 如何在NiFi中使用计数器的值

    在 NiFi 1 3 0 中 我创建了一个流程来分割 JSON 文件并使用名称更新计数器filenamecounter这样我就可以将每个拆分保存为不同的文件名 当我查看 NiFi 计数器窗格时 我看到计数器值已更新 但我怎样才能获取这个值呢
  • SQL Server 的 DBCPConnectionPool 控制器服务,jdbc 异常

    NiFi 1 1 1 在 Windows 7 和 RHEL 7 上进行了测试 后台线程是here https stackoverflow com questions 42765471 jdbc failing in custom proce
  • JOLT 移位转换以过滤数组中的值

    我想使用 JOLT 转换来做两件事 过滤名为 myarray 的数组中的元素 以便仅保留具有 v 518 属性的元素 过滤掉除 v 518 和 lfdn 之外的其余元素的所有属性 Input isError false isValid tr
  • 使用 NiFi 更新 CSV 内字段中的值

    我想实现一个简单的用例 使用 NiFi 将 CSV 内字段中的多个字符串 文本值更新为整数值 例如 我的 CSV 文件如下所示 输入 CSV 文件 字段 1 字段 2 美国 苹果 美国 苹果 印度 葡萄 中国城 奥兰治 澳大利亚民族 桃子
  • 为时间戳记录创建正确的 avro 架构

    我想知道对于这种格式的 json 到 avro 转换 正确的 avro 模式是什么 entryDate 2018 01 26T12 00 40 930 我的架构 type record name schema fields name ent
  • 通过 Apache Knox 网关访问 Apache NIFI REST API (jwt)

    我正在寻找配置 Apache 的资源KNOXTOKEN访问 Apache NIFI REST API 的服务 我已经有了KNOXSSO已配置 并且能够通过它访问 NIFI UI 但是 我找不到资源来通过 Curl 和 JWT 安全地访问 N
  • 如何使用 NiFi 将字符串转换为 JSON 数组

    在 NiFi 中 我正在处理包含以下属性的流文件 Key my array Value u firstElement u secondElement 我想在此数组上拆分 flowFile 以单独处理每个元素 然后合并 我尝试使用SplitJ
  • Apache Nifi:使用更新记录处理器替换列中的值

    我有一个 csv 看起来像这样 name code age Himsara 9877 12 John 9437721 16 Razor 232 45 我必须更换色谱柱code根据一些正则表达式 我的逻辑如下面的 Scala 代码所示 if

随机推荐