使用airflow的DataflowPythonOperator安排数据流作业时出错

2024-04-09

我正在尝试使用airflow 的DataflowPythonOperator 来安排数据流作业。这是我的 dag 运算符:

test = DataFlowPythonOperator(
    task_id = 'my_task',
    py_file = 'path/my_pyfile.py',
    gcp_conn_id='my_conn_id',
    dataflow_default_options={
        "project": 'my_project',
        "runner": "DataflowRunner",
        "job_name": 'my_job',
        "staging_location": 'gs://my/staging', 
        "temp_location": 'gs://my/temping',
        "requirements_file": 'path/requirements.txt'
  }
)

gcp_conn_id 已设置并且可以工作。 错误显示数据流失败,返回代码为 1。完整日志如下。

[2018-07-05 18:24:39,928] {gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2018-07-05 18:24:40,049] {base_task_runner.py:95} INFO - Subtask: 
[2018-07-05 18:24:40,049] {models.py:1433} ERROR - DataFlow failed with return code 1
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1390, in run
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: result = task_copy.execute(context=context)
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 182, in execute
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: self.py_file, self.py_options)
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_python_dataflow
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: task_id, variables, dataflow, name, ["python"] + py_options)
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: _Dataflow(cmd).wait_for_done()
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: self._proc.returncode))
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: Exception: DataFlow failed with return code 1

gcp_dataflow_hook.py 似乎有问题,除此之外没有更多信息。有什么方法可以解决这个问题吗?有 DataflowPythonOperator 的示例吗?(到目前为止我找不到任何使用案例)


我没有收到相同的错误消息,但我认为这可能会有所帮助。 python Dataflow 运行程序似乎以一种奇怪的方式终止,该方式不会影响独立的 Dataflow 作业,但无法由 DataFlowPythonOperator python Airflow 类正确处理。我正在提交票证,但这里有一个解决方法可以解决我的问题。重要的!该补丁必须应用于 Dataflow 作业而不是 Airflow 作业。

在数据流作业的顶部添加以下导入

import threading
import time
import types   
from apache_beam.runners.runner import PipelineState

接下来在数据流代码上方添加以下内容。这主要是从主 ~dataflow.dataflow_runner 类中剪切和粘贴的,并带有注释编辑

def local_poll_for_job_completion(runner, result, duration):
    """Polls for the specified job to finish running (successfully or not).
    Updates the result with the new job information before returning.
    Args:
      runner: DataflowRunner instance to use for polling job state.
      result: DataflowPipelineResult instance used for job information.
      duration (int): The time to wait (in milliseconds) for job to finish.
        If it is set to :data:`None`, it will wait indefinitely until the job
        is finished.
    """
    last_message_time = None
    current_seen_messages = set()

    last_error_rank = float('-inf')
    last_error_msg = None
    last_job_state = None
    # How long to wait after pipeline failure for the error
    # message to show up giving the reason for the failure.
    # It typically takes about 30 seconds.
    final_countdown_timer_secs = 50.0
    sleep_secs = 5.0

    # Try to prioritize the user-level traceback, if any.
    def rank_error(msg):
        if 'work item was attempted' in msg:
            return -1
        elif 'Traceback' in msg:
            return 1
        return 0

    if duration:
        start_secs = time.time()
        duration_secs = duration // 1000

    job_id = result.job_id()
    keep_checking = True  ### Changed here!!!
    while keep_checking:  ### Changed here!!!
        response = runner.dataflow_client.get_job(job_id)
        # If get() is called very soon after Create() the response may not contain
        # an initialized 'currentState' field.
        logging.info("Current state: " + str(response.currentState))
        # Stop looking if the job is not terminating normally
        if str(response.currentState) in (  ### Changed here!!!
                'JOB_STATE_DONE',  ### Changed here!!!
                'JOB_STATE_CANCELLED',  ### Changed here!!!
                # 'JOB_STATE_UPDATED',
                'JOB_STATE_DRAINED',  ### Changed here!!!
                'JOB_STATE_FAILED'):  ### Changed here!!!
            keep_checking = False  ### Changed here!!!
            break
        if response.currentState is not None:
            if response.currentState != last_job_state:
                logging.info('Job %s is in state %s', job_id, response.currentState)
                last_job_state = response.currentState
            if str(response.currentState) != 'JOB_STATE_RUNNING':
                # Stop checking for new messages on timeout, explanatory
                # message received, success, or a terminal job state caused
                # by the user that therefore doesn't require explanation.
                if (final_countdown_timer_secs <= 0.0
                        or last_error_msg is not None
                        or str(response.currentState) == 'JOB_STATE_UPDATED'):  ### Changed here!!!
                    keep_checking = False  ### Changed here!!!
                    break

                # Check that job is in a post-preparation state before starting the
                # final countdown.
                if (str(response.currentState) not in (
                        'JOB_STATE_PENDING', 'JOB_STATE_QUEUED')):
                    # The job has failed; ensure we see any final error messages.
                    sleep_secs = 1.0      # poll faster during the final countdown
                    final_countdown_timer_secs -= sleep_secs

        time.sleep(sleep_secs)

        # Get all messages since beginning of the job run or since last message.
        page_token = None
        while True:
            messages, page_token = runner.dataflow_client.list_messages(
                job_id, page_token=page_token, start_time=last_message_time)
            for m in messages:
                message = '%s: %s: %s' % (m.time, m.messageImportance, m.messageText)

                if not last_message_time or m.time > last_message_time:
                    last_message_time = m.time
                    current_seen_messages = set()

                if message in current_seen_messages:
                    # Skip the message if it has already been seen at the current
                    # time. This could be the case since the list_messages API is
                    # queried starting at last_message_time.
                    continue
                else:
                    current_seen_messages.add(message)
                # Skip empty messages.
                if m.messageImportance is None:
                    continue
                logging.info(message)
                if str(m.messageImportance) == 'JOB_MESSAGE_ERROR':
                    if rank_error(m.messageText) >= last_error_rank:
                        last_error_rank = rank_error(m.messageText)
                        last_error_msg = m.messageText
            if not page_token:
                break

        if duration:
            passed_secs = time.time() - start_secs
            if passed_secs > duration_secs:
                logging.warning('Timing out on waiting for job %s after %d seconds',
                                job_id, passed_secs)
                break

    result._job = response
    runner.last_error_msg = last_error_msg


def local_is_in_terminal_state(self):
    logging.info("Current Dataflow job state: " + str(self.state))
    logging.info("Current has_job: " + str(self.has_job))
    if self.state in ('DONE', 'CANCELLED', 'DRAINED', 'FAILED'):
        return True
    else:
        return False


class DataflowRuntimeException(Exception):
    """Indicates an error has occurred in running this pipeline."""

    def __init__(self, msg, result):
        super(DataflowRuntimeException, self).__init__(msg)
        self.result = result


def local_wait_until_finish(self, duration=None):
    logging.info("!!!!!!!!!!!!!!!!You are in a Monkey Patch!!!!!!!!!!!!!!!!")
    if not local_is_in_terminal_state(self):  ### Changed here!!!
        if not self.has_job:
            raise IOError('Failed to get the Dataflow job id.')

        # DataflowRunner.poll_for_job_completion(self._runner, self, duration)
        thread = threading.Thread(
            target=local_poll_for_job_completion,  ### Changed here!!!
            args=(self._runner, self, duration))

        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        thread.start()
        while thread.isAlive():
            time.sleep(5.0)

        terminated = local_is_in_terminal_state(self)  ### Changed here!!!
        logging.info("Terminated state: " + str(terminated))
        # logging.info("duration: " + str(duration))
        # assert duration or terminated, (  ### Changed here!!!
        #     'Job did not reach to a terminal state after waiting indefinitely.')  ### Changed here!!!

        assert terminated, "Timed out after duration: " + str(duration)  ### Changed here!!!

    else:  ### Changed here!!!
        assert False, "local_wait_till_finish failed at the start"  ### Changed here!!!

    if self.state != PipelineState.DONE:
        # TODO(BEAM-1290): Consider converting this to an error log based on
        # theresolution of the issue.
        raise DataflowRuntimeException(
            'Dataflow pipeline failed. State: %s, Error:\n%s' %
            (self.state, getattr(self._runner, 'last_error_msg', None)), self)

    return self.state

然后当你启动管道时使用约定 (不是“with beam.Pipeline(options=pipeline_options) p:”版本)

p = beam.Pipeline(options=pipeline_options)

最后,当您的管道建成后,请使用以下命令

result = p.run()
# Monkey patch to better handle termination
result.wait_until_finish = types.MethodType(local_wait_until_finish, result)
result.wait_until_finish()

Note:如果您运行的是气流服务器 v1.9,就像我使用 1.10 补丁文件一样,此修复程序仍然无法解决问题。 _Dataflow.wait_for_done 的补丁文件函数没有返回 job_id,但它也需要。补丁的补丁比上面的还差。如果可以的话升级一下。如果您无法使用最新文件将以下代码作为标题粘贴到 Dag 脚本中,那么它应该可以工作。气流/contrib/hooks/gcp_api_base_hook.py、气流/contrib/hooks/gcp_dataflow_hook.py 和 气流/contrib/operators/dataflow_operator.py

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用airflow的DataflowPythonOperator安排数据流作业时出错 的相关文章

随机推荐

  • Javacard 中的 ECDSA 签名

    我正在 Javacard 中使用 ECDSA 实现签名代码 我的代码在异常部分输出 0x0003 NO SUCH ALGORITHM 这意味着该卡不支持该算法 我不明白这一点 因为我的供应商告诉我它支持 ECC 我的结论是 我不知道如何使用
  • org.json.JSONException:名称没有值

    下面的代码中出现此错误的原因可能是什么 loginButton setOnClickListener new View OnClickListener Override public void onClick View v final St
  • 获取系统中已安装的应用程序

    如何使用c 代码获取系统中安装的应用程序 遍历注册表项 SOFTWARE Microsoft Windows CurrentVersion Uninstall 似乎可以提供已安装应用程序的完整列表 除了下面的示例之外 您还可以找到与我所做的
  • SKPhysicsJoint:接触和碰撞不起作用

    在 IOS7 1 上 使用 SpriteKit 我创建了两个简单的矩形精灵以及相应的物理体 我设置了接触和碰撞位掩码 所有工作都完全符合我的预期 检测到接触并且碰撞防止两个矩形重叠 但是 当我创建 SKPhysicsJointSpring
  • java中相关对象的序列化

    假设我有 A B 和 C 类型的对象 我有 3 个 Map 分别包含 A B 和 C 的所有实例 在内部 A和B都有C的Map 我希望能够随时存储和恢复应用程序的状态 因此 直到今天我总是序列化类似金字塔的应用程序 我会在顶部对象上调用序列
  • PostgreSQL Sqlalchemy 提交需要大量时间

    当我尝试将更改提交到表中时 需要花费大量时间 每 1000 行大约 300 秒 型号类别 class Quotes base tablename quotes id Column INTEGER primary key True autoi
  • Apache + mod_wsgi 与 nginx + Gunicorn

    我想部署一个django站点 它是github上的开源edx代码 我面临着使用之间的选择 Apache 与 mod wsgi nginx 与 Gunicorn 我已经将 Apache 与 mod wsgi 一起使用 它很酷 但我对第二个选项
  • SQL Server Reporting Services 对聚合数据运行总计

    每个人 在 SSRS 中 我们有 2 列 如下所示 Sales Running Sales 5 00 5 00 3 00 8 00 1 00 9 00 区别在于 第一列 销售额 是一个分组行 因此要获取每行的销售额总计 我们使用 Sum F
  • Mathematica:为什么 3D 绘图会记住最后的视点/旋转,即使在再次评估后也是如此?

    我觉得这有点烦人 我制作了一个 3D 绘图 最初它以默认方向出现 然后 我使用鼠标以某种方式旋转它 现在我再次运行该命令 期望获得原始形状 即通过鼠标旋转它之前的原始方向 但相反 它只是给了我与屏幕上相同的绘图 即它似乎保留 记住了该输出单
  • 从函数返回结果(javascript、nodejs)

    谁能帮我处理这段代码吗 我需要从 routeToRoom 函数返回一个值 var sys require sys function routeToRoom userId passw var roomId 0 var nStore requi
  • Memcache 统计数据理解

    Memcache telnet 接口有命令称为STATS 它显示了很多数字 我在哪里可以看到它的含义 如何分析它们 多少缓存使用是有效的等等 更新的文档位于https github com memcached memcached blob
  • 在我更改 PHP 设置后,gzopen 函数不存在 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 使用新的 PHP 5 4gzopen
  • Firebase 推送通知不适用于 iOS

    我想使用 Firebase Cloud Messaging 实现推送通知 我已经按照说明设置了我的项目并上传了 APN 证书 我正在使用发送测试消息fcmtoken到我的真实设备 我在AppDelegate中的配置如下 func appli
  • 是否可以在不同的类中编写/包装异常处理组件(try、catch)?

    这是关于将异常处理逻辑包装在某种类中 在写c 的时候 代码中 很多时候我们需要根据客户端抛出的异常来捕获许多类型 变体 这导致我们在 catch 子句中 多次 编写类似类型的代码 在下面的示例中 我编写了 function 它可以以多种可能
  • Access 2010 SQL 查询仅在完整单词的字符串中查找部分匹配

    我希望这是一个简单的 我只是找不到如何获得我想要的结果 也许我在 SQL 中使用了错误的关键字 我正在搜索包含全名字段的员工表 该字段可以是 Sam 或 Evans 先生 或 Sam Evans 先生 我正在尝试查找与另一个包含名称字段的名
  • RecyclerView onClick 无法正常工作?

    我在片段中使用 RecyclerView 来显示带有网格格式文本的图像 Recycler 视图 grid item xml 如下所示
  • 队列管理和新线程

    在 Net 4 0 框架上使用 C 我有一个 Windows 窗体主线程 迄今为止唯一的一个 等待文件系统事件 然后必须对这些事件提供的文件启动一些预定义的处理 我计划执行以下操作 A1 当主进程启动时立即创建一个单独的线程 A2 让主线程
  • python中按特定顺序读取文件

    假设我的文件夹中有三个文件 file9 txt file10 txt 和 file11 txt 我想按这个特定顺序读取它们 谁能帮我这个 现在我正在使用代码 import glob os for infile in glob glob os
  • 无法获取 OLEObject 类的 Object 属性 - Excel Interop

    我用谷歌搜索了这个问题 但未能找到解决方案 如果文件保存为 xls 而不是 xlsm 则此代码有效 我使用的是 Office 2013 32 位 我编写了一个 COM 公开的 C 类库 Excel工作簿实例化一个对象并传入对当前工作簿的引用
  • 使用airflow的DataflowPythonOperator安排数据流作业时出错

    我正在尝试使用airflow 的DataflowPythonOperator 来安排数据流作业 这是我的 dag 运算符 test DataFlowPythonOperator task id my task py file path my