如何在 Jython 中使用修改后的数据更新行?

2023-11-30

我有一个 csv 文件,其中包含数十万行,下面是一些示例行..,

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

我需要获取日期时间格式错误的 csv 数据的最后一列。所以我需要获得默认的日期时间格式("YYYY-MM-DD hh:mm:ss[.nnn]")来自数据的最后一列。

我尝试了以下脚本来从中获取行并写入流文件。

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
  session.transfer(flowFile, REL_SUCCESS)

但我无法找到一种方法将其转换为如下输出。

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

我已经和我的朋友(谷歌)检查了解决方案,但仍然找不到解决方案。

谁能指导我将这些输入数据转换为我所需的输出?


在这个转换中,不必要的数据位于每行的末尾,因此使用正则表达式管理转换任务非常容易。

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

在这里检查正则表达式和解释:https://regex101.com/r/sAB4SA/2

一旦您有一个大文件 - 最好不要将其加载到内存中。以下代码将整个文件加载到内存中:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

最好逐行迭代。

所以这段代码是为了ExecuteScript使用 python (Jython) 语言的 nifi 处理器:

import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream,"UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
            line = reader.readLine()
            p = re.compile('^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            while line!= None:
                # print line
                match = p.search(line)
                writer.write( match.group(1) + (match.group(3) if match.group(3)!=None else '') )
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)

一旦问题涉及到 nifi,这里有一些似乎更简单的替代方案


与上面相同的代码,但在 groovy for nifi 中ExecuteScript处理器:

def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
    // ## transform streams into reader and writer
    rawIn.withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            reader.eachLine{line, lineNum->
                if(lineNum>1) { // # skip the first line
                    // ## let use regular expression to transform each line
                    writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '$1$3' ) << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)

替换文本处理器

如果正则表达式没问题 - nifi 中最简单的方法是ReplaceText可以进行正则表达式逐行替换的处理器。

在这种情况下,您不需要编写任何代码,只需构建正则表达式并正确配置您的处理器即可。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Jython 中使用修改后的数据更新行? 的相关文章

  • 漂亮的地图打印机会抛出类型错误

    我已经使用配置了漂亮的打印机http wiki eclipse org CDT User FAQ How can I inspect the contents of STL containers 3F http wiki eclipse o
  • Python 函数句柄 ala Matlab

    在 MATLAB 中可以创建function handles http www mathworks co uk help techdoc ref function handle html与类似的东西 myfun arglist body 这
  • matplotlib:调整图形窗口大小而不缩放图形内容

    当您调整图形大小时 Matplotlib 会自动缩放图形窗口中的所有内容 通常这是用户想要的 但我经常想增加窗口的大小 为其他东西腾出更多空间 在这种情况下 我希望在更改窗口大小时预先存在的内容保持相同的大小 有谁知道一个干净的方法来做到这
  • Pandas 在列级别连接数据帧时添加键

    根据 Pandas 0 19 2 文档 我可以提供keys参数来创建结果多索引 DataFrame 一个例子 来自 pandas 文档 是 result pd concat frames keys x y z 我将如何连接数据框以便我可以在
  • 从 Django 基于类的视图的 form_valid 方法调用特殊(非 HTTP)URL

    如果你这样做的话 有一个 HTML 技巧 a href New SMS Message a 点击新短信打开手机的本机短信应用程序并预 先填写To包含所提供号码的字段 在本例中为 1 408 555 1212 以及body与提供的消息 Hel
  • Colab 的使用限制持续多久?

    当我对同一帐户的两个笔记本同时使用两个 GPU 约半小时后 Colab 已 12 小时未运行 此消息不断弹出 由于 Colab 中的使用限制 您当前无法连接到 GPU 自从我上次使用 colab 以来已经过去了大约两个小时 但该消息仍然弹出
  • Scrapy Splash,如何处理onclick?

    我正在尝试抓取以下内容 我能够收到响应 但我不知道如何访问以下项目的内部数据以抓取它 我注意到访问这些项目实际上是由 JavaScript 和分页处理的 这种情况我该怎么办 下面是我的代码 import scrapy from scrapy
  • 在Python中清理属于不同语言的文本

    我有一个文本集合 其中的句子要么完全是英语 印地语或马拉地语 每个句子附加的 id 为 0 1 2 分别代表文本的语言 无论任何语言的文本都可能有 HTML 标签 标点符号等 我可以使用下面的代码清理英语句子 import HTMLPars
  • 为什么在 __init__ 函数中声明描述符类会破坏描述符功能?

    在下面的 B 类中 我想要 set 每当您赋值给 A 类中的函数时 就会调用该函数B a 相反 将值设置为B a覆盖B a与价值 C类分配给C a工作正常 但我想为每个用户类都有一个单独的 A 实例 即我不想在 C 的一个实例中更改 a 来
  • 检查列表是否已排序的 Pythonic 方法

    有没有一种Python式的方法来检查列表是否已经排序ASC or DESC listtimestamps 1 2 3 5 6 7 就像是isttimestamps isSorted 返回True or False 我想输入一些消息的时间戳列
  • 多个列表和大小的所有可能排列

    在 python 中使用以下命令很容易计算简单的排列itertools permutations https docs python org 3 library itertools html itertools permutations 你
  • Scapy:如何将新层(802.1q)插入现有数据包?

    我有一个数据包转储 想要将 VLAN 标记 802 1q 标头 注入到数据包中 怎么做 为了找到答案 我查看了Scapy 插入新层和记录问题 https stackoverflow com q 17259592 1381638 这确实很有帮
  • 在 Qt 5 中嵌入 Python

    我想将 Python 解释器嵌入到 Qt 5 应用程序中 我在 Qt 5 中有一个工作应用程序 但是当我把 include
  • 使用 os.forkpty() 创建一个伪终端以 ssh 到远程服务器并与其通信

    我正在尝试编写一个 python 脚本 它可以 ssh 到远程服务器 并可以从 python 客户端执行 ls cd 等简单命令 但是 在成功 ssh 到服务器后 我无法读取伪终端的输出 任何人都可以在这里帮助我 以便我可以在服务器上执行一
  • 如何删除 pip 安装的所有软件包?

    如何从当前激活的虚拟环境中卸载 pip 安装的所有软件包 我发现这个片段作为替代解决方案 与重新创建 virtualenv 相比 删除库更加优雅 pip freeze xargs pip uninstall y 如果您通过 VCS 安装了软
  • 向结构化 numpy 数组添加字段

    将字段添加到结构化 numpy 数组的最简洁方法是什么 是否可以破坏性地完成 或者是否有必要创建一个新数组并复制现有字段 每个字段的内容是否连续存储在内存中 以便可以有效地完成此类复制 如果您使用 numpy 1 3 还有 numpy li
  • 如何输入可变的默认参数

    Python 中处理可变默认参数的方法是将它们设置为无 https stackoverflow com a 366430 5049813 例如 def foo bar None bar if bar is None else bar ret
  • python:xml.etree.ElementTree,删除“命名空间”

    我喜欢 ElementTree 解析 xml 的方式 特别是 Xpath 功能 我有一个带有嵌套标签的应用程序的 xml 输出 我想按名称访问此标签而不指定名称空间 这可能吗 例如 root findall molpro job 代替 ro
  • 具有行业级约束的 SciPy 投资组合优化

    尝试在这里优化投资组合权重分配 通过限制风险来最大化我的回报函数 我可以毫无问题地通过简单的约束 所有权重之和等于 1 找到产生我的回报函数的优化权重 并做出另一个约束 即我的总风险低于目标风险 我的问题是 如何为每个组添加行业权重界限 我
  • Pandas 2 个字段中唯一值的数量

    我正在尝试查找覆盖 2 个字段的唯一值的数量 例如 一个典型的例子是姓氏和名字 我有一个数据框 当我执行以下操作时 我只获取每列的唯一字段数 在本例中为 最后一个 和 第一个 不是复合体 df Last Name First Name nu

随机推荐