使用Luigi,如何读取PostgreSQL数据,然后将这些数据传递到工作流程中的下一个任务?

2024-04-01

Using Luigi https://github.com/spotify/luigi,我想定义一个具有两个“阶段”的工作流程:

  • 第一个从 PostgreSQL 读取数据。
  • 第二个对数据做了一些事情。

因此我从子类化开始luigi.contrib.postgres.PostgresQuery并覆盖主机、数据库、用户等,如doc https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/postgres.html.

之后,如何将查询结果传递给工作流中的下一个任务呢?这样的下一个任务已经在requires方法上面的类必须被实例化并返回。

My code:

class MyData(luigi.contrib.postgres.PostgresQuery):

    host = 'my_host'
    database = 'my_db'
    user = 'my_user'
    password = 'my_pass'
    table = 'my_table'
    query = 'select *'

class DoWhateverWithMyData(luigi.Task):

    def requires(self):
        return MyData()

还需要什么?

提前致谢!

EDIT 1

看看 Luigi 的代码,似乎什么也没做run的方法PostgresQuery https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/postgres.html#PostgresQuery与查询的结果;我的意思是,查询已运行,​​仅此而已:

class PostgresQuery(rdbms.Query):
    """
    Template task for querying a Postgres compatible database

    Usage:
    Subclass and override the required `host`, `database`, `user`, `password`, `table`, and `query` attributes.
    Optionally one can override the `autocommit` attribute to put the connection for the query in autocommit mode.

    Override the `run` method if your use case requires some action with the query result.

    Task instances require a dynamic `update_id`, e.g. via parameter(s), otherwise the query will only execute once

    To customize the query signature as recorded in the database marker table, override the `update_id` property.
    """

    def run(self):
        connection = self.output().connect()
        connection.autocommit = self.autocommit
        cursor = connection.cursor()
        sql = self.query

        logger.info('Executing query from task: {name}'.format(name=self.__class__))
        cursor.execute(sql)

        # Update marker table
        self.output().touch(connection)

        # commit and close connection
        connection.commit()
        connection.close()


    def output(self):
        """
        Returns a PostgresTarget representing the executed query.

        Normally you don't override this.
        """
        return PostgresTarget(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
            table=self.table,
            update_id=self.update_id
        )

我想我必须用我自己的实现来扩展这样的类。

EDIT 2

I found this https://groups.google.com/forum/#!topic/luigi-user/Nax5tm2QAx8链接解释与我上面的编辑相同。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Luigi,如何读取PostgreSQL数据,然后将这些数据传递到工作流程中的下一个任务? 的相关文章

  • Python:多处理和请求

    以下是我正在运行的使用多处理并行触发 HTTP 请求的代码片段 在控制台上运行后 它挂在 requests get url 处 既不继续前进也不抛出错误 def echo 100 q print before r requests get
  • 如何将经度和纬度转换为国家或城市?

    我需要将经度和纬度坐标转换为国家或城市 python中有这样的例子吗 提前致谢 我使用谷歌的API from urllib2 import urlopen import json def getplace lat lon url http
  • 使用 GeoDjango 在坐标系之间进行转换

    我正在尝试将坐标信息添加到我的数据库中 添加django contrib gis支持我的应用程序 我正在写一个south数据迁移 从数据库中获取地址 并向 Google 询问坐标 到目前为止 我认为我最好的选择是使用geopy为了这 接下来
  • 使用 python 通过搜索端点从 Spotify API 获取曲目

    因此 我尝试使用 API 的搜索端点进行搜索 从而从 Spotify API 获取曲目 请参阅文档 https developer spotify com documentation web api reference search sea
  • 在 Python 中比较日期 - 如何处理时区修饰符

    我正在做Python日期比较 假设我有一个这样的约会 Fri Aug 17 12 34 00 2012 0000 我按以下方式解析它 dt datetime strptime Fri Aug 17 12 34 00 2012 0000 a
  • 代理阻止网络套接字?如何绕行

    我有一个用 Python 编写的正在运行的 websocket 服务器 来自https github com opiate SimpleWebSocketServer https github com opiate SimpleWebSoc
  • 将 JSON 字符串传递给 Django 模板

    我一直在用头撞墙 试图找出为什么我无法将从 Django 模型生成的 JSON 字符串传递到模板的 javascript 静态文件中 事实证明 问题不在模型级别 使用serializers serialize 在脚本本身中放入相同的字符串将
  • 如何通过facebook-sdk python api获取用户帖子?

    我使用 facebook jssdk 授权我的应用程序读取用户个人资料和用户帖子 FB login function response scope user status user likes user photos user videos
  • 获取 zeep.exceptions.ValidationError:缺少与 suds 一起使用的方法的元素

    我正在移植开发的代码suds 0 6到zeep 2 4 0 以前的泡沫代码 client Client WSDLfile proxy proxy faults True config client factory create perUse
  • keras 预测内存交换无限期增加

    我使用keras实现了一个分类程序 我有一大组图像 我想使用 for 循环来预测每个图像 然而 每次计算新图像时 交换内存都会增加 我尝试删除预测函数内部的所有变量 并且我确信该函数内部存在问题 但内存仍然增加 for img in ima
  • 如何在Python中正确声明ctype结构+联合?

    我正在制作一个二进制数据解析器 虽然我可以依靠 C 但我想看看是否可以使用 Python 来完成该任务 我对如何实现这一点有一些了解 我当前的实现如下所示 from ctypes import class sHeader Structure
  • Python将csv数据导出到文件中

    我有以下运行良好的代码 但我无法修剪数据并将其存储在数据文件中 import nltk tweets love this car this view amazing not looking forward the concert def g
  • 散景中的时间序列流

    我想在散景中绘制实时时间序列 我只想在每次更新时绘制新的数据点 我怎样才能做到这一点 散景网站上有一个动画情节的示例 但它每次都需要重新绘制整个图片 另外 我正在寻找一个简单的示例 我可以在其中逐点绘制时间序列的实时绘图 散景效果0 11
  • 如何在 Spyder IDE 中安装 Selenium 包

    我刚刚在工作中安装了 Spyder IDE 仅 Spyder 不是整个 Anaconda 并且希望使用 FireFox 自动化我的工作 我的问题是 如何安装 Selenium 软件包 I figured it out Here is ins
  • Scrapy - 不会爬行

    我正在尝试运行递归爬行 由于我编写的爬行不能正常工作 因此我从网络上提取了一个示例并进行了尝试 我真的不知道问题出在哪里 但是爬行没有显示任何错误 谁能帮我这个 另外 是否有任何逐步调试工具可以帮助理解蜘蛛的爬行流程 非常感谢任何与此相关的
  • 写入 UDP 套接字会被阻塞吗?

    如果是的话 在什么条件下 或者 换句话说 在twisted 中运行此代码是否安全 class StatsdClient AbstractStatsdClient def init self host port super StatsdCli
  • 如何使 Postgres Copy 忽略大 txt 文件的第一行

    我有一个相当大的 txt 文件 9gb 我想将此 txt 文件加载到 postgres 中 第一行是标题 后面是所有数据 如果我直接 postgres COPY 数据 标头将导致数据类型与我的 postgres 表不匹配的错误 因此我需要以
  • 如何在 Python 中解析损坏的 XML?

    我无法影响的服务器发送的 XML 非常损坏 具体来说 Unicode WHITE STAR 将被编码为 UTF 8 E2 98 86 然后使用 Latin 1 转换为 HTML 实体表 我得到的是 acirc 98 86 9 个字节 位于声
  • Synapse Notebook 参考 - 使用参数从另一个笔记本调用 Synapse Notebook

    我有一个带有参数的突触笔记本 我试图从另一个笔记本调用该笔记本 我正在使用 run 命令 我应该如何将参数从基本笔记本传递到正在调用的笔记本 另外 对我来说 上述答案不起作用 作为对此问题的单独解决方案 下面是一个答案 打开笔记本并转到最右
  • 升级后 pip 损坏

    我做了 pip install U easyinstall 然后 pip install U pip 来升级我的 pip 但是 当我尝试使用 pip 时 我现在收到此错误 root d8fb98fc3a66 which pip usr lo

随机推荐

  • 在WPF中,如果窗口不在屏幕上,如何将其移动到屏幕上?

    如果我有一个窗口 如何确保该窗口永远不会隐藏在屏幕之外 这很重要 因为有时如果用户添加或删除监视器 如果我们记住了之前的位置 窗口可能会永久隐藏在屏幕之外 我在用WPF MVVM 这个答案已经在大规模的现实世界应用中得到了测试 从任何附加属
  • 代码块找不到我的编译器

    好的 所以我下载了 Codeblocks 并且我的计算机上已经有一个 cygwin 终端 我经常使用它来编译和运行 C 程序 但是当我尝试在 CodeBlocks 中构建程序时 它给了我一个错误 The compiler s setup G
  • ant 构建过程中代码修改的最佳实践

    承认 这听起来完全不像最佳实践 但让我解释一下 在构建过程中 我们需要将内部版本号和系统版本粘贴到一个类中 该类的唯一目的是包含这些值并使其可访问 我们的第一个想法是使用系统属性 但由于部署环境的波动性 另一种说法是 系统管理员正在做奇怪
  • PHP CURL 库中的curl --resolve 等效项

    是否有等效的curl resolve 在 PHP CURL 库中 背景 我有循环 DNS 一个域名解析为多个 IP 并且我想将请求发送到specific主持人 我使用基于 apache 名称的虚拟主机 因此 HTTP 请求中必须出现正确的域
  • 相机 2 CameraCharacteristics 似乎显示不正确的数据

    我已经下载并更改了 Google 的相机 2 基本版 https github com googlesamples android Camera2Basic例子 我的更改增加了对相机设备的迭代并显示了它们的一些特征 我创建了这个函数 pri
  • UIView 与 UIViewController

    好吧 我对 iPhone 开发真的很陌生 而且我已经了解了很多知识 我只需要帮助决定如何将这 4 6 张图片编程到我的项目中 我基本上想制作一本漫画书 用户能够从一张图片滑动到另一张图片 所有这些图片应该在 UIVIEW 还是 UIView
  • 为什么JSP文件不能超过64k

    当我编写 JSP 文件时 我收到错误 它不应超过 64 KB 然后我将代码分解到不同的文件 现在代码正在工作 为什么 JSP 存在这个限制 Java 对方法的大小有 64k 的限制 因此 当 jsp 转换为 jspService 方法时 如
  • 您知道针对多种编程语言的快速参考指南吗? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 在我的工作过程中 我维护多种编程语言的代码 如下所列 由于我还没有掌握其中的大部分内容 所以我一直忘记
  • GitHub 中的相关拉取请求是否可行?

    目前我正在处理一个非常大的拉取请求 为了以某种方式保持代码审查的可管理性 我们的想法是将完整的拉取请求分割成相互依赖的独立部分 一个例子是 拉取请求 1 创建接口 接口 A 和 B 并重构代码 拉取请求 2 接口 A 实现和测试 取决于拉取
  • 水平滚动图像列表

    我正在尝试创建一个水平滚动列表 当启用 Javascript 时 我将用一个奇特的版本替换它 但我希望标记和 css 在相当现代的浏览器上没有 Javascript 的情况下看起来和工作正常 任何以任何方式使用 Javascript 的建议
  • ADFS 2016、MVC 和 WEB.API 令牌和声明

    我目前正在开发一个带有 Web API 后端的 Net MVC 应用程序 这两个应用程序都使用 ADFS 2016 进行身份验证 Web API 本身按预期工作 但是我在 MVC 应用程序本身的 ADFS 配置方面遇到一些问题 基于本指南h
  • WP7 XNA 显示 3D FBX 模型

    我只是初学者 很抱歉我的愚蠢问题 我的模型看起来像这样 http img265 imageshack us img265 8291 clipboard01ap jpg http img265 imageshack us img265 829
  • 在 Python 3 中重新打开关闭的 stringIO 对象

    因此 我创建一个 StringIO 对象来将字符串视为文件 gt gt gt a Me you and them n gt gt gt import io gt gt gt f io StringIO a gt gt gt f read 1
  • 以编程方式退出应用程序的适当方法是什么?

    I am evaluating user inputs as commands for my application If the user presses Q or q and then hits enter the applicatio
  • 收到“无法检查可用内存”。在 Oracle DB 18c 上

    我正在尝试创建一个数据库以完成我的自学 但不幸的是我遇到了意外错误 无法检查可用内存 我正在使用数据库配置助手 DBCA 来完成此操作 我的Oracle数据库版本 添加以下参数来绕过错误 dbca J Doracle assistants
  • 加载图像停止问题

    我有窗户形式 我已经放入了一张加载图像PictureBox 当我加载表单时 我已经设置了 PictureBox1 Visible false 当我触发我设置的按钮的单击事件时 PictureBox1 Visible true 但在这种情况下
  • 静态常量和常量有什么区别?

    有什么区别static const and const 例如 static const int a 5 const int i 5 他们之间有什么区别吗 你什么时候会使用其中一种而不是另一种 static确定函数外部的可见性或内部变量的生命
  • JavaScript 对象的长度

    我有一个 JavaScript 对象 是否有内置或公认的最佳实践方法来获取该对象的长度 const myObject new Object myObject firstname Gareth myObject lastname Simpso
  • Firefox 11 调试器中的 firebug 1.9.1 不起作用

    当我激活脚本面板时 它显示 调试器未激活 但我无法在调试器上使 firebug 中断 或在断点处 我已经重新安装了插件 将其重置为默认设置 删除了 Firefox 配置文件 重新启动计算机 没有任何作用 这个版本一直有效 同样的事情也发生在
  • 使用Luigi,如何读取PostgreSQL数据,然后将这些数据传递到工作流程中的下一个任务?

    Using Luigi https github com spotify luigi 我想定义一个具有两个 阶段 的工作流程 第一个从 PostgreSQL 读取数据 第二个对数据做了一些事情 因此我从子类化开始luigi contrib