如何暂停或恢复 celery 任务?

2024-06-19

我的项目中有一项要求,客户可以暂停或恢复正在挂起的流程,而不是流程流程。我在用网络套接字显示芹菜任务结果,但在暂停/恢复时我不明白如何设计代码。我想到的唯一方法就是revoke暂停请求中的任务,同时保留数据撤销的过程在缓存中,并稍后在resume api 中使用该缓存来再次启动 celery 任务。通过使用这种方法,我的 Web 套接字设计流程受到干扰,因为我通过 Websocket 轮询任务处理状态,并且当没有进程时我发送了一个完成 true 标志以关闭连接。为了了解哪个任务正在处理或待处理,我添加了一个单独的表用于任务映射,并在执行最后一个任务时刷新该表。请帮助我设计出完美的设计,如果我遗漏了什么,也请指出。


我想演示实现可暂停(和可恢复)的通用方法ongoing通过工作流模式执行 celery 任务。

Concept

With 芹菜工作流程 https://docs.celeryproject.org/en/stable/userguide/canvas.html- 您可以将整个操作设计为分为chain https://docs.celeryproject.org/en/stable/userguide/canvas.html#chains的任务。它不一定必须是纯粹的链,但它应该遵循一个任务接着另一个任务(或任务)的一般概念group https://docs.celeryproject.org/en/stable/userguide/canvas.html#groups)完成。

一旦你有了这样的工作流程,你就可以最后定义在整个工作流程中暂停的点。在each其中,您可以检查前端用户是否有请求暂停操作并继续相应操作。这个概念是这样的:-

一个复杂且耗时的操作 O 被分为 5 个 celery 任务 - T1、T2、T3、T4 和 T5 - 每个任务(第一个任务除外)都依赖于前一个任务的返回值。

假设我们定义了暂停点在每一项任务之后,所以工作流程看起来像 -

  • T1执行
  • T1 completes, check if user has requested pause
    • 如果用户未请求暂停 - 继续
    • 如果用户请求暂停,连载 the 剩余工作流程链并将其存储在某个地方以便稍后继续

... 等等。由于每个任务之后都有一个暂停点,因此在每个任务之后都会执行该检查(当然最后一个任务除外)。

但这只是理论,我很难在网上的任何地方找到它的实现,所以这就是我想出的-

执行

from typing import Any, Optional

from celery import shared_task
from celery.canvas import Signature, chain, signature

@shared_task(bind=True)
def pause_or_continue(
    self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
    # Task to use for deciding whether to pause the operation chain
    if signature(clause)(retval):
        # Pause requested, call given callback with retval and remaining chain
        # chain should be reversed as the order of execution follows from end to start
        signature(callback)(retval, self.request.chain[::-1])
        self.request.chain = None
    else:
        # Continue to the next task in chain
        return retval


def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
    '''
    Make a operation workflow chain pause-able/resume-able by inserting
    the pause_or_continue task for every nth task in given chain

    ch: chain
        The workflow chain

    clause: Signature
        Signature of a task that takes one argument - return value of
        last executed task in workflow (if any - othewise `None` is passsed)
        - and returns a boolean, indicating whether or not the operation should continue

        Should return True if operation should continue normally, or be paused

    callback: Signature
        Signature of a task that takes 2 arguments - return value of
        last executed task in workflow (if any - othewise `None` is passsed) and
        remaining chain of the operation workflow as a json dict object
        No return value is expected

        This task will be called when `clause` returns `True` (i.e task is pausing)
        The return value and the remaining chain can be handled accordingly by this task

    nth: Int
        Check `clause` after every nth task in the chain
        Default value is 1, i.e check `clause` after every task
        Hence, by default, user given `clause` is called and checked
        after every task

    NOTE: The passed in chain is mutated in place
    Returns the mutated chain
    '''
    newch = []
    for n, sig in enumerate(ch.tasks):
        if n != 0 and n % nth == nth - 1:
            newch.append(pause_or_continue.s(clause=clause, callback=callback))
        newch.append(sig)
    ch.tasks = tuple(newch)
    return ch

解释 -pause_or_continue

Here pause_or_continue就是前面提到的暂停点。这是一个将以特定间隔调用的任务(间隔如任务间隔,而不是时间间隔)。然后该任务调用用户提供的函数(实际上是一个任务) -clause- 检查任务是否应该继续。

If the clause函数(实际上是一个任务)返回True,用户提供的callback函数被调用时,最新的返回值(如果有 -None否则)被传递到此回调,以及剩余任务链. The callback做它需要做的事情并且pause_or_continue sets self.request.chain to None,它告诉 celery“任务链现在是空的 - 一切都完成了”。

If the clause函数(实际上是一个任务)返回False,前一个任务的返回值(如果有 -None否则)将返回以供下一个任务接收 - 并且链条继续。因此工作流程继续进行。

Why are clause and callback任务签名而不是常规函数?

Both clause and callback正在被召唤directly- 没有delay or apply_async。它在当前进程、当前上下文中执行。所以它的行为与普通函数完全相同,那么为什么要使用signatures https://docs.celeryproject.org/en/stable/userguide/canvas.html#signatures?

答案是序列化。您无法方便地将常规函数对象传递给 celery 任务。但是你can传递任务签名。这正是我在这里所做的。两个都clause and callback应该是一个regular signaturecelery 任务的对象。

What is self.request.chain?

self.request.chain存储一个字典列表(代表 json,因为 celery 任务序列化器默认为 json)——每个字典代表一个任务签名。该列表中的每个任务都按相反顺序执行。这就是为什么在传递给用户之前将列表颠倒过来callback函数(实际上是一个任务)——用户可能希望任务的顺序是从左到右。

快速说明:与此讨论无关,但如果您使用link参数来自apply_async构建一条链而不是chain原始本身。self.request.callback是要修改的属性(即设置为None删除回调和停止链)而不是self.request.chain

解释 -tappable

tappable只是一个基本函数,它采用一个链(为简洁起见,这是这里介绍的唯一工作流程原语)并插入pause_or_continue每次之后nth任务。您可以将它们插入到您真正想要的任何位置,由您在操作中定义暂停点。这只是一个例子!

对于每个chain对象,任务的实际签名(按从左到右的顺序)存储在.tasks财产。它是tuple任务签名。因此,我们所要做的就是将此元组转换为列表,插入暂停点并转换回元组以分配给链。然后返回修改后的链对象。

The clause and callback还附有pause_or_continue签名。正常的芹菜东西。

这涵盖了主要概念,但为了展示使用此模式的真实项目(以及展示暂停任务的恢复部分),这里有一个包含所有必要资源的小演示

Usage

此示例使用假设具有数据库的基本 Web 服务器的概念。每当一个操作(即工作流链)启动时,分配了一个id并存储到数据库中。该表的架构看起来像 -

-- Create operations table
-- Keeps track of operations and the users that started them
CREATE TABLE operations (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  requester_id INTEGER NOT NULL,
  completion TEXT NOT NULL,
  workflow_store TEXT,
  result TEXT,
  FOREIGN KEY (requester_id) REFERENCES user (id)
);

目前唯一需要了解的领域是completion。它只存储操作的状态-

  • 当操作开始并创建数据库条目时,它被设置为IN PROGRESS
  • 当用户请求暂停时,路由控制器(即视图)将其修改为REQUESTING PAUSE
  • 当操作实际上暂停并且callback (from tappable, 里面pause_or_continue) 被称为,callback应该将其修改为PAUSED
  • 任务完成后,应将其修改为COMPLETED

一个例子clause

@celery.task()
def should_pause(_, operation_id: int):
    # This is the `clause` to be used for `tappable`
    # i.e it lets celery know whether to pause or continue
    db = get_db()

    # Check the database to see if user has requested pause on the operation
    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()
    return operation["completion"] == "REQUESTING PAUSE"

这是在暂停点调用的任务,以确定是否暂停。这是一个需要 2 个参数的函数......好吧。第一个是强制性的,tappable requires the clause有一个(并且恰好一个)参数 - 因此它可以将前一个任务的返回值传递给它(即使该返回值是None)。在这个例子中,不需要使用返回值 - 所以我们可以忽略它。

第二个参数是操作id。看,这一切clause作用 - 检查数据库中的操作(工作流程)条目并查看其状态是否REQUESTING PAUSE。为此,它需要知道操作 ID。但clause应该是一项只有一个参数的任务,什么给出了?

好在签名可以是部分的。当任务第一次开始并且tappable链已创建。操作 IDis known因此我们可以做should_pause.s(operation_id)获取任务的签名one参数,即前一个任务的返回值。这有资格作为clause!

一个例子callback

import os
import json
from typing import Any, List

@celery.task()
def save_state(retval: Any, chains: dict, operation_id: int):
    # This is the `callback` to be used for `tappable`
    # i.e this is called when an operation is pausing
    db = get_db()

    # Prepare directories to store the workflow
    operation_dir = os.path.join(app.config["OPERATIONS"], f"{operation_id}")
    workflow_file = os.path.join(operation_dir, "workflow.json")
    if not os.path.isdir(operation_dir):
        os.makedirs(operation_dir, exist_ok=True)
    
    # Store the remaining workflow chain, serialized into json
    with open(workflow_file, "w") as f:
        json.dump(chains, f)

    # Store the result from the last task and the workflow json path
    db.execute(
        """
        UPDATE operations
        SET completion = ?,
            workflow_store = ?,
            result = ?
        WHERE id = ?
        """,
        ("PAUSED", workflow_file, f"{retval}", operation_id),
    )
    db.commit()

这是任务执行时要调用的任务正在暂停。请记住,这应该采用最后执行的任务的返回值和剩余的签名列表(按从左到右的顺序)。有一个额外的参数 -operation_id- 再次。对此的解释与clause.

此函数将剩余的链存储在 json 文件中(因为它是字典列表)。请记住,您可以使用不同的序列化器 - 我使用 json,因为它是 celery 使用的默认任务序列化器。

存储剩余链后,它更新completion状态为PAUSED还将 json 文件的路径记录到数据库中。

现在,让我们看看这些的实际效果——

启动工作流程的示例

def start_operation(user_id, *operation_args, **operation_kwargs):
    db = get_db()
    operation_id: int = db.execute(
        "INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
        (user_id, "IN PROGRESS"),
    ).lastrowid
    # Convert a regular workflow chain to a tappable one
    tappable_workflow = tappable(
        (T1.s() | T2.s() | T3.s() | T4.s() | T5.s(operation_id)),
        should_pause.s(operation_id),
        save_state.s(operation_id),
    )
    # Start the chain (i.e send task to celery to run asynchronously)
    tappable_workflow(*operation_args, **operation_kwargs)
    db.commit()
    return operation_id

接收用户 ID 并启动操作工作流的函数。这或多或少是一个围绕视图/路由控制器建模的不切实际的虚拟函数。但我认为它传达了总体想法。

Assume T[1-4]是操作的所有单元任务,每个任务都将前一个任务的返回作为参数。这只是普通芹菜链的一个例子,您可以随意使用您的芹菜链。

T5是保存最终结果的任务(结果来自T4) 到数据库。所以随着返回值T4它需要operation_id。哪个被传递到签名中。

暂停工作流程的示例

def pause(operation_id):
    db = get_db()

    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()

    if operation and operation["completion"] == "IN PROGRESS":
        # Pause only if the operation is in progress
        db.execute(
            """
            UPDATE operations
            SET completion = ?
            WHERE id = ?
            """,
            ("REQUESTING PAUSE", operation_id),
        )
        db.commit()
        return 'success'

    return 'invalid id'

这采用了前面提到的修改数据库条目来更改的概念completion to REQUESTING PAUSE。一旦承诺,下次pause_or_continue calls should_pause,它会知道用户已请求暂停操作,并且会相应地执行操作。

恢复工作流程的示例

def resume(operation_id):
    db = get_db()

    operation = db.execute(
        "SELECT * FROM operations WHERE id = ?", (operation_id,)
    ).fetchone()

    if operation and operation["completion"] == "PAUSED":
        # Resume only if the operation is paused
        with open(operation["workflow_store"]) as f:
            # Load the remaining workflow from the json
            workflow_json = json.load(f)
        # Load the chain from the json (i.e deserialize)
        workflow_chain = chain(signature(x) for x in serialized_ch)
        # Start the chain and feed in the last executed task result
        workflow_chain(operation["result"])

        db.execute(
            """
            UPDATE operations
            SET completion = ?
            WHERE id = ?
            """,
            ("IN PROGRESS", operation_id),
        )
        db.commit()
        return 'success'

    return 'invalid id'

回想一下,当操作暂停时 - 剩余的工作流程存储在 json 中。由于我们目前将工作流程限制为chain目的。我们知道这个 json 是一个签名列表,应该将其转换为chain。因此,我们相应地反序列化它并将其发送给 celery worker。

请注意,剩余的工作流程仍然具有pause_or_continue任务与原来一样 - 所以这个工作流程本身再次是可暂停/可恢复的。当它暂停时,workflow.json将简单地更新。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何暂停或恢复 celery 任务? 的相关文章

随机推荐