在 Nifi 中从 Avro Schema 创建 Postgresql 表

2023-12-05

使用 InferAvroSchema 我获得了文件的 Avro 架构。我想使用此 Avro 架构在 PostregSql 中创建一个表。我必须使用哪个处理器。

我使用:GetFile->InferAvroSchema->我想从此架构创建一个表->放入databaseRecord。

avro 架构:

{
  "type" : "record",
  "name" : "warranty",
  "doc" : "Schema generated by Kite",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "Type inferred from '1'"
  }, {
    "name" : "train_id",
    "type" : "long",
    "doc" : "Type inferred from '21691'"
  }, {
    "name" : "siemens_nr",
    "type" : "string",
    "doc" : "Type inferred from 'Loco-001'"
  }, {
    "name" : "uic_nr",
    "type" : "long",
    "doc" : "Type inferred from '193901'"
  }, {
    "name" : "Configuration",
    "type" : "string",
    "doc" : "Type inferred from 'ZP28'"
  }, {
    "name" : "Warranty_Status",
    "type" : "string",
    "doc" : "Type inferred from 'Out_of_Warranty'"
  }, {
    "name" : "Warranty_Data_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Real_based_on_preliminary_acceptance_date'"
  }, {
    "name" : "of_progression",
    "type" : "long",
    "doc" : "Type inferred from '100'"
  }, {
    "name" : "Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2009'"
  }, {
    "name" : "Warranty_on_Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2013'"
  }, {
    "name" : "Customer_Status",
    "type" : "string",
    "doc" : "Type inferred from 'homologation'"
  }, {
    "name" : "Commissioning_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/10/2010'"
  }, {
    "name" : "Preliminary_acceptance_date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_Start_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_End_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2013'"
  }, {
    "name" : "Effective_End_Warranty_Date",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'null'",
    "default" : null
  }, {
    "name" : "Level_2_in_function",
    "type" : "string",
    "doc" : "Type inferred from '17/07/2015'"
  }, {
    "name" : "Baseline",
    "type" : "string",
    "doc" : "Type inferred from '2.10.23.4'"
  }, {
    "name" : "RELN_revision",
    "type" : "string",
    "doc" : "Type inferred from '0434-26.3'"
  }, {
    "name" : "TC_report",
    "type" : "string",
    "doc" : "Type inferred from 'A480140'"
  }, {
    "name" : "Last_version_Date",
    "type" : "string",
    "doc" : "Type inferred from 'A-23/09/2015'"
  }, {
    "name" : "ETCS_ID_NID_Engine",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '13001'",
    "default" : null
  }, {
    "name" : "Item_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Item'"
  }, {
    "name" : "Path",
    "type" : "string",
    "doc" : "Type inferred from 'sites/TrWMTISnerc_Community/Lists/X4Trains'"
  } ]
}

我的表创建表是:

Create table warranty(
  id    float,
  train_id float,
  siemens_nr    varchar(255),
  uic_nr    float,
  configuration varchar(255),
  warranty_status   varchar(255),
  warranty_data_type    varchar(255),
  of_progression    float,
  delivery_date varchar(255),
  warranty_on_delivery_date varchar(255),
  customer_status   varchar(255),
  commissioning_date    varchar(255),
  preliminary_acceptance_date   varchar(255),
  warranty_start_date   varchar(255),
  warranty_end_date varchar(255),
  effective_end_warranty_date   varchar(255),
  level_2_in_function   varchar(255),
  baseline  varchar(255),
  reln_revision varchar(255),
  tc_report varchar(255),
  last_version_Date varchar(255),
  etcs_id_nid_engine    float,
  item_type  varchar(255),
  path varchar(255)

)

我可以建议ExecuteGroovyScriptnifi v1.5+ 中的处理器

定义新属性SQL.mydb- 系统将提示您将其值链接到数据库(DBCPConnectionPool)

选择要创建表的数据库

并使用此脚本(假设 avro 架构位于流文件内容中)

import groovy.json.JsonSlurper

def ff = session.get()
if(!ff)return

//parse avro schema from flow file content
def schema = ff.read().withReader("UTF-8"){ new JsonSlurper().parse(it) }

//define type mapping
def typeMap = [
    "string"            : "varchar(255)",
    "long"              : "numeric(10)",
    [ "null", "string" ]: "varchar(255)",
    [ "null", "long" ]  : "numeric(10)",
]

assert schema.name && schema.name=~/^\w.*/

//build create table statement
def createTable = "create table ${schema.name} (" +
    schema.fields.collect{ "\n  ${it.name.padRight(39)} ${typeMap[it.type]}" }.join(',') +
    "\n)"

//execute statement through the custom defined property
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
SQL.mydb.execute(createTable as String) //important to cast to String

//transfer flow file to success
REL_SUCCESS << ff
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Nifi 中从 Avro Schema 创建 Postgresql 表 的相关文章

随机推荐

  • 如何在Python中将浮点数格式化为固定宽度

    如何将浮点数格式化为固定宽度并满足以下要求 如果 n 添加尾随小数零以填充固定宽度 截断超过固定宽度的小数位 对齐所有小数点 例如 formatter something like 06 numbers 23 23 0 123334987
  • Apex 5:动态操作设置页面项值

    使用新的 apex 5 版本时 我遇到以下问题 无法通过plsql获取页面项的值 nv P2 TO P2 FROM lt lt lt DOESN T WORK I Yes P FROM exist and verified nv P2 TO
  • SPSS 按行分组并将字符串连接成一个变量

    我试图导出 SPSS 元数据使用 SPSS 语法转换为自定义格式 具有值标签的数据集包含一个或多个变量标签 但是 现在我想将每个变量的值标签连接成一个字符串 例如对于变量SEX将行组合或分组F Female and M Male转化为一个变
  • 在 R 中的 ggplot2 地图上叠加栅格图层?

    我正在尝试将栅格图层叠加到 ggplot 中的地图上 栅格图层包含卫星标签中每个时间点的似然面 我还想在栅格图层上设置累积概率 95 75 50 我已经弄清楚如何在 ggplot 地图上显示栅格图层 但坐标未彼此对齐 我尝试使每个都有相同的
  • 如何在 Netbeans 中使用 -g 选项进行编译?

    调试时 我收到一条有关异常的警告消息 variable info not available compiled without g 如何在 netbeans 中设置使用 g 进行编译 thanks 据我所知你的own代码是用调试信息编译的
  • MVC3 EF 工作单元 + 通用存储库 + Ninject

    我是 MVC3 的新手 一直在关注 asp net 网站上的精彩教程 然而 我不太清楚如何将工作单元和通用存储库模式与 Ninject 结合使用 我使用本教程作为起点 http www asp net mvc tutorials getti
  • 使用设备策略控制器在后台升级应用程序

    我有一个正在运行的 DPC 应用程序 它是设备所有者 我已在两台不同的 Android 6 0 1 设备上尝试过此操作 以排除任何设备 制造商问题 I used adb shell dpm set device owner com exam
  • VB.NET 中默认启用选项 Strict

    每当我创建一个新的 VB NET 程序时 我必须进入该项目的属性并将 Option strict 设置为打开 我可以这样做一次 这样每次创建新项目时它都是默认的吗 在 Visual Studio 中 转到菜单Tools gt Options
  • 训练后,TensorFlow 始终会收敛到所有项目的相同输出

    这是我正在使用的代码片段 import tensorflow as tf import numpy as np from PIL import Image from os import listdir nodes l1 500 nodes
  • Log4j2 覆盖过去一天的日志文件

    我正在使用 Log4j2 版本 2 3 log4j2 xml 如下所示
  • 使第二行填补上面的空白

    一个简单的 html css 问题 请看这个example 我希望第二行中的块能够填补它们上方的空白 除了使用 JavaScript 之外还有什么办法吗 block float left width 200px height 200px b
  • Lasagne 与 Theano 可能版本不匹配(Windows)

    所以我终于设法让 theano 启动并在 GPU 上运行this指导 测试代码运行良好 告诉我它使用了 GPU 耶 然后我想尝试一下并遵循this数字识别 CNN 训练指南 问题是 我从烤宽面条调用 theano 的方式中收到错误 我猜这里
  • 有没有办法进行内联多值比较?

    我什至觉得问这个问题很愚蠢 因为这看起来很微不足道 但我的大脑却让我失望了 如果我有以下内容 let a b c 1 1 1 有没有一种优雅的方法来确定 a b 和 c 是否都具有相同的值 就像是 let result a b c 这会失败
  • python中如何检查两个字符串是否有交集?

    例如 a abcdefg b krtol 它们没有交集 c hflsfjg 则a和c有交集 检查这个最简单的方法是什么 只需要一个 True 或 False 结果 def hasIntersection a b return not set
  • 使用powershell登录后如何从网站获取表数据?

    我的公司希望我从他们的内部网站获取数据 对其进行组织 然后将其发送到数据库 数据显示在您在站点内导航到的表格上 我想将这些字段提取到文件或内存中以进行进一步处理 到目前为止 我可以通过获取提交登录按钮的 ID 并传递我的用户名 密码来在 p
  • ffmpeg通过python子进程无法找到相机

    这里有一个奇怪的问题 我使用这个命令通过 ffmpeg 捕获我的网络摄像头 通过 Windows 上的 cmd ffmpeg y t 300 rtbufsize 1024M f dshow i video Lenovo EasyCamera
  • 视图需要相互依赖的逻辑:没有模型是否可行?

    我正在尝试编写一些 Oracle 11g SQL 但遇到了一些先有鸡还是先有蛋的问题 我正在寻找类似电子表格的行为 我找到了一个使用 Oracle 的解决方案MODEL条款 但性能并不好 所以我想知道是否 非MODEL 解决方案在技术上甚至
  • 在 django 项目中导入应用程序

    我在 django 项目中的另一个应用程序中导入应用程序时遇到问题 我知道有几个关于这个主题的问题 asnwsers 相信我 我读了很多 甚至还有一些关于 python import 的 这是我的项目树 我将放置真实的文件夹名称 was f
  • 将 URL 变量传递到 xsl 中

    是否可以将 URL 变量传递到 xsl xml 中 例如 http www somedomain com index aspx myVar test myVar2 anotherTest 我希望能够在 xsl 文件的逻辑中使用 myVar
  • 在 Nifi 中从 Avro Schema 创建 Postgresql 表

    使用 InferAvroSchema 我获得了文件的 Avro 架构 我想使用此 Avro 架构在 PostregSql 中创建一个表 我必须使用哪个处理器 我使用 GetFile gt InferAvroSchema gt 我想从此架构创