芹菜“重试”并更新参数

2024-04-07

考虑一个任务将列表作为参数并处理列表中的每个元素,这可能会成功也可能会失败。在这种情况下,如何仅对失败的元素进行“重试”?

Example:

@app.task(bind=True)
def my_test(self, my_list:list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except:
            new_list.append(ele)
    # how to retry with the new list?
    # like 
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)


解决方案1

Use 任务重试 https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry以其args and kwargs input.

retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)

重试该任务,将其添加到队列的末尾。

参数

args (Tuple)– 重试的位置参数。

kwargs (Dict)– 用于重试的关键字参数。

传递参数时要注意,因为在args and kwargs会导致失败。下面,我选择只使用args=(<values here>)并清空kwargs={}。您也可以选择以相反的方式使用kwargs={<values here>}并清空args=().

tasks.py

from celery import Celery

app = Celery('tasks')


@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        self.retry(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.
        )

日志(生产者)

>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The above command is just equivalent to the following (just the same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})

日志(消费者)

[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess] 

[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess] 

[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess] 

[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess] 

[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess] 

[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None
  • All task arguments are updated per retry:
    • some_arg_1每次重试从起始值增加 10到最后一个值4
    • my_list每次重试都会丢失 1 个项目,从起始值开始[1, 2, 3, 4, 5]到最后一个值[1]
    • some_arg_2每次重试的大小从起始值加倍"a"到最后一个值"aaaaaaaaaaaaaaaa"

解决方案2

只需从任务本身回忆起相同的任务,有点像递归。

tasks.py

from celery import Celery

app = Celery('tasks')


@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        my_test.apply_async(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.
        )

日志(生产者和消费者)

  • 与解决方案1相同
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

芹菜“重试”并更新参数 的相关文章

  • 上传时的 Google Drive API——这些额外的空行从何而来?

    总结一下该程序 我从我的 Google 云端硬盘下载一个文件 然后在本地计算机中打开并读取一个文件 file a txt 然后在我的计算机中打开另一个文件 file b txt 处于附加模式 并且在使用这个新的 file b 更新我的 Go
  • 嵌套字典中的 Django 模板

    我正在使用 Django 模板 并且遇到了嵌套字典的一个问题 Dict result dict type 0 file name abc count 0 type 1 file name xyz count 50 我的 HTML 文件中的模
  • 如何在 Jupyter Notebook 中运行 Python 异步代码?

    我有一些 asyncio 代码在 Python 解释器 CPython 3 6 2 中运行良好 我现在想在具有 IPython 内核的 Jupyter 笔记本中运行它 我可以运行它 import asyncio asyncio get ev
  • 使用 Python 创建 MIDI

    本质上 我正在尝试从头开始创建 MIDI 并将它们放到网上 我对不同的语言持开放态度 但更喜欢使用Python 两种语言之一 如果这有什么区别的话 并且想知道我应该使用哪个库 提前致谢 看起来这就是您正在寻找的 适用于 Python 的简单
  • TensorFlow:带有轴选项的 bincount

    在 TensorFlow 中 我可以使用 tf bincount 获取数组中每个元素的计数 x tf placeholder tf int32 None freq tf bincount x tf Session run freq feed
  • 在 python 3 中使用子进程

    我使用 subprocess 模块在 python 3 中运行 shell 命令 这是我的代码 import subprocess filename somename py in practical i m using a real fil
  • cv2.drawContours() - 取消填充字符内的圆圈(Python,OpenCV)

    根据 Silencer的建议 我使用了他发布的代码here https stackoverflow com questions 48244328 copy shape to blank canvas opencv python 482465
  • PyTorch 给出 cuda 运行时错误

    我对我的代码做了一些小小的修改 以便它不使用 DataParallel and DistributedDataParallel 代码如下 import argparse import os import shutil import time
  • Python中列表中两个连续元素的平均值

    我有一个偶数个浮点数的列表 2 34 3 45 4 56 1 23 2 34 7 89 我的任务是计算 1 和 2 个元素 3 和 4 5 和 6 等元素的平均值 在 Python 中执行此操作的快捷方法是什么 data 2 34 3 45
  • 更改 x 轴比例

    我使用 Matlab 创建了这个图 使用 matplotlib x 轴绘制大数字 例如 100000 200000 300000 我想要 1 2 3 和 10 5 之类的值来指示它实际上是 100000 200000 300000 有没有一
  • CNTK 抱怨 LSTM 中的动态轴

    我正在尝试在 CNTK 中实现 LSTM 使用 Python 来对序列进行分类 Input 特征是固定长度的数字序列 时间序列 标签是 one hot 值的向量 Network input input variable input dim
  • 在相同任务上,Keras 比 TensorFlow 慢

    我正在使用 Python 运行斩首 DCNN 本例中为 Inception V3 来获取图像特征 我使用的是 Anaconda Py3 6 和 Windows7 使用 TensorFlow 时 我将会话保存在变量中 感谢 jdehesa 并
  • Alembic:如何迁移模型中的自定义类型?

    My User模型是 class User UserMixin db Model tablename users noinspection PyShadowingBuiltins uuid Column uuid GUID default
  • Werkzeug 中的线程和本地代理。用法

    首先 我想确保我正确理解了功能的分配 分配本地代理功能以通过线程内的模块 包 共享变量 对象 我对吗 其次 用法对我来说仍然不清楚 也许是因为我误解了作业 我用烧瓶 如果我有两个 或更多 模块 A B 我想将对象C从模块A导入到模块B 但我
  • `pyqt5'错误`元数据生成失败`

    我正在尝试安装pyqt5使用带有 M1 芯片和 Python 3 9 12 的 mac 操作系统 我怀疑M1芯片可能是原因 我收到一个错误metadata generation failed 最小工作示例 directly in the t
  • 使用 NLP 进行地址分割

    我目前正在开发一个项目 该项目应识别地址的每个部分 例如来自 str Jack London 121 Corvallis ARAD ap 1603 973130 输出应如下所示 street name Jack London no 121
  • 字符串列表,获取n个元素的公共子串,Python

    我的问题可能类似于this https stackoverflow com questions 37514193 count the number of occurrences of n length not given string in
  • bs4 `next_sibling` VS `find_next_sibling`

    我在使用时遇到困难next sibling 并且类似地与next element 如果用作属性 我不会得到任何返回 但如果用作find next sibling or find next 然后就可以了 来自doc https www cru
  • 使用 Python 将对象列表转为 JSON

    我在转换时遇到问题Object实例到 JSON ob Object list name scaping myObj base url u number page for ob in list name json string json du
  • tkinter:打开一个带有按钮提示的新窗口[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 用户如何按下 tkinter GUI 中的按钮来打开新窗口 我只需要非常简单的解决方案 如果代码也能被解释那就太好了 这

随机推荐

  • ffmpeg:“未找到引用的 QT 章节轨道”

    Using ffmpeg将 QuickTime 中的音频替换为 WAV 中的音频 谁知道我为什么会这样Referenced QT chapter track not found Command ffmpeg i video t 25 i a
  • JavaScript 中的文本搜索?

    我有一个页面 其中包含 200 多个采用这种格式的链接 h1 a href somelink Somelink a some text that explain the meaning of the link h1 现在 为了更容易通过此链
  • 生成人类可区分的随机颜色

    我正在尝试在 JavaScript 中随机生成十六进制颜色 然而 生成的颜色几乎无法区分 有办法改善吗 这是我正在使用的代码 function randomColor var allowed ABCDEF0123456789 S while
  • Heroku-未找到 (Python)

    作为一个初学者 我无法再进一步 我正在尝试在heroku上加载python程序 但不知何故总是只出现以下内容 未找到 在服务器上找不到请求的 URL 如果您手动输入 URL 请检查拼写并重试 My logs 2018 02 12T11 33
  • file_get_contents() 与 CURL 等效吗?

    我正在尝试从这样的 url 获取一些 JSON 数据 url http site com search php term search term here result json decode file get contents url 然
  • postgresql 中第一个和最后一个值聚合函数可以正确处理 NULL 值

    我知道有聚合函数可以获取行的最后一个和第一个值PostgreSQL https wiki postgresql org wiki First last 28aggregate 29 我的问题是 它们不能按我的需要工作 我可以使用 postg
  • 根据另一个单元格范围的值创建对一个单元格范围的注释

    我想为一系列单元格创建注释 注释应包含另一个单元格范围的值 这是我到目前为止所拥有的 Private Sub Worksheet Change ByVal Target As Range Dim sResult As String If U
  • CSS3:背景颜色过渡为透明

    我想做一个过渡div that 以 开始background color rgba 242 245 169 1 三秒后 以 结束background color rgba 242 245 169 0 还有 在两场表演之间background
  • PySpark 在嵌套数组中反转 StringIndexer

    我正在使用 PySpark 使用 ALS 进行协作过滤 我原来的用户和项目 ID 是字符串 所以我使用StringIndexer将它们转换为数字索引 PySpark 的 ALS 模型要求我们这样做 安装模型后 我可以获得每个用户的前 3 个
  • 无法在 Python OpenCV v4.20 中使用 SIFT

    我正在使用 OpenCV v4 20 和 PyCharm IDE 我想使用 SIFT 算法 但我收到这个错误 我在互联网上寻找此错误的解决方案 但没有一个对我有帮助 你知道这个错误的解决办法吗 使用 pip 我可以安装至少 3 4 2 16
  • 一个 dataGridView 中的两个实体

    我有两个关联实体 我通过从对象数据源拖放并手动绑定到列表来创建 dataGridView 一个实体的一切工作都很好 是否有可能通过拖放 手动填充来创建一个具有两个实体 Zamow和ZamSkany 的dataGridView 我可以通过视图
  • 无 IDLE 子进程连接

    我是 python 编程新手 想尝试在 IDLE 而不是 OSX 命令行中编辑脚本 但是 当我尝试启动它时 它给出错误 空闲子进程没有建立连接 空闲子进程无法启动子进程或个人防火墙软件阻止连接 我没有配置防火墙 那么问题可能是什么 您可以尝
  • 在列表视图滚动期间,首先显示错误的图像,然后显示正确的图像

    I am using listview with images in my application But whenever I scroll the list the wrong images are shown first and th
  • 使用 MVVM 模式挂钩 Storyboard.Complete 事件

    基本上 我得到的是一个简单的警报消息设置 有一个单独警报的视图 其中一些数据绑定到虚拟机的文本和颜色 绿色表示成功 红色表示错误等
  • 在 MS PowerPoint 中查找并突出显示文本

    我使用此站点中的一些代码创建了一个宏 用于在 Word 文档上进行关键字搜索并突出显示结果 我想在 PowerPoint 中复制这种效果 这是我的 Word 代码 Sub HighlightKeywords Dim range As ran
  • Java:将出生数据转换为天数

    我真的需要一些关于具体任务的帮助 用户输入出生数据 YYYY MM DD 程序会告诉您您的年龄 以天为单位 控制台中的输出将是 例如 您出生于 1981 年 11 月 6 日 您已经 7068 天了 我已经重写了我的代码大约 20 次但没有
  • 使用 HTML 5 的进度条(下载)

    我正在开发一个需要从数据库下载内容的项目 通常使用手机 高端智能手机 访问该网站 我想知道是否可以在使用 HTML5 下载内容的过程中添加进度条 以便在执行此操作时会在整个屏幕上出现一个灯箱 并且只显示进度条 可以使用 XMLHttpReq
  • NSArray 越界检查

    菜鸟问题 检查 NSArray 或 NSMutableArray 的索引是否存在的最佳方法是什么 我到处找都没有结果 这是我尝试过的 if sections arr objectAtIndex 4 or sections arr objec
  • 为什么 Firefox 会截断我的 中的文本?

    我有一个简单的
  • 芹菜“重试”并更新参数

    考虑一个任务将列表作为参数并处理列表中的每个元素 这可能会成功也可能会失败 在这种情况下 如何仅对失败的元素进行 重试 Example app task bind True def my test self my list list new