带有 SQL Server 后端和 pyodbc 的 Airflow Scheduler

2024-01-20

我已经将 Airflow 设置为 SQL Server 作为后端(SQL Azure)。初始化数据库成功。我试图每 2 分钟运行一次简单的 dag。

dag 有 2 个任务:

  1. 打印日期
  2. sleep

当它启动气流调度程序时,它会为这两个任务创建任务实例,第一个任务成功,第二个任务似乎陷入“运行”状态。

查看调度程序日志,我反复看到以下错误。

[2019-01-04 11:38:48,253] {jobs.py:397} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 389, in helper
    pickle_dags)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1816, in process_file
    dag.sync_to_db()
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/models.py", line 4296, in sync_to_db
    DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2855, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2876, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2885, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2867, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1019, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1024, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 409, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2112, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2151, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1465, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 387, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 768, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 516, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1140, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1137, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 461, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 651, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
    return self.dbapi.connect(*cargs, **cparams)
InterfaceError: (pyodbc.InterfaceError) ('28000', u"[28000] [unixODBC][Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for user 'airflowuser'. (18456) (SQLDriverConnect)")

Airflow 配置为使用 LocalExecutor 和 pyodbc 连接到 SQL Azure

    # The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
#executor = SequentialExecutor
executor = LocalExecutor


# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/sshuser/airflow/airflow.db
#connection string to MS SQL Serv er
sql_alchemy_conn = mssql+pyodbc://airflowuser@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server


# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 180

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300

达格如下

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 4),
    'email': ['[email protected] /cdn-cgi/l/email-protection'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval='*/2 * * * *', max_active_runs=1, catchup=False)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

我有点不明白为什么调度程序在成功运行第一个任务后无法连接到数据库。 非常感谢任何解决此问题的指示。

我有一个示例程序,它使用 sqlalchemy 使用相同的凭据连接到 SQL Azure,这很有效。

import sqlalchemy

from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://afdadmin@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server")

connection = engine.connect()
result = connection.execute("select version_num from alembic_version")
for row in result:
    print("Version:", row['version_num'])
connection.close()

在 odbcinst.ini 中设置 Pooling = True 后问题得到解决

[ODBC] 池化 = 是

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有 SQL Server 后端和 pyodbc 的 Airflow Scheduler 的相关文章

  • 合并sql中的列

    我正在使用 SQL Server 2017 有一个存储过程 其中我有一个带有连接的简单选择 例如 SELECT p legacyKey AS JobNumber p Name AS JobName G Label AS DesignStat
  • 我可以根据多列删除数据库重复项吗?

    I 不久前问过这个问题 https stackoverflow com questions 4952250 how to delete duplicates from a database table based on a certain
  • 如何使用 SQL 通过表示多级订单的 varchar 字段正确排序?

    我不太喜欢数据库 我发现在查询上出现以下问题SQL服务器数据库旧的遗留应用程序的 我声明不幸的是我无法更改数据库结构 字段类型 这非常难看 我有以下情况 SELECT Sottocategoria IdSottocategoria IdCa
  • 有没有办法设置 SQL Server 作业计划每 30 秒运行一次?

    当我尝试创建计划时 我可以选择的最短时间是 1 分钟 有没有办法将其减少到秒 这篇文章在这里SQL Server 作业调度 http www sqlservercentral com articles Administration sqls
  • 使用 MS Access 链接表连接到 SQL Server 后端时是否可能发生 SQL 注入

    我一直在对此进行一些研究 但到目前为止还是一片空白 情况是这样的 我正在开发一个链接到 SQL Server 后端的 MS Access 前端 我在某种程度上使用 Access 中的链接表 表单访问 SQL DB 这样 当用户更新表单中的值
  • 分组和切换列和行

    我不知道这是否会被正式称为枢轴 但我想要的结果是这样的 Alex Charley Liza 213 345 1 23 111 5 42 52 2 323 5 23 1 324 5 我的输入数据采用这种形式 Apt Name
  • SQL Server:将表达式转换为数据类型 bigint 时出现算术溢出错误

    这是我的查询顺序 SELECT CASE WHEN BarCode IS NOT NULL AND ExternelBarCode IS NULL THEN BarCode WHEN BarCode IS NULL AND Externel
  • 在数据库中搜索时忽略空文本框

    此代码能够搜索数据并将其加载到DataGridView基于搜索表单文本框中提供的值 如果我将任何文本框留空 则不会有搜索结果 因为 SQL 查询是用 AND 组合的 如何在搜索 从 SQL 查询或 C 代码 时忽略空文本框 private
  • 如何通过 Apache Airflow 中的 Docker Operator 使用卷

    我正在开发一个 ETL 流程 使用 DockerOperator 通过 Apache Airflow 进行调度和编排 我正在使用 Windows 笔记本电脑 因此我只能从 Docker 容器内运行 Apache Airflow 我能够在我的
  • ALTER TABLE 语句与 FOREIGN KEY 约束冲突

    为什么要添加外键tblDomare表导致此错误 ALTER TABLE 语句与 FOREIGN KEY 约束 FK tblDomare PersN 5F7E2DAC 冲突 冲突发生在数据库 almu0004 表 dbo tblBana 列
  • pymssql 库中的参数绑定是否正确实现?

    我使用 pymsqsql 库从 Python 程序调用极其简单的查询 with self conn cursor as cursor cursor execute select extra id from mytable where id
  • SQL Server 中的 FIFO 查询

    我正在构建一个库存管理应用程序c with SQL server 我想做一个FIFO从我的表查询 我以可变价格购买了相同的产品 之后我卖掉了其中一些 我想根据 先进先出 进行查询BatchDate柱子 所以我想通过PurchasePrice
  • SQL Server 实例名称的最大长度?

    我需要知道以下版本的 SQL Server 实例名称的最大字符长度 如果它们之间存在差异 SQL Server 2000 SQL Server 2005 SQL Server 2008 SQL Server 2008 R2 我正在开发一个应
  • C# 和 SQL Server:如果字符串值为空,如何在命令参数中插入 DBNull.Value?

    我已经搜索了几个小时 但找不到解决方案 我正在将一些字符串插入 SQL 但是有时 我用来执行此操作的方法可能包含空字符串 即 因此我想在 SQL Server 中插入一个空值 首先我测试我的方法以确保我能够插入DBNull Value通过使
  • 无法将数据加载到 mvc 4 中的 jTable 中

    好的 我第一次尝试 jTable 我可以加载表 但这对我没有什么好处 因为它不会加载我的任何数据 当我调试程序时 我想要的表中的所有行都存储在我的列表中 因此我很困惑为什么当我运行应用程序时会弹出一个对话框 显示 与服务器通信时发生错误 H
  • TransactionScope 是否需要开启 DTC 服务?

    根据我的阅读 为了在 NET 中使用 TransactionScope 您需要运行 Windows 中的分布式事务协调器服务 我有那个服务关掉 并且我的应用程序似乎运行相同并且回滚事务没有问题 我错过了什么吗 它如何能够发挥作用呢 我正在运
  • 将表值参数与 SQL Server JDBC 结合使用

    任何人都可以提供一些有关如何将表值参数 TVP 与 SQL Server JDBC 一起使用的指导吗 我使用的是微软提供的6 0版本的SQL Server驱动程序 我已经查看了官方文档 https msdn microsoft com en
  • 重用 t-sql 游标的起始位置?

    我正在开发一个在临时表上使用游标的存储过程 我已经阅读了一些关于为什么不需要游标的内容 但在这种情况下我相信我仍然需要使用游标 在我的过程中 我需要遍历表的行两次 声明游标后 已经单步执行临时表并关闭游标 重新打开时游标的位置是否仍保留在表
  • SQL Server 文件操作?

    使用 SQL Server 2005 如何使用 T SQL 将文件读入 SPROC 所以 假设我有一个像这样的 CSV 文件 ID OtherUselessData 1 asdf 2 asdf 3 asdf etc 我基本上想这样做 Sel
  • SQL Server:如果存在会大大减慢查询速度

    正在使用SQL Server 2012 我找到了一些关于查询优化的主题 并将 EXISTS 与 COUNT 进行比较 但我找不到这个确切的问题 我有一个看起来像这样的查询 select from tblAccount as acc join

随机推荐

  • 如何同时使用导航规则和 f:ajax

    这是我的场景 我想在某些情况下通过 Ajax 更新页面 在其他情况下执行导航规则 我的用例是登录表单 我希望他们在 uname password 失败时通过 ajax 收到错误消息 但如果成功则导航到新页面 有人使用 JSF2 0 f aj
  • Java isNan 是如何工作的?

    我正在看openjdk 1 7 0 25源码中我见过这个方法 Returns code true if the specified number is a Not a Number NaN value code false otherwis
  • RDS 集群和数据库实例概念

    我需要创建 RDS Aurora 5 7 数据库 我想我对RDS的概念不太清楚 这是正确的层次结构吗 aws rds cluster gt aws rds cluster instance gt aws db instance我应该需要定义
  • 请求标头未从拦截器角度 2/4 成功更新(401 处理)

    我正在使用 Http 拦截器并尝试重试失败的请求来处理401 error 我正在尝试设置一个新标头来更新请求 但它不起作用 我注意到我的标头没有通过请求设置 而是转到lazyUpdates内部标头 任何人都可以告诉我为什么会发生这种情况 检
  • 更改 emacs 文本模式的边距

    我发现根据自己的喜好更改 emacs 中的边距而又不会出现滑稽的情况的唯一方法是 add hook window configuration change hook lambda set window margins car get buf
  • codeigniter 中的无限滚动 jquery 插件

    我的 config 文件夹中有一个 pagination php 文件 代码如下 config num links 5 config use page numbers TRUE config query string segment pag
  • 使用批处理文件计算文件夹和子文件夹的数量

    我正在创建一个批处理文件 当给定路径时 它将计算其中的所有文件夹和子文件夹 到目前为止 我只能收集路径第一层内的文件夹数量 然后我会将其传输到一个文本文件 这是我到目前为止所拥有的 for f a in dir b ad folder fi
  • mgcv bam() 错误:无法分配大小为 99.6 Gb 的向量

    我正在尝试使用 bam mgcv 库 拟合加法混合模型 我的数据集包含来自对 300 个健康中心内 2 10 5 名儿童生长情况的纵向研究的 10 6 观察结果 我正在寻找每个中心的坡度 模型是 bam haz s month bs cc
  • 使用 uint64_t 作为键和结构体作为值的 GHashTable

    我正在学习GHashTable 虽然 Stackoverflow 中已经有一些例子 但它们只是一些常见的情况 所以我仍然不确定如何实现我的要求并决定寻求帮助 我想用一个uint64 t作为钥匙和struct作为价值 我发现没有这样的内置哈希
  • 您可以将 VectorDrawable pathData 转换为 Path 对象吗

    是否可以拉动pathData出于一个VectorDrawable并将其转换为Path目的 我想创建一个自定义ViewOutlineProvider并给它一个任意形状来剪切和投射阴影 如果有办法直接使用VectorDrawable那就更好了
  • 内联柔性容器宽度不增长

    考虑以下布局 div class div span class span1 test span span class span2 test test test test test span div 和CSS div display inli
  • 是否有一个 Excel 公式可以搜索列表并突出显示包含不同列表中的单词的文本? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我在 Excel 文档中有两个表 更像是列表 两者都只是一列 我想创建一个公式来搜索第一个列表并突出显示包含第二个列表中的某个单词的任
  • Qt 翻译非源文件中的字符串

    我有一个使用 XML 文件的 Qt 项目 这些 XML 文件包含人类可读的文本 并且应使用 Qt 工具 lupdate lrelease QtLinguist 翻译该文本 问题是是否可以通过 lupdate 在 ts 文件中生成条目 而无需
  • Jetpack compose 中使用 AppCompatActivity 代替 ComponentActivity

    我想打开datePickerJetpack compose 中单击按钮上的对话框 为此 我在按钮的内部使用以下代码onClick action val context LocalContext current Button onClick
  • 清除 EB 初始化配置

    有没有办法清除以前的 eb init 配置 之前的配置包含早期 AWS 账户中不存在的资源 我正在使用新的 AWS 账户并想要初始化现有的 Beanstalk 环境 Thanks 您可以随时添加 help到命令以查看可用的选项 例如 eb
  • Angular Datepicker更改日期格式

    我使用 Angular Material 中的 DatePicker 我想将输出格式更改为yyyy mm dd 目前它以以下格式打印日期 Wed Nov 14 2018 00 00 00 GMT 0100 Central European
  • Eclipse:启用自动完成/内容辅助

    如何在 Eclipse 中启用自动完成功能 我找不到它 If you would like to use autocomplete all the time without having to worry about hitting Ctr
  • 使用 Quercus 从 Java 调用 PHP

    我有一个示例 PHP 类 我想在我的 Java 应用程序中使用它 我们决定使用 Quercus 作为进行集成的库 有人可以告诉我如何使用 Quercus 从 Java 代码调用 PHP 类吗 例如 PHP 类名称是calculator ph
  • 在 Mac OS X 上编译 ncurses 应用程序时出错

    我试图在 Mac OS X 10 6 8 上编译基于 ncurses 的应用程序 但出现此错误 Undefined symbols for architecture x86 64 initscr referenced from main i
  • 带有 SQL Server 后端和 pyodbc 的 Airflow Scheduler

    我已经将 Airflow 设置为 SQL Server 作为后端 SQL Azure 初始化数据库成功 我试图每 2 分钟运行一次简单的 dag dag 有 2 个任务 打印日期 sleep 当它启动气流调度程序时 它会为这两个任务创建任务