如何限制创建 celery 任务的速度快于消耗速度的脚本?

2024-04-12

我有一个脚本可以生成数百万个 Celery 任务,数据库中每行一个任务。有没有办法限制它,以免它完全淹没芹菜?

理想情况下,我想让 Celery 保持忙碌,但我不希望 Celery 队列的长度超过几十个任务,因为这只是浪费内存(特别是因为如果没有某种限制,脚本将向队列添加数百万个任务)几乎立即排队)。


在过去的几天里,我花了一些时间来解决这个问题,并提出了我所说的CeleryThrottle目的。基本上,您告诉它队列中需要多少个项目,它会尽力将队列保持在该大小和 2× 该大小之间。

所以这是代码(假设 Redis 代理,但很容易更改):

# coding=utf-8
from collections import deque

import time

import redis
from django.conf import settings
from django.utils.timezone import now


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return r.llen(queue_name)


class CeleryThrottle(object):
    """A class for throttling celery."""

    def __init__(self, min_items=100, queue_name='celery'):
        """Create a throttle to prevent celery run aways.

        :param min_items: The minimum number of items that should be enqueued. 
        A maximum of 2× this number may be created. This minimum value is not 
        guaranteed and so a number slightly higher than your max concurrency 
        should be used. Note that this number includes all tasks unless you use
        a specific queue for your processing.
        """
        self.min = min_items
        self.max = self.min * 2

        # Variables used to track the queue and wait-rate
        self.last_processed_count = 0
        self.count_to_do = self.max
        self.last_measurement = None
        self.first_run = True

        # Use a fixed-length queue to hold last N rates
        self.rates = deque(maxlen=15)
        self.avg_rate = self._calculate_avg()

        # For inspections
        self.queue_name = queue_name

    def _calculate_avg(self):
        return float(sum(self.rates)) / (len(self.rates) or 1)

    def _add_latest_rate(self):
        """Calculate the rate that the queue is processing items."""
        right_now = now()
        elapsed_seconds = (right_now - self.last_measurement).total_seconds()
        self.rates.append(self.last_processed_count / elapsed_seconds)
        self.last_measurement = right_now
        self.last_processed_count = 0
        self.avg_rate = self._calculate_avg()

    def maybe_wait(self):
        """Stall the calling function or let it proceed, depending on the queue.

        The idea here is to check the length of the queue as infrequently as 
        possible while keeping the number of items in the queue as closely 
        between self.min and self.max as possible.

        We do this by immediately enqueueing self.max items. After that, we 
        monitor the queue to determine how quickly it is processing items. Using 
        that rate we wait an appropriate amount of time or immediately press on.
        """
        self.last_processed_count += 1
        if self.count_to_do > 0:
            # Do not wait. Allow process to continue.
            if self.first_run:
                self.first_run = False
                self.last_measurement = now()
            self.count_to_do -= 1
            return

        self._add_latest_rate()
        task_count = get_queue_length(self.queue_name)
        if task_count > self.min:
            # Estimate how long the surplus will take to complete and wait that
            # long + 5% to ensure we're below self.min on next iteration.
            surplus_task_count = task_count - self.min
            wait_time = (surplus_task_count / self.avg_rate) * 1.05
            time.sleep(wait_time)

            # Assume we're below self.min due to waiting; max out the queue.
            if task_count < self.max:
                self.count_to_do = self.max - self.min
            return

        elif task_count <= self.min:
            # Add more items.
            self.count_to_do = self.max - task_count
            return

用法如下:

throttle = CeleryThrottle()
for item in really_big_list_of_items:
    throttle.maybe_wait()
    my_task.delay(item)

非常简单,希望非常灵活。完成此操作后,代码将监视您的队列,并在队列变得太长时将等待添加到循环中。这是在我们的 github 仓库 https://github.com/freelawproject/courtlistener/blob/master/cl/lib/celery_utils.py以防有更新。

执行此操作时,它将跟踪任务的滚动平均速度,并尝试不比需要更频繁地检查队列长度。例如,如果每个任务需要两分钟运行,则在将 100 个项目放入队列后,它可能会等待相当长的时间,然后才必须再次检查队列的长度。该脚本的一个简单版本可以每次循环时检查队列长度,但这会增加不必要的延迟。这个版本试图聪明地处理它,但有时会出错(在这种情况下,队列会低于min_items).

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何限制创建 celery 任务的速度快于消耗速度的脚本? 的相关文章

随机推荐

  • 引导断点...需要一些说明“xs sm md lg”

    所以 在网上查找 我看到一些最近的文章指出xs断点是480px及以下 其他 声明767及以下 我的理解 可能不正确 xs 适用于手机 480 像素及以下 col sm 适用于平板电脑 480 像素至 767 像素 等 然而 当我应用 hid
  • PHP:获取 HTTP 协议版本(HTTP/1.1 与 HTTP/2)

    到目前为止 我的 php 应用程序到处都采用 HTTP 1 1 所以我定义了所有标题 如下所示 header HTTP 1 1 500 Internal Server Error 但现在我的服务器也支持 HTTP 2 我想使用正确的 HTT
  • 类方法和实例方法同名

    我想做这样的事情 class X classmethod def id cls return cls name def id self return self class name 现在打电话id 对于类或其实例 gt gt gt X id
  • 如何在 Tensorflow 中设置损失操作的名称?

    在 Tensorflow 中 我可以为操作和张量指定名称 以便稍后检索它们 例如在一个函数中我可以做 input layer tf placeholder tf float32 shape None 300 name input layer
  • “FOR”在cmd批处理文件中如何工作?

    20 年来我一直在使用数十种语言进行编程 但无论我如何努力 我始终无法理解 Windows cmd shell 批处理文件中的 FOR 是如何工作的 我读 http www ss64 com nt for html http www ss6
  • 在Access表中插入自增主键

    我们在 Access 数据库中有一个巨大的表 有超过 500k 条记录 并且没有 PK 是否可以将自动递增主键列插入到已经存在的 Access 表中 是的 它是并且可以通过编辑表并添加自动增量类型字段来非常简单地完成 唯一的规则是每个表只能
  • D3 中的堆叠变换

    我有一个已应用变换的 SVG 元素 这可以是单个变换 也可以是多个变换的组合 我正在尝试对其应用额外的变换 问题是这个变换可以重复应用 并且需要与现有的变换状态堆叠 因此在末尾附加额外的变换是不切实际的 浏览 d3 API 我没有注意到任何
  • 查找每第三个值并在 VIM 中插入 cr 或换行符

    因此 我有几个大型数据集 需要使其更具可读性 目前我必须进入并移动到每个第三个值并插入换行符 我已经在 VIM 中尝试了几种方法来使其正常工作 但似乎没有一个能返回我正在寻找的值 这是我的一些数据 0 96260310749184663 4
  • 如何改进多次 StringReplace 调用?

    我从客户那里读取文件 我需要处理读取的数据并删除一些不需要的字符 我的函数有效 但我正在尝试改进 FixData 函数以提高速度 性能和可维护性 是否可以将多个 StringReplace 调用替换为仅循环一次数据并替换为所需的任何内容的调
  • 有没有办法将多个 TraCI 模块连接到 OMNet++/veins 模拟?

    要启动任何静脉模拟 需要使用 Veins launchd 守护进程 它基本上采用 sumo 配置文件 找到未使用的端口 启动 sumo 并桥接 sumo 和 OMNet 之间的连接 现在 由于这是基于套接字的通信 我想连接一个单独的 Tra
  • 在 Pycharm 中以 DEBUG 模式跳过 Django 服务器上的系统检查

    我正在 Pycharm 中以调试模式运行 django 应用程序 每次我更改代码时都会执行一些系统检查 pydev debugger process 2354 is connecting Performing system checks 有
  • 如何在D3中的图表标签上触发onmouseover事件

    我用 D3 制作了以下图表 当将鼠标悬停在图表上时 中间的空间会显示信息 我有以下代码来处理该事件 var path svg selectAll path data pie get this data enter append path a
  • 如何在Java运行时动态添加类字段[重复]

    这个问题在这里已经有答案了 可能的重复 如何动态向类添加属性 https stackoverflow com questions 14442998 how to dynamically add properties to class 我想知
  • Vb。 NET 将日期转换为数字

    我在 VB 中有这段代码 网 dim date e As DateTime date e New DateTime CLng Convert ToDouble 635434240520170000 结果是 12 08 2014 07 07
  • Laravel 密室获得 401 授权

    我正在使用 Laravel Sanctum 和 Vuejs SPA 两者都驻留在同一顶级域中 Laravel backend app demo localhost Vue SPA app spa demo localhost 当使用 axi
  • 旋转相机与旋转场景点(仅该点,而不是整个场景)之间有什么不同?

    我认为旋转相机并拍摄场景照片与保持相机稳定并反向旋转场景会产生相同的结果 假设原始相机旋转矩阵为R1 旋转相机意味着我们应用另一个旋转矩阵 R12 因此 R2 R12 R1 是新的旋转矩阵 假设X是场景点的真实世界坐标 以反向方式旋转场景点
  • 如何从arraylist动态创建pscustomobject?

    这篇文章是对该主题的补充here https stackoverflow com a 56876792 8397835 基本上 我有2个脚本 script1 具有以下内容 exportObject New Object System Col
  • 如何用Python去除图像中的小物体

    我的 python 代码有问题 我想对胸部X光片进行图像处理以获得肺部模式 但我的代码结果仍然有一点污点 如何摆脱这些小物体 这是我的代码 import cv2 import numpy as np from skimage import
  • 如何从 C# 调用 Spotify API

    我正在尝试使用 C 调用 Spotify API 不幸的是 我已经被困在获取access token 这就是我试图得到它的方法 private static async Task
  • 如何限制创建 celery 任务的速度快于消耗速度的脚本?

    我有一个脚本可以生成数百万个 Celery 任务 数据库中每行一个任务 有没有办法限制它 以免它完全淹没芹菜 理想情况下 我想让 Celery 保持忙碌 但我不希望 Celery 队列的长度超过几十个任务 因为这只是浪费内存 特别是因为如果