如何定义 Airflow 上 STFP Operator 的操作?

2024-02-27

class SFTPOperation(object):
    PUT = 'put'
    GET = 'get'  

operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined

我在这里定义了操作符,但我在互联网上找不到与操作相关的任何内容

class sftpplugin(AirflowPlugin):
    name = "sftp_plugin"
    operators = [SFTPOperator]

任何帮助将不胜感激!

Thanks,


通过注意到SFTP https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sftp_operator.html操作员使用 ssh_hook 打开 sftp 传输通道,您应该需要提供ssh_hook or ssh_conn_id用于文件传输。首先,让我们看一个提供参数的示例ssh_conn_id.

from airflow.providers.sftp.operators import sftp_operator
from airflow import DAG
import datetime

dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)

put_operation = SFTPOperator(
            task_id="operation",
            ssh_conn_id="ssh_default",
            local_filepath="route_to_local_file",
            remote_filepath="remote_route_to_copy",
            operation="put",
            dag=dag
            )
get_operation = SFTPOperator(....,
            operation = "get",
            dag = dag
            )

put_operation >> get_operation

请注意,应根据任务的需要安排 dag,此处的示例考虑从中午开始的每日计划。现在,如果您提供 SSHhook,则需要对上述代码进行以下更改

from airflow.contrib.hooks.ssh_hook import SSHHook
...

put_operation = SFTPOperator(
            task_id="operation",
            ssh_hook=SSHHook("Name_of_variable_defined"),
            ...
            dag=dag
            )
....

where "Name_of_variable_defined"在Airflow界面的Admin -> Connections中创建。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何定义 Airflow 上 STFP Operator 的操作? 的相关文章

  • Django 中的 Rpy2 错误 - 未为“”类型的对象定义转换“py2rpy”

    我以前从未使用过 R 并且正在尝试使用 rpy2 从 python 调用 R 函数 它可以在独立的 python 终端上运行 但不能在 Django 中运行 但rpy2似乎无法将python字符串转换为r对象 我正在使用同事提供的自定义库
  • 如何在 Google App Engine 的 Python 中获取 StringProperty 的值?

    如何获取 nbd Model 的值 我想返回由多个字段组成的描述 但我无法让它工作 这是我的班级代码 class User ndb Model name ndb StringProperty email ndb StringProperty
  • 以矢量化方式在另一个 DataFrame 中查找包含值子集的行

    如何匹配此 DataFrame 中的值source car id lat lon 0 100 10 0 15 0 1 100 12 0 10 0 2 100 09 0 08 0 3 110 23 0 12 0 4 110 18 0 32 0
  • 组和平均 NumPy 矩阵

    假设我有一个任意的 numpy 矩阵 如下所示 arr 6 0 12 0 1 0 7 0 9 0 1 0 8 0 7 0 1 0 4 0 3 0 2 0 6 0 1 0 2 0 2 0 5 0 2 0 9 0 4 0 3 0 2 0 1 0
  • Tipfy:如何在模板中显示blob?

    鉴于在 gae 上使用tipfy http www tipfy org python 以下模型 greeting avatar db Blob avatar 显示 blob 此处为图像 的模板标签是什么 在这种情况下 斑点是一个图像 这很棒
  • 使用多级解决方案计算二维网格中的最近邻

    我有一个问题 在 x y 大小的网格中 我提供了一个点 并且我需要找到最近的邻居 在实践中 我试图在 pygame 中找到距离光标最近的点 该点跨越颜色距离阈值 计算如下 sqrt rgb1 0 rgb2 0 2 rgb1 1 rgb2 1
  • 如何使用 i18n 切换器将“LANGUAGE_CODE”保存到数据库,以便在 Django 中的不同浏览器中语言不会更改?

    有什么办法可以改变它的值LANGUAGE CODE单击按钮 发送请求 时 settings py 中的变量会动态变化吗 我希望用户设置自己的 默认语言 他们的帐户 现在 用户可以使用下拉列表选择他们的首选语言 并且网站会得到完美的翻译 并且
  • Python——捕获异常的效率[重复]

    这个问题在这里已经有答案了 可能的重复 Python 常见问题解答 异常有多快 https stackoverflow com questions 8107695 python faq how fast are exceptions 我记得
  • 按多个键分组并对字典列表的值进行汇总/平均值

    在Python中按多个键进行分组并对字典列表进行汇总 平均值的最Pythonic方法是什么 假设我有一个字典列表 如下所示 input dept 001 sku foo transId uniqueId1 qty 100 dept 001
  • 如何使用 PyMongo 在重复键错误后继续插入

    如果我需要在 MongoDB 中插入尚不存在的文档 db stock update one document set document upsert True 将完成这项工作 如果我错了 请随时纠正我 但是 如果我有一个文档列表并想将它们全
  • 返回上个月的日期时间对象

    如果 timedelta 在它的构造函数中有一个月份参数就好了 那么最简单的方法是什么 EDIT 正如下面指出的那样 我并没有认真考虑这一点 我真正想要的是上个月的任何一天 因为最终我只会获取年份和月份 因此 给定一个日期时间对象 返回的最
  • Jupyter Notebook 中的深色模式绘图 - Python

    我正在使用 Jupyter Notebook 目前正在使用 JupyterThemes 的深色日光主题 我注意到我的绘图不是处于黑暗模式 并且文本仍然是黑色并且在日光照射的背景上无法读取 JupyterThemes 的自述文件建议在 ipy
  • 如何分析组合的 python 和 c 代码

    我有一个由多个 python 脚本组成的应用程序 其中一些脚本正在调用 C 代码 该应用程序现在的运行速度比以前慢得多 因此我想对其进行分析以查看问题所在 是否有工具 软件包或只是一种分析此类应用程序的方法 有一个工具可以将 python
  • 计算 pyspark df 列中子字符串列表的出现次数

    我想计算子字符串列表的出现次数 并根据 pyspark df 中包含长字符串的列创建一个列 Input ID History 1 USA UK IND DEN MAL SWE AUS 2 USA UK PAK NOR 3 NOR NZE 4
  • 根据列索引重命名 Dataframe 列

    是否有内置函数可以按索引重命名 pandas 数据框 我以为我知道列标题的名称 但事实证明第二列中有一些十六进制字符 根据我接收数据的方式 我将来可能会在第 2 列中遇到这个问题 因此我无法将这些特定的十六进制字符硬编码到 datafram
  • 在 matplotlib 中绘制多边形的并集[重复]

    这个问题在这里已经有答案了 我正在尝试绘制几个多边形的并集matplotlib 具有一定的 alpha 水平 我当前的代码在交叉点处颜色较深 有没有办法让交叉路口与其他地方的颜色相同 import matplotlib pyplot as
  • 在 scipy 中创建新的发行版

    我试图根据我拥有的一些数据创建一个分布 然后从该分布中随机抽取 这是我所拥有的 from scipy import stats import numpy def getDistribution data kernel stats gauss
  • 如何使用 os.chdir 转到减去最后一步的路径?

    例如 一个方法传递了一个路径作为参数 这个路径可能是 C a b c d 如果我想使用 os chdir 更改为 C a b 怎么办 c 没有最后一个文件夹 os chdir 可以接受 命令吗 os chdir 可以采取 作为论点 是的 然
  • 在 HDF5 (PyTables) 中存储 numpy 稀疏矩阵

    我在使用 PyTables 存储 numpy csr matrix 时遇到问题 我收到此错误 TypeError objects of type csr matrix are not supported in this context so
  • Streamlabs API 405 响应代码

    我正在尝试使用Streamlabs API https dev streamlabs com Streamlabs API 使用 Oauth2 来创建应用程序 因此 首先我将使用我的应用程序的用户发送到一个授权链接 其中包含我的应用程序的客

随机推荐

  • 如何在一个布局中实现多个recyclerview?

    我想创建一个窗格 其中有两个RecyclerViews 假设 MyItems AllItems 我创建了垂直LinearLayout 其中有TextView作为标题和RecyclerView 像这样的东西
  • 在 Visual Studio Webtest 中插入 120 秒等待

    我想在两个 Web 请求的执行之间添加 120 秒的延迟 我尝试过使用思考时间 但它不会暂停执行 120 秒 谁能告诉我如何添加等待以暂停执行 120 秒 在执行 2 个 Web 请求之后 然后继续执行下一个请求 我正在使用 Visual
  • ImportError:无法导入名称 celery

    我正在尝试使用 Celery Redis Flask 运行一些后台作业 我的应用程序结构是 myapp celery worker py manage py myapp init py app py bot init py tasks py
  • 使无边框窗口具有较暗较大的阴影

    当无边框窗口中的窗口变为活动状态时 如何创建更暗和更大的阴影 我对 NSWindow 进行了子类化 我的窗口成为主窗口和关键窗口 但这没有帮助 阴影仍然很小 那么也许有人知道如何解决这个问题 我也尝试过使阴影无效 但这也没有帮助 NSWin
  • 使用 SQL 中的结果计算每天的总数

    我的订单表中有 50 行 条目 我有一个列 当订单被认领时保存 名为claimed at 该字段中的日期格式如下 2011 10 03 07 07 33 格式为 yy mm dd 时间 我还有一个专栏叫price this price是他们
  • Loopback.io 的 ACL 问题

    我目前正在评估用于开发新项目的 API 部分的 Loopback io 但在设置正确的 ACL 条目时遇到问题 我希望完成的是给定一个身份验证令牌 GET 端点应该只返回用户拥有的对象 例如 对 Shows access token xxx
  • 什么是“SQLiteDatabase 创建且从未关闭”错误?

    我已经在我的适配器类中关闭了数据库 那么为什么这个错误显示在 logcat 上 但我的应用程序不强制关闭 但只有错误显示在 log cat 上 我应该关闭数据库以忽略此错误 我的错误是 下面 我离开哪个类来关闭数据库 我从此链接获取帮助ht
  • C#,求一个数的最大素因数

    我是编程新手 正在练习我的 C 编程技能 我的应用程序旨在查找用户输入的数字的最大素因数 但我的应用程序没有返回正确的答案 我真的不知道问题出在哪里 你能帮我么 using System using System Collections G
  • 从 Action Script 到 C# 的 Rijndael 加密

    我正在尝试在 Action Script 和 C 之间共享加密 我的任务是在 C 中解密以下消息 f1ca22a365ba54c005c3eb599d84b19c354d26dcf475ab4be775b991ac97884791017b1
  • UUID 转换为无符号整数

    有没有地方可以将 UUID 压缩 转换 编码 加密为无符号整数 我从 sql 表中读取 UUID 历史记录很难看 我无法更改 我只有一个 unsigned int 来存储它 这是 C 以防产生影响 对此有什么想法吗 谢谢 礼萨 正如其他人所
  • 在两个容器之间共享/tmp

    我在用着docker compose生成两个容器 我想分享 tmp这两个容器之间的目录 但不与主机 tmp如果可能的话 这是因为我正在通过上传一些文件flask to tmp并想要处理这些文件celery flask build comma
  • 撤销 IdentityServer4 中特定会话的访问令牌(参考)

    我在用IdentityServer 4带有参考标记的隐式流程 我自己做了一个实现IPersistedGrantStore https github com IdentityServer IdentityServer4 blob releas
  • SVG使用element来克隆SVG

    是否可以在单独的 svg 中 使用 整个其他 svg 我想使用 d3 生成的地图作为同一页面上的图标 这是我尝试过的 但它不起作用
  • Angualr2 错误:无法设置仅具有 getter 的 # 的属性值

    表格看起来像
  • IntelliJ + JUnit 5(木星)

    My build gradle has testCompile org junit jupiter junit jupiter api 5 0 0 使用标准示例http junit org junit5 docs current user
  • XmlSerializer 和 IEnumerable:可以进行序列化,无需无参数构造函数:Bug?

    在我们的项目中 我们广泛使用 XmlSerializer 偶然我发现了一个没有无参数构造函数的类 我认为这一定会破坏序列化过程 但事实并非如此 通过调查这个问题 我发现 XmlSerializer 在序列化 反序列化时表现得很奇怪IE可枚举
  • 在 git 中为每个部署构建创建一个标签是一个好习惯吗?

    我刚刚从 Subversion 切换到 Git Subversion 的集中式架构为其提供了一个有意义的修订号 我将其构建到基于 Web 的应用程序的更改日志中 以便轻松登录并查看任何给定服务器上正在运行的版本 Git 没有友好的内部版本号
  • 如何使用 REST API 为领事附加手表?

    我使用 REST API 来访问领事 例如 这是我创建条目的方法 curl X PUT d localhost 8500 v1 kv example lt lt lt FooValue 我想添加watches当键值更改时通知我的服务的领事
  • 将函数发布到门户后,Azure 函数在函数列表中不可见

    我是Azure函数的新手 在函数发布到门户后发现这里 但它在函数列表中不可见 我附上了示例代码的快照和一个空的天蓝色列表 请帮忙 添加kudu ui 这里我找到了 wwwroot下唯一的host json Hi All 添加了kudu ui
  • 如何定义 Airflow 上 STFP Operator 的操作?

    class SFTPOperation object PUT put GET get operation SFTPOperation GET NameError name SFTPOperation is not defined 我在这里定