气流:GoogleCloudStorageToBigQueryOperator 错误

2024-01-09

尝试通过气流 GoogleCloudStorageToBigQueryOperator 运算符将数据从谷歌云存储加载到 bigquery 中。

出现以下错误。需要有关以下错误的建议。

Code:

load_into_bq = GoogleCloudStorageToBigQueryOperator(
            task_id=get_task_id("load_into_bq", flow_name),
            bucket='bigquery-source-replication',
            source_objects=flow_details["gcs_csv_filename"],
            destination_project_dataset_table='project-141508.dwh_test.datalake_production_products_intermediate',
            source_format="CSV",
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            autodetect=True,
            google_cloud_storage_conn_id=GCP_CONN_ID,
            bigquery_conn_id=BQ_CONN_ID,
            dag=dag
        )

LOGS:

[2021-06-10 09:56:54,522] {taskinstance.py:902} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): flow_name_load_into_bq> on 2021-06-10T09:55:01.281248+00:00
[2021-06-10 09:56:54,599] {standard_task_runner.py:54} INFO - Started process 13009 to run task
[2021-06-10 09:56:54,854] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'mysql_to_gcs_data_dag', 'flow_name_load_into_bq', '2021-06-10T09:55:01.281248+00:00', '--job_id', '19338', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/mysql_gcs_bq_poc_sourav.py', '--cfg_path', '/tmp/tmpypa_dgaw']
[2021-06-10 09:56:54,860] {standard_task_runner.py:78} INFO - Job 19338: Subtask flow_name_load_into_bq
[2021-06-10 09:56:56,025] {logging_mixin.py:112} INFO - Running <TaskInstance: mysql_to_gcs_data_dag.flow_name_load_into_bq 2021-06-10T09:55:01.281248+00:00 [running]> on host airflow-worker-567675b8f5-t58ns
[2021-06-10 09:56:56,834] {gcp_api_base_hook.py:145} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.

您的问题似乎与 GCP 连接有关,而不是与运营商本身有关。

有 3 种方法可以通过 GCP 进行身份验证:

  1. 使用申请默认凭证 https://google-auth.readthedocs.io/en/latest/reference/google.auth.html#google.auth.default
  2. Use a 服务帐户密钥 https://cloud.google.com/docs/authentication/#service_accounts磁盘上的文件(JSON 格式) - 密钥文件路径。
  3. 使用连接配置中的服务帐户密钥文件(JSON 格式) - Keyfile JSON。

由于您没有设置任何选项,因此您收到此错误。

First GoogleCloudStorageToBigQueryOperator已弃用。你应该导入GCSToBigQueryOperator as:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

对于气流 >= 2.0.0: 安装谷歌提供商 https://pypi.org/project/apache-airflow-providers-google/:

pip install apache-airflow-providers-google

安装后,您可以按照以下说明进行操作docs https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html#configuring-the-connection并在上面列出的任何选项中设置连接。

对于气流 Google 向后移植提供商:

pip install apache-airflow-backport-providers-google

安装后,您可以按照以下说明进行操作docs https://airflow.apache.org/docs/apache-airflow/1.10.15/howto/connection/gcp.html#configuring-the-connection并在上面列出的任何选项中设置连接。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

气流:GoogleCloudStorageToBigQueryOperator 错误 的相关文章

随机推荐

  • 从 selectKbest 获取特征名称

    我使用 Scikit 学习selectKbest从 900 个特征中选择大约 500 个最佳特征 如下 其中 d 是所有特征的数据框 from sklearn feature selection import SelectKBest chi
  • 如何更改小型转储中模块的校验和?

    我编写 和销售 的软件在分发之前经过压缩和加密 每次发布新版本时 我都会在压缩和加密之前保留所有 map 文件和生成的二进制文件 包括 exe 当它在客户端计算机上崩溃时 我会得到一个小型转储 我在 Visual Studio 中打开这些小
  • Matplotlib窗口出现在后面?

    每当我打电话show 在 matplotlib 中 绘图窗口出现在所有其他窗口后面 我必须最小化所有内容才能看到它 有什么方法可以阻止这种情况或以编程方式将其带到前台 在 OSX Lion 上 Python 2 7 出色地 这个答案是在已接
  • ifelse 命令中的多个 true 条件

    在R中我们使用ifelse 测试 是 否 命令 我面临的问题是 如果协调结果为真 我需要执行各种声明 例如 ifelse fp 月 1 fp sum sales 1 fp sum sales 2 0 所以我给出两个条件 if fp mont
  • 为什么我的 Django 表单一直显示“此字段为必填字段”

    有谁知道为什么我的表单 文件选择器 在更简单的版本中工作时不断返回 此字段是必需的 我的看法是 def add attempt request m id a id template loader get template add attem
  • 在 C++ 中将 pytorch 张量转换为 opencv mat,反之亦然

    我想在 C 中将 pytorch 张量转换为 opencv mat 反之亦然 我有这两个功能 cv Mat TensorToCVMat torch Tensor tensor std cout lt lt converting tensor
  • 我可以在 C 程序中使用 C++ 库吗?

    我正在用 C 编写一个程序 但我想使用像向量一样的动态库 是否可以在 C 程序中使用 C 库 Not std vector 不 任何模板化的东西都是正确的 一般来说 使用 C 代码并不有趣 但这是可以做到的 您必须将类包装在 C 代码可以调
  • Assembly.LoadFrom 和依赖项

    我一直在尝试使用 Assembly LoadFrom 设置一个插件系统 该系统动态加载 exe 目录的子文件夹中的 dll 我有一个由 exe 和插件引用的接口库 构建完成后 我将插件 dll 复制到子文件夹中 不同的插件可能有共同的库 所
  • Android 可绘制语音气泡

    我已经找了好几天了 但找不到任何关于如何绘制气泡或在哪里绘制 9 个补丁图像用作背景的好线索 我是一个糟糕的艺术家 有人可以帮忙吗 我找到的最好的示例位于 Stack Overflow 上 但它是用 Objective C 编写的 如何在i
  • Java 方法引用抛出 NPE

    于是我就上了一堂课 public class MenuBar extends JMenuBar MenuBarController controller public MenuBar JMenu menu new JMenu File me
  • Google Analytics 跟踪代码给出 CORS 错误

    我正在使用 在我看来 昨天从我的帐户获得的新 Google Analytics 跟踪代码 它使用了一些 全局站点标签 我已按照 GA 的指示将其放置在我的 顶部 但在加载页面时遇到跨源错误 因此 GA 无法正确跟踪我的网站 这是确切的消息
  • django Rest 框架中的 django_countries

    我正在尝试创建一个 API 它可以使用 django countries 返回所有国家 地区 我正在尝试以下操作 但它不起作用 作为一个单独的字段 它工作正常 但对于完整的国家 地区列表 它会给出错误 from django countri
  • 我们可以在 noexcept 规范中引用成员变量吗?

    请考虑以下代码片段 template
  • 如何使用 Spring Boot 将日志消息写入文件?

    我想将消息记录在文件中而不是控制台上 我使用的是Spring Boot 我的配置如下 应用程序属性 logging level DEBUG logging level ERROR logging file HOME application
  • JS/Firebase - messages.onBackgroundMessage 不是函数

    这是我的整个代码 import initializeApp from https www gstatic com firebasejs 9 16 0 firebase app js import getMessaging getToken
  • 在 Kubernetes 中您应该使用 PM2、节点集群还是两者都不使用?

    我正在将一些 NodeJS 代码部署到 Kubernetes 中 过去 您需要运行 PM2 或 NodeJS 集群模块才能充分利用多核硬件 现在我们有了 Kubernetes 目前尚不清楚是否必须使用其中之一才能充分发挥多核的优势 如果一个
  • C++ + OpenCV = 读取位置 0x02176000 访问冲突

    这段代码实际上以前一直有效 我现在不知道是什么导致它抛出错误 我实际上不记得对代码进行过任何更改 如下 它从文件读取图像到 OpenCV IplImage 对象 然后将其转换为 jpeg 缓冲区 IplImage fIplImageHead
  • Android Studio 3.0.1 NDK致命错误:stdint.h:没有这样的文件或目录

    我的应用程序具有本机代码 可以与以前的 Android Studio 版本完美运行 我刚刚更新到3 0 1并收到此错误 Users salman nazir Library Android sdk ndk bundle toolchains
  • 始终加密:无法使用密钥存储提供程序解密列加密密钥:“MSSQL_CERTIFICATE_STORE”

    我正在我的 Web 应用程序中执行 Always 加密 该应用程序通过由窗口服务托管的 WCF 服务插入数据 将数据插入加密列时出现以下错误 无法使用密钥存储提供程序解密列加密密钥 MSSQL CERTIFICATE STORE 加密列加密
  • 气流:GoogleCloudStorageToBigQueryOperator 错误

    尝试通过气流 GoogleCloudStorageToBigQueryOperator 运算符将数据从谷歌云存储加载到 bigquery 中 出现以下错误 需要有关以下错误的建议 Code load into bq GoogleCloudS