气流:Dag 每隔几秒安排两次

2024-04-24

我尝试每天仅运行一次 DAG00:15:00(午夜 15 分钟),然而,它被安排了两次,间隔几秒钟。

dag = DAG(
    'my_dag',
    default_args=default_args,
    start_date=airflow.utils.dates.days_ago(1) - timedelta(minutes=10),
    schedule_interval='15 0 * * * *',
    concurrency=1,
    max_active_runs=1,
    retries=3,
    catchup=False,
)

该 Dag 的主要目标是检查新电子邮件,然后检查 SFTP 目录中的新文件,然后运行“合并”任务以将这些新文件添加到数据库中。

所有作业都是 Kubernetes Pod:

email_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/email-check:0d334adb",
    name="email-check",
    task_id="email-check",
    get_logs=True,
    dag=dag,
)
sftp_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/sftp-check:0d334adb",
    name="sftp-check",
    task_id="sftp-check",
    get_logs=True,
    dag=dag,
)
my_runner = KubernetesPodOperator(
    namespace='default',
    image="g.io/my-runner:0d334adb",
    name="my-runner",
    task_id="my-runner",
    get_logs=True,
    dag=dag,
)
my_runner.set_upstream([sftp_check, email_check])

所以,问题是似乎有两次运行DAG 预定的相隔几秒钟。他们不run同时进行,但一旦第一个完成,第二个就开始。

这里的问题是my_runnerjob 的目的是每天只运行一次:它尝试创建一个以日期为后缀的文件,如果该文件已经存在,它会抛出异常,因此第二次运行总是会抛出异常(因为当天的文件第一次运行时已经正确创建)

由于一张(或两张)图像相当于一千个单词,因此如下:

您将看到已安排第一次运行“00:15 后 22 秒“(这很好......有时它会在这里或那里变化几秒钟)然后还有第二个似乎总是被安排好的"5800:15 UTC 后的秒数”(至少根据他们得到的名字)。因此,第一个运行正常,似乎没有其他运行......并且一旦完成运行,就进行第二次运行(安排在00:15:58)启动(并失败)。

一个“好”的:

一个“坏”的:


您可以检查计划间隔参数吗? schedule_interval='15 0 * * * *'. The cron schedule takes only 5 parameters and I see an extra star. 另外,你能固定开始日期吗? start_date: datetime(2019, 11, 10)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

气流:Dag 每隔几秒安排两次 的相关文章

  • 如何为一系列任务设计执行引擎

    我正在尝试用 Java 编写一个问题 我必须执行一堆任务 Problem 执行由多个任务组成的作业 并且这些任务之间具有依赖关系 一个作业将有一个任务列表 每个这样的任务将进一步有一个后续任务列表 每个后续任务将有自己的后续任务 您可以在此
  • 运行 php 脚本的 Bash 脚本

    我有一个 php 脚本 我想使用 bash 脚本运行它 所以我可以使用 Cron 每分钟左右运行 php 脚本 据我所知 我需要创建 bash 脚本来处理 php 脚本 然后我才能使用 Cron 工具 计时器 到目前为止 我被告知我需要输入
  • BashOperator 为其他 PythonOperators 中使用的库引发 ImportError

    我的 dag 构建器模块中有一组任务 该模块使用 Airflow 中全球使用的 Python 运算符 我正在 kubernetes 上使用 docker 部署气流 任务失败并显示错误消息 no module named pandas 使用
  • MongoDB - WiredTiger 快照与锁定

    我不完全理解这两个功能在 WiredTiger MongoDB 程序中如何相互关联 1 WiredTiger 快照 2 数据锁定 如果使用WiredTiger引擎的每个读操作在读时都提供了数据库级别的 快照 以便创建一致性 ACID中的C
  • App Engine Cron 作业始终返回 HTTP 状态代码 301

    我已关注本指南 https cloud google com appengine docs flexible ruby scheduling jobs with cron yaml为我的 Rails 应用程序创建 cron 作业 但 HTT
  • 使用 Airflow BigqueryOperator 向 BigQuery 表添加标签

    我必须向 bigquery 表添加标签 我知道可以通过 BigQuery UI 来完成此操作 但如何通过气流运算符来完成此操作 Use case 用于计费和搜索目的 由于多个团队在同一项目和数据集下工作 我们需要将各个团队创建的所有表组合在
  • 关于 Executors.newSingleThreadExecutor() 的问题

    这是一个关于以下代码的程序流程的问题 import java util concurrent ExecutorService import java util concurrent Executors public class Test p
  • 完全替换 ASP.Net 的会话

    ASP Net 会话对于传统的 WebForms 应用程序来说似乎很完美 但它们所做的一些事情对于现代 AJAX 和 MVC 应用程序来说是一个严重的问题 具体来说 访问 ASP Net 提供程序只有 3 种方法 锁定读和写 默认 会话被锁
  • 如何在 Django 中获取 URL(带有协议和域)(无需请求)?

    我想在 cron 作业中发送邮件 邮件应包含 我的应用程序的链接 在 cron 作业中 我没有请求对象 并且无法使用 request build absolute uri 据我所知 网站框架可以在这里提供帮助 但没有给我协议 http vs
  • 使用 Nginx 或 Apache 来提供动态内容? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 验证随时间变化的连续条件

    我想开发一个Python程序 从某个时刻开始 等待60秒再执行操作 该程序必须具有的另一个功能是 如果我更新初始时间 它必须开始检查条件 我想过用线程来做 但我不知道如何停止线程并以新的开始时间重新启动它 import thread imp
  • 编写潜在并发问题的证明

    我正在阅读 Java 并发实践 并尝试编写一段代码来表明第 3 5 1 章中作为示例提供的类确实会引入问题 public class Holder public int n public Holder int n this n n publ
  • 当气流 initdb 时,导入错误:无法导入名称 HiveOperator

    我最近安装了airflow对于我的工作流程 在创建项目时 我执行了以下命令 airflow initdb 返回以下错误 2016 08 15 11 17 00 314 init py 36 INFO Using executor Seque
  • 如何在 AppEngine (GAE) 中进行数据库锁定?

    在 GAE 中 我有一个充满 一次性 的表 诸如 最后使用的序列号 之类的东西 这些东西并不真正属于其他表 它是一个简单的字符串键和字符串值对 我有一些代码来获取命名整数并递增它 如下所示 PersistenceCapable detach
  • 使用 SQL Server 作为具有多个客户端的数据库队列

    给定一个充当队列的表 如何最好地配置表 查询 以便多个客户端同时处理队列 例如 下表指示了工作人员必须处理的命令 当worker完成后 它会将处理后的值设置为true ID COMMAND PROCESSED 1 true 2 false
  • WordPress 计划事件未在设定时间触发

    在 WordPress 中 我正在创建一个插件 用于向用户发送电子邮件 为此 我使用 WordPresscron工作 所以基本上它要做的就是每小时向用户发送电子邮件 所以我的代码看起来像这样 public function construc
  • 在运行的 Swing 应用程序中替换 AWT EventQueue 的安全方法

    我维护的 Swing 应用程序中的各种零星问题似乎是由它使用自己的自定义版本替换默认 AWT 事件队列的方式引起的Toolkit getDefaultToolkit getSystemEventQueue push new AEventQu
  • 插入并发问题-多线程环境

    我有一个问题 即使用完全相同的参数在完全相同的时间调用相同的存储过程 存储过程的目的是获取记录 如果存在 或创建并获取记录 如果不存在 问题是两个线程都在检查记录是否存在并报告错误 然后都插入新记录 在数据库中创建重复记录 我尝试将操作保留
  • gentoo crontab:为什么这个简单的 crontab 不起作用?

    我使用 GENTOO 发行版 crontab e 35 12 root php5 home www cron php 当我手动运行时 php5 php5 home www cron php 这有效 它向我发送了一封电子邮件 然后我检查日期
  • 如何在特定 systemd 服务重新启动时触发自定义脚本运行

    我想知道如何安排自定义脚本在重新启动服务时运行 我的用例是 每当重新启动 Tomcat 服务时 我都必须运行多个命令 我想知道是否有一种方法可以编写脚本并安排它在重新启动 Tomcat 服务时运行 我已将 tomcat 脚本设置为 syst

随机推荐

  • 时区显示值与给定时区的 GMT 偏移量?

    我想构建一个时区列表以显示给用户选择 显示名称必须类似于 GMT 5 30 India Standard Time Asia Calcutta 我正在使用所有时区TimeZone getAvailableIDs 并构建列表 我写的代码是 S
  • 初始化 C++ 向量的大小

    初始化 C 向量以及其他容器的大小有哪些优点 如果有 有什么理由不只使用默认的无参数构造函数吗 基本上 两者之间是否存在显着的性能差异 vector
  • 如何正确创建可通过http访问的SVN存储库? (在 public_html 内)?

    情况是这样的 subversion 已安装在服务器中 并且我可以访问服务器中的共享帐户之一 不是 root 并且该共享托管帐户具有 SSH 访问权限 我想创建一个存储库 我可以在其中提交我正在处理的 PHP 文件 当我提交时 它应该可以在浏
  • Fabric 消息太大

    我试图将 5MB 数据从服务传递给参与者 但收到错误 Fabric 消息太大 如何增加微服务之间可传输的最大大小 我看了以下内容page https github com Azure azure content blob master ar
  • ESS 在 Windows 上找不到 Rterm.exe

    我将 R 安装在名为 X alphaAndOmega R R 的目录中 所以Rterm exe 32位版本 位于 X alphaAndOmega R R bin i386 我知道它不是 标准 R 目录 并且 标准 R 目录 例如 R 3 0
  • 如何在没有设置器的情况下设置属性值

    我已经看到提出并回答了各种问题 我们可以使用反射来调用私有设置器 如下所示 是否可以通过反射获取属性的私有设置器 https stackoverflow com questions 9219261 is it possible to get
  • 现代 x86 硬件不能将单个字节存储到内存中吗?

    说到 C 的并发内存模型 Stroustrup 的C 编程语言 第 4 版 第 1 节 41 2 1 说 就像大多数现代硬件一样 机器无法加载或存储小于单词的任何内容 然而 我的 x86 处理器已经有几年的历史了 它可以并且确实存储小于单词
  • 将 JQuery getJSON 与包含的其他 JavaScript 一起使用会出现 ReferenceError

    我制作了一个小型 HTML 页面示例来使 JQuery 的 getJSON 方法正常工作 它如下所示 抱歉 草率了 这只是一个概念证明 稍后会添加到更大的项目中
  • 在三角形内强制图表 d3.js

    我正在研究 d3 js 力图 我有一个问题 是否可以在具有某些坐标的三角形内制作力图 这是我的代码 var width 500 var height 500 margin var marginLeft 10 var marginTop 10
  • 从矩阵中删除零行(优雅的方式)

    我有一个包含一些零行的矩阵 我想删除零行 矩阵是Nx3 我所做的很简单 我创造std vector其中每三个元素代表一行 然后我将其转换为Eigen MatrixXd 有没有一种优雅的方法来删除零行 include
  • 在 ncurses 中的指定位置添加相同符号的快捷方式是什么?

    我想添加str in ncurse屏幕 带坐标x 5 to 24 y 23 to 42 这是一个正方形 但我想不出一个简单的方法来做到这一点 我试过了 stdscr addstr range 23 42 range 5 24 但这行不通 它
  • 使用敏捷方法建造飞机? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 开发者可以从其他行业学到很多东西 作为一个思维练习 是否有可能使用敏捷技术建造一架客机 暂时忘记成本 对硬件 机身 机翼等 和软件进行迭代和增量
  • DYMOLA:opc 服务器如何使用 MATLAB 使用 dsin.txt 或 mat 文件进行初始化

    我在 DYMOLA 中创建了一个 OPC 服务器 现在我在 DYMOSIM 中有这个可以单击并初始化 使用 dsin txt 的 MAT 文件 现在我在 MATLAB 中创建了一个 GUI 文件 并获取变量的输入并创建了一个 mat 文件
  • 无法构建轮子 - 错误:无效命令“bdist_wheel”

    我已经尝试了这个非常相关的问题中的所有内容 为什么我无法在 python 中创建轮子 https stackoverflow com questions 26664102 why can i not create a wheel in py
  • postgresql自连接

    假设我有一张这样的桌子 id device cmd value id unique row ID device device identifier mac address cmd some arbitrary command value v
  • Rails:创建删除表级联迁移

    如何在 Rails 3 2 迁移中强制执行 DROP TABLE CASCADE 是否有一个选项可以传递给 drop table table name 在 Rails 4 中 您可以执行以下操作 drop table accounts fo
  • 如何使用在单击按钮上创建的用户触发图表中的放大和缩小?

    我正在构建一个角度应用程序 其中我们需要创建用于放大和缩小图表的单击按钮 我们可以使用可悬停模式栏上的按钮放大缩小图表 但这对于我们的应用程序来说不是必需的 我们希望使用通过单击按钮创建的用户来放大和缩小图表 有没有办法使用单击按钮触发可悬
  • Electron如何拦截http响应体

    有什么办法可以拦截BrowserWindow主进程中的http响应主体没有调试器 是否无法使用WebRequest类和onCompleted method 我可以使用调试器做到这一点 但由于某种原因我不能使用它 await w webCon
  • 在 Eclipse (Spring Source) 中,Grails 始终以生产模式构建

    当在 Grails 项目中使用 Eclipse 时 战争的构建似乎陷入了生产模式 如果您想部署到附加的 tcServer 您只需右键单击您的项目 然后选择 运行方式 gt 在服务器上运行 如果您将 grails 项目设置为 dev 右键单击
  • 气流:Dag 每隔几秒安排两次

    我尝试每天仅运行一次 DAG00 15 00 午夜 15 分钟 然而 它被安排了两次 间隔几秒钟 dag DAG my dag default args default args start date airflow utils dates