调用 BashOperator 时出错:Bash 命令失败

2023-12-20

这是我的 dag 文件和 BashOperator 任务:

my_dag = { 
dag_id = 'my_dag',
start_date = datetime(year=2017, month=3, day=28),
schedule_interval='01***',
}

my_bash_task = BashOperator(
task_id="my_bash_task",
bash_command=bash_command,
dag=my_dag)

bash_command = "/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh ""

按照此answer https://stackoverflow.com/questions/42259298/bashoperator-doent-run-bash-file-apache-airflow我什至在 bash 文件后面留了一个空格以避免找不到模板错误。但是在运行这个任务时给了我这个错误:airflow.exceptions.AirflowException:Bash 命令失败.

bash_command 文件内容为:

#!/bin/bash
DATABASE=db_name
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/location/$FILE db_name
unset PGPASSWORD

但是,不要将 bash_command 指向 bash 文件,而是以多行字符串形式编写命令:

bash_command = """
DATABASE=db_name
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/location/$FILE db_name
unset PGPASSWORD
"""

因此,我假设该错误不是因为 bash 命令造成的。 我什至尝试用 #!/bin/sh 替换 bash 文件中的 #!/bin/bash,但这也不起作用。

I ran sh db_back_up_bash.sh来自终结者,它工作正常。

Update The 实际代码:

bash_file_location_to_backup_db = '{{"/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh"}}'
# bash_file_location_to_backup_db = "/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh "
bash_command = """
DATABASE=ksaprice_scraping
FILE=$DATABASE-`date +%F-%H%M%S`.backup
export PGPASSWORD=password
pg_dump -h localhost -p 5432 -U developer -F c -b -v -f ~/Dropbox/database_backup/ksaprice/$FILE ksaprice_scraping
unset PGPASSWORD
"""

backup_scraped_db_in_dropbox_task = BashOperator(
    task_id="backup_scraped_db_in_dropbox_task",
    # bash_command=bash_command,# this works fine
    bash_command=bash_file_location_to_backup_db,#this give error :airflow.exceptions.AirflowException: Bash command failed
    dag=dag_crawl
)

错误跟踪:

[2017-04-11 20:02:14,905] {bash_operator.py:90} INFO - Output:
2017-04-11 20:02:14,905 | INFO| root : Output:
[2017-04-11 20:02:14,906] {bash_operator.py:94} INFO - /tmp/airflowtmp7FffJ2/backup_scraped_db_in_dropbox_taskQ6IVxm: line 1: /home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh: Permission denied
2017-04-11 20:02:14,906 | INFO| root : /tmp/airflowtmp7FffJ2/backup_scraped_db_in_dropbox_taskQ6IVxm: line 1: /home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh: Permission denied
[2017-04-11 20:02:14,906] {bash_operator.py:97} INFO - Command exited with return code 126
2017-04-11 20:02:14,906 | INFO| root : Command exited with return code 126
[2017-04-11 20:02:14,906] {models.py:1417} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
2017-04-11 20:02:14,906 | ERROR| root : Bash command failed
Traceback (most recent call last):
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2017-04-11 20:02:14,907] {models.py:1441} INFO - Marking task as FAILED.
2017-04-11 20:02:14,907 | INFO| root : Marking task as FAILED.
[2017-04-11 20:02:14,947] {models.py:1462} ERROR - Bash command failed
2017-04-11 20:02:14,947 | ERROR| root : Bash command failed
Traceback (most recent call last):
  File "/home/jak/my_projects/workflow_env/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/home/jak/my_projects/workflow_env/local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command faile

我认为这是气流中的一个错误,jinja 不应该期望 .sh 文件包含 BashOperator 中的模板信息。

我通过将命令转换成 Jinja 能够正确解释的格式来解决这个问题:

bash_command = '{{"/home/jak/my_projects/workflow_env/repo_workflow/db_backup_bash.sh"}}'

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

调用 BashOperator 时出错:Bash 命令失败 的相关文章

  • 更改随机森林分类器的阈值

    我需要开发一个没有 或接近没有 假阴性值的模型 为此 我绘制了召回率 精度曲线 并确定阈值应设置为 0 11 我的问题是 如何定义模型训练时的阈值 稍后在评估时定义它是没有意义的 因为它不会反映新数据 X train X test y tr
  • 在Python中不断寻找用户输入

    我将如何编写一个始终寻找用户输入的 Python 程序 我想我希望有一个等于输入的变量 然后根据该变量的等于值会发生不同的情况 因此 如果变量是 w 那么它将执行某个命令并继续执行 直到收到另一个输入 例如 d 然后会发生不同的情况 但直到
  • UnicodeDecodeError:“utf-8”编解码器无法解码位置 14 中的字节 0xb9:起始字节无效

    我正在使用 Django REST 进行文件上传测试 Python3 6 2Django1 11djangorest框架 3 6 4Excel OSX 15 38 170902 操作系统 10 12 6 过去使用普通照片文件可以成功完成此操
  • 如何仅选择数组中的第一列并对其求和?

    这是我的代码 import numpy as np contrainte1 1080 0 65 minutes tous les jours contrainte2 720 0 55 minutes du lundi au vendredi
  • Visual Studio Code:如何使用参数调试 Python 脚本

    我正在使用 Visual Studio Code 来调试 Python 脚本 下列的本指南 https code visualstudio com docs python debugging 我在中设置了参数launch json file
  • Seaborn regplot 中点和线的不同颜色

    中列出的所有示例西伯恩的regplot文档 https seaborn pydata org generated seaborn regplot html点和回归线显示相同的颜色 改变color争论改变了两者 如何为点设置与线不同的颜色 你
  • python blpapi安装错误

    我试图根据 README 中的说明为 python 安装 blpapi 3 5 5 但是在运行时 python setup py install 我收到以下错误 running install running build running b
  • 更改 numpy 数组的结构强制给定值

    如何缩小栅格数据的比例4 X 6大小成2 X 3如果 2 2 像素内的任何元素包含 1 则大小强制选择 1 否则选择 0 import numpy as np data np array 0 0 1 1 0 0 1 0 0 1 0 0 1
  • 类型错误:translate() 只接受一个参数(给定 2 个参数)[重复]

    这个问题在这里已经有答案了 我的代码在 python 2 x 版本上运行良好 但是当我尝试在 python 3 x 版本上运行它时 出现错误 主题 需要缩写短信编码中的任何消息 Code def sms encoding data star
  • 如何在seaborn中绘制离散变量的分布图

    当我画画的时候displot对于离散变量 分布可能不像我想象的那样 例如 We can find that there are crevices in the barplot so that the curve in kdeplot is
  • Django 说“id 可能不为 NULL”,但为什么会这样呢?

    我今天要疯了 我只是尝试插入一条新记录 但它返回了 post blogpost id 可能不为 NULL 错误 这是我的模型 class BlogPost models Model title models CharField max le
  • 监控培训课程如何运作?

    我试图理解使用之间的区别tf Session and tf train MonitoredTrainingSession 以及我可能更喜欢其中之一 似乎当我使用后者时 我可以避免许多 杂务 例如初始化变量 启动队列运行程序或设置文件编写器以
  • 如何在每次运行 python 程序时添加新列

    我希望我的表的第一列作为卷号 第二列作为名称 每当我运行 python 程序时 我想在表中添加一列日期 在这个新列中 我想填充从 user list 获得的列表将包含值 P A P P 等 如何处理 我尝试首先通过 alter 命令添加一列
  • Python 可选参数对

    我正在使用argparse模块获取两个可选的命令行参数 parser add argument start date nargs metavar START DATE help start date in YYYY MM DD parser
  • 在 Python 中将嵌套字典位置作为参数传递

    如果我有一个嵌套字典 我可以通过索引来获取键 如下所示 gt gt gt d a b c gt gt gt d a b c 我可以将该索引作为函数参数传递吗 def get nested value d path a b return d
  • 如何使用 Python 实现并行 gzip 压缩?

    使用python压缩大文件 https stackoverflow com questions 9518705 big file compression with python给出了一个很好的例子来说明如何使用例如bz2 纯粹用 Pytho
  • Pandas 中的数据透视表小计

    我有以下数据 Employee Account Currency Amount Location Test 2 Basic USD 3000 Airport Test 2 Net USD 2000 Airport Test 1 Basic
  • 获取 python 模块的 2 个独立实例

    我正在与以非 OO 方式编写的 python 2 x API 进行交互 它使用模块全局范围来处理一些内部状态驱动的东西 在它不再是单例的情况下需要它 并且修改原始代码 不是我们的 不是一个选择 如果不使用单独解释器的子进程运行 有什么方法可
  • 重写 PyGObject 中的虚拟方法

    我正在尝试实施高宽几何管理 http developer gnome org gtk3 3 2 GtkWidget html geometry management在 GTK 和 Python 中用于我的自定义小部件 我的小部件是来自的子类
  • Pandas 2 个字段中唯一值的数量

    我正在尝试查找覆盖 2 个字段的唯一值的数量 例如 一个典型的例子是姓氏和名字 我有一个数据框 当我执行以下操作时 我只获取每列的唯一字段数 在本例中为 最后一个 和 第一个 不是复合体 df Last Name First Name nu

随机推荐