如何使用Airflow获取并处理mysql记录?

2024-01-11

我需要

1. run a select query on MYSQL DB and fetch the records.              
2. Records are processed by python script.

我不确定我应该采取什么方式。 xcom 是去这里的路吗?另外,MYSQLOperator 只执行查询,不获取记录。有没有我可以使用的内置传输运营商?我如何在这里使用 MYSQL 挂钩?

你可能想使用一个使用钩子来获取数据的PythonOperator, 应用转换并将(现已评分)行运回其他地方。

有人可以解释如何进行同样的事情吗?

Refer - http://markmail.org/message/x6nfeo6zhjfeakfe http://markmail.org/message/x6nfeo6zhjfeakfe

def do_work():
    mysqlserver = MySqlHook(connection_id)
    sql = "SELECT * from table where col > 100 "
    row_count = mysqlserver.get_records(sql, schema='testdb')
    print row_count[0][0]

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=mysqlHook,
    dag=dag
)

这是正确的方法吗? 另外我们如何使用 xcoms 来存储以下 MySqlOperator 的记录?

t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)

在过去的 90 分钟里,我真的一直在为此苦苦挣扎,这里有一个适合新手的更具声明性的方法:

from airflow.hooks.mysql_hook import MySqlHook

def fetch_records():
  request = "SELECT * FROM your_table"
  mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
  connection = mysql_hook.get_conn()
  cursor = connection.cursor()
  cursor.execute(request)
  sources = cursor.fetchall()
  print(sources)

...your DAG() as dag: code

task = PythonOperator(
  task_id = 'fetch_records',
  python_callable = fetch_records
)

这会将数据库查询的内容返回到日志。

我希望这对其他人有用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用Airflow获取并处理mysql记录? 的相关文章

  • MySQL 将表从 Latin1 转换为 utf8

    我需要将包含大量数据的表从 Latin1 转换为 utf8 以便它可以接受韩语字符 如何更改该表而不损坏其中的数据 我的 SQL 语句是什么 最好的方法是什么 ALTER TABLE database name table name CON
  • Python,将函数的输出重定向到文件中

    我正在尝试将函数的输出存储到Python中的文件中 我想做的是这样的 def test print This is a Test file open Log a file write test file close 但是当我这样做时 我收到
  • 使用来自另一个数据库的选择查询更新 mysql 表

    我有两个数据库 我想用另一个数据库表中的值更新一个表 我正在使用以下查询 但它不起作用 UPDATE database1 table1 SET field2 database2 table1 field2 WHERE database1 t
  • 更改mysql数据库表中的日期格式

    大家早上好 只是一个简单的问题 在我现有的 MySql 数据库中 我几乎没有包含日期 的列 目前这些是年 月 日格式 但现在我需要将其全部更改为年 月 日格式 我试过了select date format curdate d m Y 但它不
  • 如何在查询语句之外从mysql查询中获取值?

    这是下面的函数console log function quo value value connection query SELECT role from roles where id 1 function error results fi
  • Pandas 中的 Groupby、转置和追加?

    我有一个数据框 如下所示 每个用户有10条记录 现在 我想创建一个如下所示的数据框 userid name1 name2 name10 这意味着我需要反转该列的每 10 条记录name并附加到新的数据框 那么 它是如何做到的呢 有什么办法可
  • 没有提示指令的直连接中表的顺序是否会影响性能?

    所有基于 SQL 的 RDBMS 10 年前的版本 直接连接查询 没有提示指令 中的表顺序是否会对最佳性能和内存管理产生影响 听说最后一个join应该是最大的表 您的数据库的查询优化器如何处理这种情况 回答你的问题 是的 表的顺序在连接中有
  • PyCharm 表示 readline 导入未被使用

    我有这个代码 while True cmd input gt if cmd exit break 但我想实现高级文本输入功能 例如命令历史记录 因此我导入了 readline 模块 导入 readline 模块 甚至不使用它 将解锁这些功能
  • Python排序算法[重复]

    这个问题在这里已经有答案了 我在Python中实现了不同的排序算法 以更好地理解它们 我想知道Python的内置排序方法实现什么类型的排序 这是一个叫做Timsort http en wikipedia org wiki Timsort由
  • 将IP保存到数据库中

    当用户登录时 我想将他们的 IP 保存在数据库中 我该怎么做呢 MySQL 字段最适合使用哪种类型 获取IP的PHP代码是什么样的 我正在考虑将其用作登录 会话内容的额外安全功能 我正在考虑使用用户现在拥有的 IP 检查用户从数据库登录的
  • MySql 视图脚本中的注释

    可以这样做吗 我尝试过多个 gui mysql workbench navicat toad for mysql 但没有一个保存这样的注释 something important select something else importan
  • “修改列”与“更改列”

    我知道 我们不能使用重命名列MODIFY COLUMN语法 但我们可以使用CHANGE COLUMN syntax 我的问题是 主要用途是什么modify syntax 例如 ALATER TABLE tablename CHANGE co
  • str.translate 与 str.replace - 何时使用哪一个?

    何时以及为什么使用前者而不是后者 反之亦然 目前尚不完全清楚为什么有些人使用前者以及为什么有些人使用后者 它们有不同的目的 translate只能用任意字符串替换单个字符 但一次调用可以执行多次替换 它的参数是一个特殊的表 它将单个字符映射
  • 如何在Windows中的Python 3.9下pip安装pickle?

    我需要pickle https docs python org 3 9 library pickle html module pickle包安装在我的下面Python 3 9在 Windows 10 下 我尝试过的 当尝试与pip inst
  • 无法在 virtualenv 中安装 libxml2

    我有一个问题libxml2蟒蛇模块 我正在尝试将其安装在python3 虚拟环境使用以下命令 pip install libxml2 python3 但它显示以下错误 Collecting libxml2 python3 Using cac
  • Tomcat 6找不到mysql驱动

    这里有一个类似的问题 但关于类路径 ClassNotFoundException com mysql jdbc Driver https stackoverflow com questions 1585811 classnotfoundex
  • 使用 MYSQL 将 h:mm pm/am 时间格式插入数据库

    我正在尝试将以 h mm am pm 格式写入的时间插入到存储为标准 DATETIME 格式 hh mm ss 的数据库中 但我不知道如何将发布的时间转换为标准格式所以数据库会接受它 这是我到目前为止一直在尝试的 title POST in
  • 使用用户定义函数 MySql 时出错

    您好 请帮我解决这个问题 提前致谢 我在数据库中定义了这些函数 CREATE FUNCTION levenshtein s1 VARCHAR 255 s2 VARCHAR 255 RETURNS INT DETERMINISTIC BEGI
  • 无法连接到 MAMP 上的 phpMyAdmin

    我收到此错误消息 MySQL 说道 无法连接 设置无效 phpMyAdmin 尝试连接 MySQL 服务器 但服务器拒绝连接 您应该检查配置中的主机 用户名和密码 并确保它们与 MySQL 服务器管理员提供的信息相对应 用户和通行证是默认的
  • 没有名为“turtle”的模块

    我正在学习并尝试用Python3制作贪吃蛇游戏 我正在进口海龟 我正在使用 Linux mint 19 PyCharm python37 python3 tk Traceback most recent call last File hom

随机推荐

  • python str.index 时间复杂度

    为了找到字符串中子字符串的位置 需要一个简单的算法O n 2 时间 然而 使用一些有效的算法 例如KMP算法 https en wikipedia org wiki Knuth E2 80 93Morris E2 80 93Pratt al
  • 在Python中组合多个音频文件(带延迟)

    我希望在 Python 中组合一系列不同的音频文件 mp3 要求之一是我需要能够在每个文件的末尾指定延迟 为了说明这一点 例如 文件1 mp3 3秒 延迟 2秒 文件2 mp3 mp3 3秒 延迟 2秒 mp3 4秒 延迟 2秒 file3
  • 窃取焦点,因为 SetForegroundWindow 无法做到这一点

    我知道这听起来很邪恶 但我的意图根本不是这个 用户单击 延迟拍摄 并开始倒计时 在此期间他们会聚焦另一个应用程序 然后倒计时后用户希望我的应用程序重新获得焦点 SetForegroundWindow当它从 PID X 的应用程序运行而 PI
  • 未找到类 ZMQContext

    我在 Ubuntu 14 04 的虚拟机内的 nginx 1 4 6 和 php 5 5 上运行 Web 服务器 并且需要安装 ZeroMQ 扩展 我已按照以下说明进行操作ZMQ http zeromq org area download
  • RIP寄存器不改变

    为什么当我继续使用c和内联汇编打印堆栈和指令指针寄存器时它们不会改变 因为逻辑上其他程序同时运行 所以它们应该在打印时不断改变 操作系统和 CPU 一起工作 为进程 同时运行 提供 CPU 切片 实际上 他们通过分配时间片来虚拟化 CPU
  • 如何在多行中编写 f 字符串而不引入意外的空格? [复制]

    这个问题在这里已经有答案了 考虑以下代码片段 name1 Nadya name2 Jim def print string string f name1 n name2 print string print string 产生 Nadya
  • Monodevelop - 仅使用 sudo 运行

    我已经在我的 Debian amd64 jessie 构建上安装了 Mono 和 Monodevelop 并且我只能使用提升的权限运行 monodevelop 从 UI startesque 菜单启动 monodevelop 似乎什么也没发
  • 带有位置参数的 Git 别名

    基本上我正在尝试别名 git files 9fa3 执行命令 git diff name status 9fa3 9fa3 但 git 似乎没有将位置参数传递给别名命令 我努力了 alias files git diff name stat
  • 为什么 Apache 没有在 XAMPP 上启动 [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 直到昨天 我的本地主机一切都很好 但从昨天开始 本地主机无法打开 它说 无法连接 我尝试了很多次来启动Apache on XAMPP 但它说消息忙 我
  • .NET .config 文件中 ConnectionString 元素的用途

    在中存储和读取应用程序的连接字符串有什么区别
  • 转移 PyPI 包的所有权

    As per PEP 541 https www python org dev peps pep 0541 现在可以认领废弃的 PyPI 项目 有人这样做过吗 联系谁 我尝试过dist utils 邮件列表 https mail pytho
  • Get-EventLog - 某些事件日志源缺少有效消息

    我正在使用 get eventlog 提取和过滤系统事件日志数据 我发现 get event log 无法正确返回与某些条目关联的消息 这些条目通常显示在事件日志查看器中 例如 get eventlog logname system sou
  • Python 模拟多个具有不同结果的调用

    我希望能够对特定属性函数进行多次调用 为每次连续调用返回不同的结果 在下面的示例中 我希望增量在第一次调用时返回 5 然后在第二次调用时返回 10 Ex import mock class A def init self self size
  • OpenCV 和 VS2010:致命错误 LNK1104:致命错误 LNK1104:无法打开文件“tbb_debug.lib”

    我尝试按照本指南使用 Visual Studio C 2010 安装 OpenCV 使用 Windows 7 64 位 在 Visual C 2010 Express 中安装 OpenCV 2 4 3 https stackoverflow
  • Django 独立脚本

    我正在尝试从另一个 python 脚本访问我的 Django v1 10 应用程序数据库 但遇到了一些问题 这是我的文件和文件夹结构 store store init py settings py urls py wsgi py store
  • 面向对象的设计建议

    这是我的代码 class Soldier public Soldier const string name const Gun gun string getName private Gun gun string name class Gun
  • Android:SensorManager.getRotationMatrix 和 SensorManager.getOrientation() 的算法

    要在 Android 中获取欧拉角 例如俯仰角 横滚角 方位角 的方向 需要执行以下操作 SensorManager getRotationMatrix float R float I float 重力 float 地磁 SensorMan
  • 跨站脚本注入

    我正在测试一个网络应用程序 我想写一个XSS将显示警报的脚本 Hello 我写的第一个脚本是
  • VBA Word - 带有初始文件名的另存为对话框

    我有一个 vba 宏 可以对当前文档进行一些更改 并确定应该用于该文档的文件名 如果该文档没有保存为该文件名 但应该提示用户这样做 但应该能够更改默认设置 我发现两种可能性都不完美 我需要这两种的混合 第一种方法 Application D
  • 如何使用Airflow获取并处理mysql记录?

    我需要 1 run a select query on MYSQL DB and fetch the records 2 Records are processed by python script 我不确定我应该采取什么方式 xcom 是