如何从气流传感器中提取 xcom 值?

2024-02-08

主要问题:我正在尝试创建一个 BigQuery 表(如果不存在)。

方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。

Problem:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。我们知道,poke方法需要返回一个布尔值。

这就是我创建任务的方式:

check_if_table_exists = BigQueryTableSensor(
        task_id='check_if_table_exists',
        project_id='my_project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='bigquery_default',
        timeout=120,
        do_xcom_push=True,
    )

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
        task_id='get_results',
        bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
    )

# Output: INFO - Running command: echo None

查看 Airflow 界面,我检查 BigQueryTableSensor 没有推送任何内容:(

问题:

  • 有没有办法获得传感器的返回值?

  • 有更好的方法来解决我的主要问题吗?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 SQL 查询。


是的,这是可能的,我让它像这样工作:

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True

这是一个完整的 DAG 示例:

import os
import sys
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


dag = DAG('my_dag_name',
          description='DAG ',
          schedule_interval=None,
          start_date=datetime(2021, 1, 7),
          tags=["samples"],
          catchup=False)

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True


def launch_spark_job(**kwargs):
    application_id = "application_1613995447156_11473"
    kwargs['ti'].xcom_push(key='application_id', value=application_id)


launch_spark_job_op = PythonOperator(task_id='test_python',
                                     python_callable=launch_spark_job,
                                     provide_context=True,
                                     dag=dag)

wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
                                     dag=dag,
                                     mode="reschedule")

launch_spark_job_op >> wait_spark_job_sens
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从气流传感器中提取 xcom 值? 的相关文章

随机推荐

  • 找到两点之间角度的最快方法

    为了提高查找角度正弦 余弦的速度 我构建了一个参考表 而不是即时计算它们 我对从一点到另一点求角度也有同样的想法 我创建了一个包含 3600 个归一化向量的表 3600 10 十分之一度的精度 每当我需要知道从一个点到下一个点的角度时 我都
  • 从子片段访问父片段方法

    我正在尝试从以下位置访问Map分片至myfunction 位于父片段中Tabs 我尝试了以下行Tabs parentFragment Tabs getParentFragment 但是parentFragment 为空 我阅读了有关该主题的
  • 计算单元格中值的数量(空白除外)

    我有一个使用允许多项选择的下拉列表的单元格 如何计算选择的数量 我用过这个 LEN A2 LEN 替换 A2 1 但这并不能解释空白或没有选择的情况 如果没有选择 如何才能显示0 如果您的选择没有空格 则用逗号替换空格 修剪结果 然后按照上
  • 每次在嵌套条件 swift ios 中都会弹出 AlertController

    我定义了一个警报控制器 当用户名或密码不正确时 警报应该弹出 并且工作正常 但是当用户名和密码匹配时 尽管匹配 但每次登录时都会弹出 我想我没有以正确的方式定义嵌套条件 帮我对多重嵌套条件进行排序 登录代码 import UIKit imp
  • 如何使用 ReportViewer 设计将两个数据表添加到 Tablix

    我正在尝试使用 ReportViewer 将两个数据表添加到报告中的表 tablix 中 数据表 dt程序 名称 描述 dt改进 改进 我将有一个程序列表以及每个程序的改进列表 这是我正在寻找的示例 我不知道如何设计我的 ReportVie
  • 为什么将框架文件夹放在公共根目录之外更安全?

    为什么总是建议将框架文件放置在公共根目录之外 鉴于有时框架没有 ini or inc可以用浏览器打开的文件 好吧 肯定没什么可做的gained将框架源放置在 Web 根目录中 由于可以自由选择放置文件的位置 因此使用最小特权原则 http
  • 如何在 PhantomJS 中转到下一页进行抓取

    我正在尝试从具有多个页面的网站中获取多个元素 我目前正在使用 PhantomJS 来完成这项工作 我的代码几乎可以工作 但问题是我的代码在第一页上刮了两次 即使 根据日志 似乎我已经移到了第二页 这是代码 var page require
  • 获取jstree的已检查节点ID列表[重复]

    这个问题在这里已经有答案了 我是新来的jstree and jQuery并且在我的测试树中进行节点检查时遇到了一些问题 用户首先应勾选自己需要的节点 然后点击 概括 按钮以获取警报窗口中已检查节点的 ID 列表 我还想导出 ID 列表以供进
  • VB.NET 中的 WScript?

    这是我的程序中的一段代码 WSHShell WScript CreateObject WScript Shell 但由于某种原因 WScript 没有声明 我知道这段代码可以在 VBScript 中运行 但我正在尝试让它在 vb net 中
  • Python 中的机器 Epsilon

    我目前正在学习的一本手册 我是新手 说 相差小于机器 epsilon 的数字在数值上是相同的 使用 Python 可以通过键入获得浮点值的机器 epsilon eps numpy finfo float eps 现在 如果我检查 1 eps
  • 连接时 , 和 + 有什么区别?

    过去几个月我一直在用 c 编码 但每次连接时我总是对逗号之间的区别感到困惑 和加号 有时 适用于连接 其他时候 用来 我真的不明白其中的区别 请帮帮我 这是代码 class Faculty string firstName lastName
  • Eclipse 上的 Golang:Mac 上“资源没有相应的 Go 包”

    如标题所述 我在 Eclipse 上运行 Golang 代码时遇到问题 我目前使用的是Mac 我使用homebrew安装了go 目前 go安装的文件夹如下 usr local Cellar go 1 5 2 运行终端并输入后open bas
  • 如何刷新页面并保持元素不刷新(持久)直到用户单击提交?

    我正在寻找一种 jQuery 方法来在刷新页面时将页面元素保留在用户屏幕上 当我刷新页面并且丢失 jQuery 页面中的内容时 它会发生变化 我需要该页面是持久的 如何刷新页面并保持元素不刷新 持久 直到用户单击提交按钮 如何才能做到这一点
  • 在 android 中,相机“camera.setParameters”失败

    at android hardware Camera native setParameters Native Method at android hardware Camera setParameters Camera java 647 a
  • Microsoft VBScript 运行时错误:输入超过文件结尾错误

    我收到此错误 C se2 vbs 28 6 Microsoft VBScript 运行时错误 输入超出文件结尾 当我运行脚本时 第 28 行斜体 Dim strInput Dim filesys Dim path Set filesys C
  • Flutter Web:如何在 Flutter Web 应用程序中禁用浏览器的后退按钮

    成功登录后 用户重定向到主页 但当用户单击浏览器后退按钮时 它很容易重定向到登录屏幕 我应该怎么做才能禁用向后重定向 class SecondPage extends StatelessWidget override Widget buil
  • HTTP_ORIGIN 的安全性如何?

    我想查明来自第三方网站的传入 HTTP REQUEST 调用是否来自我定义的域列表 我知道可以使用HTTP REFERER来查找第三方域在哪里 但它不够安全 人们可以欺骗它或使用 Telnet 来伪造它 那么 HTTP ORIGIN 怎么样
  • 在互联网上哪里可以找到直方图差异算法?

    我在哪里可以找到互联网上的直方图差异算法 我想比较两个文本并找出差异 目前我正在尝试耐心差异算法 但这并不那么有效 我听说直方图差异算法是耐心差异的改进版本 The git软件实现了耐心和直方图差异算法 也可以看看 git diff pat
  • python OpenCv IMREAD_UNCHANGED 只返回三个通道

    我试图找出我的代码有什么问题 我想加载包含 Alpha 通道的图像 官方网站的描述如下 cv IMREAD UNCHANGED 如果设置 则按原样返回加载的图像 带有 Alpha 通道 否则会被裁剪 这是我的尝试 import cv2 as
  • 如何从气流传感器中提取 xcom 值?

    主要问题 我正在尝试创建一个 BigQuery 表 如果不存在 方法 使用 BigQueryTableSensor 检查表是否存在 并根据返回值 使用 BigQueryCreateEmptyTableOperator 创建或不创建新表 Pr