I am trying to load data from Google Cloud Storage into BigQuery using Airflow's GoogleCloudStorageToBigQueryOperator.
The task fails with the error below; I would appreciate advice on what is going wrong.
Code:
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

load_into_bq = GoogleCloudStorageToBigQueryOperator(
    task_id=get_task_id("load_into_bq", flow_name),
    bucket='bigquery-source-replication',
    source_objects=flow_details["gcs_csv_filename"],
    destination_project_dataset_table='project-141508.dwh_test.datalake_production_products_intermediate',
    source_format="CSV",
    create_disposition="CREATE_IF_NEEDED",  # create the table on first run
    write_disposition="WRITE_APPEND",       # append on subsequent runs
    autodetect=True,                        # infer the schema from the CSV
    google_cloud_storage_conn_id=GCP_CONN_ID,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)
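One thing worth double-checking before digging into the logs: in Airflow 1.10.x this contrib operator expects source_objects to be a list of object names and builds one gs:// URI per element, so if flow_details["gcs_csv_filename"] is a bare string the operator iterates it character by character and produces broken URIs. A minimal sketch of a defensive version of the same task, assuming the value may be a single CSV object name (everything else is unchanged from the code above):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Wrap a bare string so the operator gets a list of object names,
# not an iterable of single characters.
csv_objects = flow_details["gcs_csv_filename"]
if not isinstance(csv_objects, list):
    csv_objects = [csv_objects]

load_into_bq = GoogleCloudStorageToBigQueryOperator(
    task_id=get_task_id("load_into_bq", flow_name),
    bucket='bigquery-source-replication',
    source_objects=csv_objects,  # always a list of GCS object names
    destination_project_dataset_table='project-141508.dwh_test.datalake_production_products_intermediate',
    source_format="CSV",
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_APPEND",
    autodetect=True,
    google_cloud_storage_conn_id=GCP_CONN_ID,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag,
)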
LOGS:
[2021-06-10 09:56:54,522] {taskinstance.py:902} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): flow_name_load_into_bq> on 2021-06-10T09:55:01.281248+00:00
[2021-06-10 09:56:54,599] {standard_task_runner.py:54} INFO - Started process 13009 to run task
[2021-06-10 09:56:54,854] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'mysql_to_gcs_data_dag', 'flow_name_load_into_bq', '2021-06-10T09:55:01.281248+00:00', '--job_id', '19338', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/mysql_gcs_bq_poc_sourav.py', '--cfg_path', '/tmp/tmpypa_dgaw']
[2021-06-10 09:56:54,860] {standard_task_runner.py:78} INFO - Job 19338: Subtask flow_name_load_into_bq
[2021-06-10 09:56:56,025] {logging_mixin.py:112} INFO - Running <TaskInstance: mysql_to_gcs_data_dag.flow_name_load_into_bq 2021-06-10T09:55:01.281248+00:00 [running]> on host airflow-worker-567675b8f5-t58ns
[2021-06-10 09:56:56,834] {gcp_api_base_hook.py:145} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
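The last log line shows the hook falling back to Application Default Credentials because no key file is configured on the Airflow connection, so the worker's service account is what needs BigQuery and GCS permissions here. Since the log cuts off before any traceback, one way to surface the underlying failure is to run the same load directly with the google-cloud-bigquery client from the worker. A minimal sketch, assuming that library is installed and using a placeholder object path in place of the real flow_details["gcs_csv_filename"] value:

from google.cloud import bigquery

# Uses Application Default Credentials, same as the hook in the log above.
client = bigquery.Client(project="project-141508")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Placeholder object name; substitute the actual CSV path from flow_details.
uri = "gs://bigquery-source-replication/your-file.csv"

load_job = client.load_table_from_uri(
    uri,
    "project-141508.dwh_test.datalake_production_products_intermediate",
    job_config=job_config,
)
load_job.result()  # blocks until the job finishes and raises the real BigQuery error on failure

If this standalone load succeeds, the problem is likely on the Airflow side (operator arguments or connection setup) rather than in BigQuery itself.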