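Using Luigi https://github.com/spotify/luigi, I want to define a workflow with two "stages":
- The first one reads data from PostgreSQL.
- The second one does something with the data.
So I started by subclassing luigi.contrib.postgres.PostgresQuery
and overriding host, database, user, etc., as described in the docs https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/postgres.html.
After that, how do I pass the query result to the next task in the workflow? That next task already declares, in its requires
method, that the class above must be instantiated and returned.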
My code:
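import luigi
import luigi.contrib.postgres


class MyData(luigi.contrib.postgres.PostgresQuery):
    host = 'my_host'
    database = 'my_db'
    user = 'my_user'
    password = 'my_pass'
    table = 'my_table'
    # a bare 'select *' is not valid SQL, so the FROM clause is spelled out
    query = 'select * from my_table'


class DoWhateverWithMyData(luigi.Task):
    def requires(self):
        return MyData()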
What else is needed?
Thanks in advance!
EDIT 1
Looking at Luigi's code, it seems that the run
method of PostgresQuery https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/postgres.html#PostgresQuery does nothing with the result of the query; I mean, the query is executed and that's it:
class PostgresQuery(rdbms.Query):
    """
    Template task for querying a Postgres compatible database
    Usage:
    Subclass and override the required `host`, `database`, `user`, `password`, `table`, and `query` attributes.
    Optionally one can override the `autocommit` attribute to put the connection for the query in autocommit mode.
    Override the `run` method if your use case requires some action with the query result.
    Task instances require a dynamic `update_id`, e.g. via parameter(s), otherwise the query will only execute once
    To customize the query signature as recorded in the database marker table, override the `update_id` property.
    """

    def run(self):
        connection = self.output().connect()
        connection.autocommit = self.autocommit
        cursor = connection.cursor()
        sql = self.query
        logger.info('Executing query from task: {name}'.format(name=self.__class__))
        cursor.execute(sql)
        # Update marker table
        self.output().touch(connection)
        # commit and close connection
        connection.commit()
        connection.close()

    def output(self):
        """
        Returns a PostgresTarget representing the executed query.
        Normally you don't override this.
        """
        return PostgresTarget(
            host=self.host,
            database=self.database,
            user=self.user,
            password=self.password,
            table=self.table,
            update_id=self.update_id
        )
I guess I have to extend this class with my own implementation.
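For what it's worth, here is a rough sketch of the piece such an overridden run would need: something that executes the query on the connection, fetches the rows, and writes them somewhere the next task can read. The names dump_query_to_csv and read_csv_rows are made up for illustration and are not part of Luigi, and sqlite3 is used only as a stand-in so the example runs without a Postgres server. The assumption is that any DB-API connection (such as the one returned by self.output().connect() in the code quoted above) can be passed in the same way.

```python
import csv
import os
import sqlite3
import tempfile


def dump_query_to_csv(connection, sql, path):
    """Hypothetical helper: run `sql` on a DB-API connection and write
    a header line plus all result rows to a CSV file at `path`.

    Returns the number of data rows written.
    """
    cursor = connection.cursor()
    cursor.execute(sql)
    # Each entry of cursor.description describes one column; item 0 is its name.
    header = [column[0] for column in cursor.description]
    count = 0
    with open(path, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(header)
        for row in cursor:
            writer.writerow(row)
            count += 1
    return count


def read_csv_rows(path):
    """Hypothetical helper for the second stage: load the rows back as dicts."""
    with open(path, newline="") as in_file:
        return list(csv.DictReader(in_file))


if __name__ == "__main__":
    # Stand-in database so the sketch runs anywhere; with Luigi the connection
    # would come from the Postgres target instead (assumption, not tested here).
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE my_table (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO my_table VALUES (?, ?)", [(1, "a"), (2, "b")])
    csv_path = os.path.join(tempfile.mkdtemp(), "my_data.csv")
    written = dump_query_to_csv(conn, "SELECT id, name FROM my_table", csv_path)
    print(written, read_csv_rows(csv_path))
    conn.close()
```

Inside a PostgresQuery subclass, the idea would be to call something like this from run in place of the bare cursor.execute(sql), then keep the touch, commit and close steps as they are. The second task would still need to know where the file was written (for example a path both tasks agree on), which is a design choice this sketch leaves open.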
EDIT 2
I found this link https://groups.google.com/forum/#!topic/luigi-user/Nax5tm2QAx8, which explains the same thing as my edit above.