我有一个脚本可以生成数百万个 Celery 任务,数据库中每行一个任务。有没有办法限制它,以免它完全淹没芹菜?
理想情况下,我想让 Celery 保持忙碌,但我不希望 Celery 队列的长度超过几十个任务,因为这只是浪费内存(特别是因为如果没有某种限制,脚本将向队列添加数百万个任务)几乎立即排队)。
在过去的几天里,我花了一些时间来解决这个问题,并提出了我所说的CeleryThrottle
目的。基本上,您告诉它队列中需要多少个项目,它会尽力将队列保持在该大小和 2× 该大小之间。
所以这是代码(假设 Redis 代理,但很容易更改):
# coding=utf-8
from collections import deque
import time
import redis
from django.conf import settings
from django.utils.timezone import now
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return r.llen(queue_name)
class CeleryThrottle(object):
"""A class for throttling celery."""
def __init__(self, min_items=100, queue_name='celery'):
"""Create a throttle to prevent celery run aways.
:param min_items: The minimum number of items that should be enqueued.
A maximum of 2× this number may be created. This minimum value is not
guaranteed and so a number slightly higher than your max concurrency
should be used. Note that this number includes all tasks unless you use
a specific queue for your processing.
"""
self.min = min_items
self.max = self.min * 2
# Variables used to track the queue and wait-rate
self.last_processed_count = 0
self.count_to_do = self.max
self.last_measurement = None
self.first_run = True
# Use a fixed-length queue to hold last N rates
self.rates = deque(maxlen=15)
self.avg_rate = self._calculate_avg()
# For inspections
self.queue_name = queue_name
def _calculate_avg(self):
return float(sum(self.rates)) / (len(self.rates) or 1)
def _add_latest_rate(self):
"""Calculate the rate that the queue is processing items."""
right_now = now()
elapsed_seconds = (right_now - self.last_measurement).total_seconds()
self.rates.append(self.last_processed_count / elapsed_seconds)
self.last_measurement = right_now
self.last_processed_count = 0
self.avg_rate = self._calculate_avg()
def maybe_wait(self):
"""Stall the calling function or let it proceed, depending on the queue.
The idea here is to check the length of the queue as infrequently as
possible while keeping the number of items in the queue as closely
between self.min and self.max as possible.
We do this by immediately enqueueing self.max items. After that, we
monitor the queue to determine how quickly it is processing items. Using
that rate we wait an appropriate amount of time or immediately press on.
"""
self.last_processed_count += 1
if self.count_to_do > 0:
# Do not wait. Allow process to continue.
if self.first_run:
self.first_run = False
self.last_measurement = now()
self.count_to_do -= 1
return
self._add_latest_rate()
task_count = get_queue_length(self.queue_name)
if task_count > self.min:
# Estimate how long the surplus will take to complete and wait that
# long + 5% to ensure we're below self.min on next iteration.
surplus_task_count = task_count - self.min
wait_time = (surplus_task_count / self.avg_rate) * 1.05
time.sleep(wait_time)
# Assume we're below self.min due to waiting; max out the queue.
if task_count < self.max:
self.count_to_do = self.max - self.min
return
elif task_count <= self.min:
# Add more items.
self.count_to_do = self.max - task_count
return
用法如下:
throttle = CeleryThrottle()
for item in really_big_list_of_items:
throttle.maybe_wait()
my_task.delay(item)
非常简单,希望非常灵活。完成此操作后,代码将监视您的队列,并在队列变得太长时将等待添加到循环中。这是在我们的 github 仓库 https://github.com/freelawproject/courtlistener/blob/master/cl/lib/celery_utils.py以防有更新。
执行此操作时,它将跟踪任务的滚动平均速度,并尝试不比需要更频繁地检查队列长度。例如,如果每个任务需要两分钟运行,则在将 100 个项目放入队列后,它可能会等待相当长的时间,然后才必须再次检查队列的长度。该脚本的一个简单版本可以每次循环时检查队列长度,但这会增加不必要的延迟。这个版本试图聪明地处理它,但有时会出错(在这种情况下,队列会低于min_items
).
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)