使用 Airflow 将 mysql 数据加载到 bigquery 的 dag 出现“无效参数传递”错误

2024-03-23

我运行一个 DAG,提取 MySQL 数据并将其加载到气流中的 BigQuery。我目前收到以下错误:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数传递给 MySqlToGoogleCloudStorageOperator。 Airflow 2.0 中将不再支持传递此类参数。无效参数是:

*参数:()

**kwargs:{'google_cloud_storage_connn_id':'podioGCPConnection'}类别= PendingDeprecationWarning

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数传递给 GoogleCloudStorageToBigQueryOperator。 Airflow 2.0 中将不再支持传递此类参数。无效参数是:

*参数:()

**kwargs: {'project_id': 'podio-data'} 类别=PendingDeprecationWarning

dag 的代码在这里:

my_connections = [
    'podiotestmySQL'
]

my_tables = [
    'logistics_orders',
    'logistics_waybills',
    'logistics_shipping_lines',
    'logistics_info_requests'
]

default_args = {
    'owner' : 'tia',
    'start_date' : datetime(2018, 1, 2),
    'depends_on_past' : False,
    'retries' : 1,
    'retry_delay':timedelta(minutes=5),
}

dag = DAG('etl', default_args=default_args,schedule_interval=timedelta(days=1))

slack_notify = SlackAPIPostOperator (
    task_id = 'slack_notfiy',
    token = 'xxxxxx',
    channel='data-status',
    username = 'airflow',
    text = 'Successfully performed podio ETL operation',
    dag=dag)

for connection in my_connections:
    for table in my_tables: 
        extract = MySqlToGoogleCloudStorageOperator(
           task_id="extract_mysql_%s_%s"%(connection,table),
           mysql_conn_id = connection,
           google_cloud_storage_connn_id = 'podioGCPConnection',
           sql = "SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
           bucket='podio-reader-storage',
           filename= '%s/%s/%s{}.json'%(connection,table,table),
           schema_filename='%s/schemas/%s.json'%(connection,table),
           dag=dag)

       load =GoogleCloudStorageToBigQueryOperator(
           task_id = "load_bg_%s_%s"%(connection,table),
           bigquery_conn_id = 'podioGCPConnection',
           google_cloud_storage_conn_id = 'podioGCPConnection',
           bucket = 'podio-reader-storage',
           destination_project_dataset_table = "Podio_Data1.%s/%s"%(connection,table),
           source_objects = ["%s/%s/%s*.json"%(connection,table,table)],
           schema_object = "%s/schemas/%s.json"%(connection,table),
           source_format = 'NEWLINE_DELIMITED_JSON',
           create_disposition = 'CREATE_IF_NEEDED',
           write_disposition = 'WRITE_TRUNCATE',
           project_id = 'podio-data',
           dag=dag)

      load.set_upstream(extract)
      slack_notify.set_upstream(load)

在这里阅读源码:https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_bq.py https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_bq.py

请从默认参数中删除这些参数:

google_cloud_storage_connn_id = 'podioGCPConnection'
project_id = 'podio-data',

您需要在 Airflow 仪表板中创建连接。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Airflow 将 mysql 数据加载到 bigquery 的 dag 出现“无效参数传递”错误 的相关文章

  • 多处理中的动态池大小?

    有没有办法动态调整multiprocessing Pool尺寸 我正在编写一个简单的服务器进程 它会产生工作人员来处理新任务 使用multiprocessing Process对于这种情况可能更适合 因为工作人员的数量不应该是固定的 但我需
  • 如何返回 cost, grad 作为 scipy 的 fmin_cg 函数的元组

    我怎样才能使 scipy 的fmin cg使用一个返回的函数cost and gradient作为元组 问题是有f对于成本和fprime对于梯度 我可能必须执行两次操作 非常昂贵 grad and cost被计算 此外 在它们之间共享变量可
  • 按边距(“全部”)值列对 Pandas 数据透视表进行排序

    我试图根据 pandas 数据透视表中的行总和对最后一列 边距 aggrfunc 进行降序排序 我知道我在这里错过了一些简单的东西 但我无法弄清楚 数据框 数据透视表 WIDGETS DATE 2 1 16 2 2 16 2 3 16 Al
  • Matplotlib:如何有效地将大量线段着色为独立渐变

    Python 绘图库 如何有效地将大量线段着色为独立渐变 已经 阅读this https stackoverflow com questions 8500700 how to plot a gradient color line in ma
  • 使用 for 循环创建一系列元组

    我已经搜索过 但找不到答案 尽管我确信它已经存在了 我对 python 很陌生 但我以前用其他语言做过这种事情 我正在以行形式读取数据文件 我想将每行数据存储在它自己的元组中 以便在 for 循环之外访问 tup i inLine wher
  • Spark 和 Python 使用自定义文件格式/生成器作为 RDD 的输入

    我想问一下 Spark 中输入的可能性 我可以看到从http spark apache org docs latest programming guide html http spark apache org docs latest pro
  • 从 Flask 运行 NPM 构建

    我有一个 React 前端 我想在与我的 python 后端 API 相同的源上提供服务 我正在尝试使用 Flask 来实现此目的 但我遇到了 Flask 找不到我的静态文件的问题 我的前端构建是用生成的npm run build in s
  • Ubuntu systemd 自定义服务因 python 脚本而失败

    希望获得有关 Ubuntu 中的 systemd 守护进程服务的一些帮助 我写了一个 python 脚本来禁用 Dell XPS 上的触摸屏 这更像是一个问题 而不是一个有用的功能 该脚本可以工作 但我不想一直启动它 这就是为什么我想到编写
  • 使用另一个数据帧在数据帧中创建子列

    我对 python 和 pandas 很陌生 在这里 我有一个以下数据框 did features offset word JAPE feature manual feature 0 200 0 aa 200 200 0 200 11 bf
  • 可以使用哪些技术来衡量 pandas/numpy 解决方案的性能

    Question 如何简洁全面地衡量下面各个功能的性能 Example 考虑数据框df df pd DataFrame Group list QLCKPXNLNTIXAWYMWACA Value 29 52 71 51 45 76 68 6
  • Python bug - 或者我的愚蠢 - 扫描字符串文字时 EOL

    我看不出以下两行之间有显着差异 然而第一个解析 而后者则不解析 In 5 n Axis of Awesome In 6 n Axis of Awesome File
  • AWS Lambda 不读取环境变量

    我正在编写一个 python 脚本来查询 Qualys API 中的漏洞元数据 我在 AWS 中将其作为 lambda 函数执行 我已经在控制台中设置了环境变量 但是当我执行函数时 出现以下错误 module initialization
  • 如何对字段数据进行分组?

    我有 sql 查询来显示数据 SELECT artikel foto naam fotografer id fotografer name fotografer customer first name customer last name
  • pandas 中数据帧中的随机/洗牌行

    我目前正在尝试找到一种方法来按行随机化数据框中的项目 我在 pandas 中按列洗牌 排列找到了这个线程 在 pandas 中对 DataFrame 进行改组 排列 https stackoverflow com questions 157
  • 如何使用 python 定位和读取 Data Matrix 代码

    我正在尝试读取微管底部的数据矩阵条形码 我试过libdmtx http libdmtx sourceforge net 它有 python 绑定 当矩阵的点是方形时工作得相当好 但当矩阵的点是圆形时工作得更糟 如下所示 另一个复杂问题是在某
  • 如何关闭整个数据库的区分大小写

    我创建了一个包含许多脚本和许多存储过程的数据库 在这个数据库中 我们没有注意担心区分大小写 因为它对于我的本地开发计算机来说是关闭的 综上所述 我试图弄清楚如何使以下两条语句返回相同的结果 SELECT FROM companies SEL
  • 错误 1305 (42000):保存点...不存在

    我的 MYSQL 数据库中有这个 SQL 存储过程为空 所以我猜没有隐式提交 DROP PROCEDURE IF EXISTS doOrder DELIMITER CREATE PROCEDURE doOrder IN orderUUID
  • python从二进制文件中读取16字节长的双精度值

    我找到了蟒蛇struct unpack 读取其他程序生成的二进制数据非常方便 问题 如何阅读16 字节长双精度数出二进制文件 以下 C 代码将 1 01 写入二进制文件三次 分别使用 4 字节浮点型 8 字节双精度型和 16 字节长双精度型
  • 如何(安全)将 Python 对象发送到我的 Flask API?

    我目前正在尝试构建一个 Flask Web API 它能够在 POST 请求中接收 python 对象 我使用 Python 3 7 1 创建请求 使用 Python 2 7 运行 API 该 API 设置为在我的本地计算机上运行 我试图发
  • 如何统计订单总价?

    我有这些表 Orders id status user id address id 1 await 1 1 products id name price quantity 1 test1 100 5 2 test2 50 5 order p

随机推荐