如何在 Airflow 中运行异步函数?

2023-11-23

我正在编写一个气流任务来读取大型 csv 并将其保存到 postgresql 数据库。 我发现这个 asyncpg 包具有复制功能,其运行速度比任何其他包都要快得多。然而,它是异步的,我不知道如何将它合并到Airflow中。 这是示例代码:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pandas import DataFrame
import asyncpg

async def to_sql(dataframe, table_name, schema_name='public', timeout=None, truncate=False):
    connection = await asyncpg.connect(user='postgres', host='host.docker.internal', database='quantaxis', password='123456')
    result = await connection.copy_records_to_table(
        table_name,
        records=dataframe.values.tolist(),
        columns=shared_columns,
        schema_name=schema_name,
        timeout=timeout)
    await connection.close()
    return result


default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('pythonexp2123', default_args=default_args, schedule_interval=timedelta(days=1))

async def save_file_to_database(ds):
    df = pd.read_csv("data{0}.csv".format(ds))
    r = await to_sql(df, 'test')
    return r

t1 = PythonOperator(
    task_id='pushing_task',
    provide_context=True,
    python_callable=save_file_to_database,
    dag=dag
    )

t1

当我运行它时,它会返回错误:

Can't Pickle Object <Corountine>

我怎样才能改变这个函数来让这个 Dag 工作?我仍然想使用 asyncpg 包,因为它的速度。


您可以尝试使用 asyncio 在事件循环中运行异步函数。 如果您使用的是 Python >= 3.7,您可以简单地调用asyncio.run(async_function()).

https://docs.python.org/3/library/asyncio-task.html

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pandas import DataFrame
import asyncpg
import asyncio

async def to_sql(dataframe, table_name, schema_name='public', timeout=None, truncate=False):
    connection = await asyncpg.connect(user='postgres', host='host.docker.internal', database='quantaxis', password='123456')
    result = await connection.copy_records_to_table(
        table_name,
        records=dataframe.values.tolist(),
        columns=shared_columns,
        schema_name=schema_name,
        timeout=timeout)
    await connection.close()
    return result



default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('pythonexp2123', default_args=default_args, schedule_interval=timedelta(days=1))

async def save_file_to_database(ds):
    df = pd.read_csv("data{0}.csv".format(ds))
    r = await to_sql(df, 'test')
    return r

def run_async(ds):
   loop = asyncio.get_event_loop()
   result = loop.run_until_complete(save_file_to_database(ds))
   return result

t1 = PythonOperator(
    task_id='pushing_task',
    provide_context=True,
    python_callable=run_async,
    dag=dag
    )

t1
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Airflow 中运行异步函数? 的相关文章

  • 查找 with: 块中定义的函数

    这是一些代码理查德 琼斯的博客 http www mechanicalcat net richard log Python Something I m working on 3 with gui vertical text gui labe
  • 打印 scrapy 请求的“响应”

    我正在尝试学习 scrapy 在遵循教程的同时 我正在尝试进行细微的调整 我想简单地从请求中获取响应内容 然后我会将响应传递到教程代码中 但我无法发出请求并获取响应内容 建议就好 from scrapy http import Respon
  • Python 中的六边形自组织映射

    我在寻找六边形 自组织映射 http en wikipedia org wiki Self organizing map在Python上 准备好模块 如果存在的话 绘制六边形单元格的方法 将六边形单元作为数组或其他方式使用的算法 About
  • 用 Python 编写一个无操作或虚拟类

    假设我有这样的代码 foo fooFactory create 由于种种原因 fooFactory create 可能无法创建实例Foo 如果可以的话我想要fooFactory create 返回一个虚拟 无操作对象 这个对象应该是完全惰性
  • 如何用 python 和 sympy 解决多元不等式?

    我对使用 python 和 Sympy 还很陌生 并且遇到了使用 sympy 解决多元不等式的问题 假设我的文件中有很多函数 如下所示 cst sqrt x 2 cst exp sqrt cst x 1 4 log log sqrt cst
  • Python 中 genfromtxt() 的可变列数?

    我有一个 txt具有不同长度的行的文件 每一行都是代表一条轨迹的一系列点 由于每条轨迹都有自己的长度 因此各行的长度都不同 也就是说 列数从一行到另一行不同 据我所知 genfromtxt Python 中的模块要求列数相同 gt gt g
  • 无法包含外部 pandas 文档 Pycharm v--2018.1.2

    我无法包含外部 pandas 文档Pycharm v 2018 1 2 例如 numpy gt http docs scipy org doc numpy reference generated module name element na
  • 如何使用 openpyxl 对工作簿中的 Excel 工作表/选项卡进行排序

    我需要按字母数字对工作簿中的选项卡 工作表进行排序 我在用openpyxl https openpyxl readthedocs io en default 操作工作表 您可以尝试排序workbook sheets list workboo
  • Python While 循环,and (&) 运算符不起作用

    我正在努力寻找最大公因数 我写了一个糟糕的 运算密集型 算法 它将较低的值减一 使用 检查它是否均匀地划分了分子和分母 如果是 则退出程序 但是 我的 while 循环没有使用 and 运算符 因此一旦分子可整除 它就会停止 即使它不是正确
  • 在wxpython中使用wx.TextCtrl并在按钮单击后显示数据的简单示例 - wx新手

    我正在学习 python 并尝试使用 wxpython 进行 UI 开发 也没有 UI exp 我已经能够创建一个带有面板 按钮和文本输入框的框架 我希望能够在文本框中输入文本 并让程序在单击按钮后对输入框中的文本执行操作 我可以获得一些关
  • Python int 太大,无法放入 SQLite

    我收到错误 OverflowError Python int 太大 无法转换为 SQLite INTEGER 来自以下代码块 该文件约25GB 因此必须分部分读取 length 6128765 Works on partitions of
  • urllib2.urlopen() 是否实际获取页面?

    当我使用 urllib2 urlopen 时 我在考虑它只是为了读取标题还是实际上带回整个网页 IE 是否真的通过 urlopen 调用或 read 调用获取 HTML 页面 handle urllib2 urlopen url html
  • ValueError:无法插入 ID,已存在

    我有这个数据 ID TIME 1 2 1 4 1 2 2 3 我想按以下方式对数据进行分组ID并计算每组的平均时间和规模 ID MEAN TIME COUNT 1 2 67 3 2 3 00 1 如果我运行此代码 则会收到错误 ValueE
  • Python Flask 是否定义了路由顺序?

    在我看来 我的设置类似于以下内容 app route test def test app route
  • 是否可以写一个负的python类型注释

    这可能听起来不合理 但现在我需要否定类型注释 我的意思是这样的 an int Not Iterable a string Iterable 这是因为我为一个函数编写了一个重载 而 mypy 不理解我 我的功能看起来像这样 overload
  • 使用 Doc2vec 后如何解释 Clusters 结果?

    我正在使用 doc2vec 将关注者的前 100 条推文转换为矢量表示形式 例如 v1 v100 之后 我使用向量表示来进行 K 均值聚类 model Doc2Vec documents t size 100 alpha 035 windo
  • Plotly:如何避免巨大的 html 文件大小

    我有一个 3D 装箱模型 它使用绘图来绘制输出图 我注意到 绘制了 600 个项目 生成 html 文件需要很长时间 文件大小为 89M 这太疯狂了 我怀疑可能存在一些巨大的重复 或者是由单个项目的 add trace 方法引起的 阴谋 为
  • 是否可以强制浮点数的指数或有效数匹配另一个浮点数(Python)?

    这是我前几天试图解决的一个有趣的问题 是否可以强制一个的有效数或指数float与另一个人一样float在Python中 出现这个问题是因为我试图重新调整一些数据 以便最小值和最大值与另一个数据集匹配 然而 我重新调整后的数据略有偏差 大约小
  • Scrapy 蜘蛛无法工作

    由于到目前为止没有任何效果 我开始了一个新项目 python scrapy ctl py startproject Nu 我完全按照教程操作 创建了文件夹和一个新的蜘蛛 from scrapy contrib spiders import
  • 从 dask 数据框中的日期时间序列获取年份和星期?

    如果我有一个 Pandas 数据框和一个日期时间类型的列 我可以按如下方式获取年份 df year df date dt year 对于 dask 数据框 这是行不通的 如果我先计算 像这样 df year df date compute

随机推荐

  • 创建自定义简单光标适配器

    我想创建一个非常简单的光标自定义光标适配器 以方便在单击时更改行项目的颜色 使用以下代码 private static int save 1 public void onListItemClick ListView parent View
  • 创建多个轻量级 Google Cloud Functions 的最佳实践?

    Google Cloud Functions 的工作方式似乎是 你的模块进入一个functions目录 that functions目录然后包含一个package json文件包含所有模块之间的共享依赖项 每个模块可以包含许多导出函数 go
  • HTTP 错误 404.4 - 未找到您正在查找的资源没有与其关联的处理程序

    我在 IIS 中托管了一个网站 但每当我浏览该网站时 我都会收到 404 4 我该如何解决这个问题 我已经提到了几篇文章 他们都说问题与静态文件有关 但它已经被映射了 我还能做什么 这是我的 iis 7 0 中处理程序映射的附图 有任何想法
  • 使用 JFileChooser 将文件类型附加到 Java 中的文件

    我正在尝试使用 JFileChooser 保存图像 我只希望用户能够将图像保存为 jpg 格式 但是 如果他们不输入 jpg 则不会将其保存为图像 是否可以以某种方式将 jpg 附加到文件末尾 File file chooser getSe
  • 如何在 Bootstrap 中仅在特定屏幕尺寸上显示某些内容?

    我希望能够仅在 html 中显示图像md屏幕 我正在考虑隐藏图像sm向下 并躲避lg and up 我怎样才能做到这一点 在 Bootstrap v4 中 您可以使用这些类d none d md block d lg none使内容仅在媒体
  • UML泛化与实现

    我对 UML 还很陌生 所以我对泛化和实现有一些疑问 我正在对电子微控制器的行为进行建模 并且需要从 UML 描述生成 C 代码 据我所知 一个class realizes接口 这意味着它可以提供接口的实现 A概括两个类之间可能存在关系 在
  • Miller Rabin 素性测试准确性

    我知道米勒 拉宾素性检验是概率性的 不过我想用它来编程任务没有任何出错的余地 如果输入数字是 64 位整数 即 long long in C Miller Rabin is indeed probabilistic but you can
  • Java 中数组的排列(重复)

    网站上有一些类似的问题 这些问题对我有一些帮助 但我无法完全确定这个问题 所以我希望这不是重复的 这是一项家庭作业 其中您有一组字符 A B C 并且必须使用递归来获取所有排列 有重复 我的代码是这样做的 char c A B C publ
  • 使用 Karma (Jasmine) 测试 AngularJS 工厂

    我正在努力使用 Karma Jasmine 测试 AngularJS 工厂 我无法将我的工厂注入OfficerValidationService多变的 我究竟做错了什么 注意 文件已正确加载 Factory use strict angul
  • 如何在 Flutter 中监控剪贴板?

    我正在寻找一种在 Flutter 中监视剪贴板的方法 我能找到的与 Flutter 上的剪贴板交互相关的所有内容是 剪贴板类 有谁知道如何监控系统剪贴板中的新项目 最好使用插件 可能有点晚了 但仍然如此 不需要插件或库 解决方案可能非常简单
  • .Select、.Activesheet、.Activecell 等...

    对于这个问题 我参考下面的帖子来澄清一下 为什么我的条件格式在用VBA添加时会偏移 在我这些天看到的很多很多帖子中 OP 被默默地允许使用 Activate Select Offset 等 而它们却为潜在错误 通常是由最终用户引起的 敞开了
  • 计算平均置信区间而不存储所有数据点

    对于大型n 请参阅下文了解如何确定足够大的值 根据中心极限定理 可以安全地将样本均值的分布视为正态 高斯 但我想要一个程序 为任何给出一个置信区间n 实现这一点的方法是使用 Student T 分布n 1自由程度 所以问题是 给定您一次收集
  • Ebean多对多查询

    我有两个类 用户和汽车 两者都有 ManyToMany 相互映射 User Entity public class User extends Model private int year ManyToMany cascade Cascade
  • 方法中变量的最大数量

    我在这里闲着 所以我有这个好奇心 有人可以告诉我 C 中每个方法的最大变量数是多少 我只是尝试编译生成的程序源 其中包含 26 26 26 26 个局部变量 而不是方法参数 它们被称为 aaaa aaab aaac等等 我遇到了这个限制 错
  • Hibernate 搜索与 spring-data-solr 、 spring-data-elasticsearch

    我有一个 Spring Boot Spring Data JPA hibernate Web 应用程序 想要引入文本搜索功能 我了解以下内容 hibernate search 或 spring data 都可以集成到我的应用程序中 Hibe
  • 如何从嵌套类访问超类方法?

    我希望这段代码可以解释这个问题 class Foo void a stuff class Bar extends Foo void a throw new Exception This is not allowed for Bar clas
  • 如何使用 div 和 css 模拟表格?

    像这样的表 table tr td td td td td td tr tr td td td td td td tr table 如何使用 div 和 css display table 规则创建一个 尝试这个 CSS table dis
  • 具有多个值列的数据透视表/交叉表

    我有一个产生以下结果集的视图 CREATE TABLE foo AS SELECT client id asset type current value future value FROM VALUES 1 0 10 20 1 1 5 10
  • 如何在编写测试时检查实际的 Laravel 命令输出?

    我正在为 Laravel Artisan 控制台命令编写一个非常基本的测试 如下所示 this gt artisan my command some option gt some value gt expectsOutput the exp
  • 如何在 Airflow 中运行异步函数?

    我正在编写一个气流任务来读取大型 csv 并将其保存到 postgresql 数据库 我发现这个 asyncpg 包具有复制功能 其运行速度比任何其他包都要快得多 然而 它是异步的 我不知道如何将它合并到Airflow中 这是示例代码 fr