解决方案1
Use 任务重试 https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry以其args
and kwargs
input.
retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)
重试该任务,将其添加到队列的末尾。
参数
args (Tuple)– 重试的位置参数。
kwargs (Dict)– 用于重试的关键字参数。
传递参数时要注意,因为在args
and kwargs
会导致失败。下面,我选择只使用args=(<values here>)
并清空kwargs={}
。您也可以选择以相反的方式使用kwargs={<values here>}
并清空args=()
.
tasks.py
from celery import Celery
app = Celery('tasks')
@app.task(
bind=True,
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
print(f"my_test {some_arg_1} {my_list} {some_arg_2}")
# Filter the failed items. Here, let's say only the last item is successful.
new_list = my_list[:-1]
if new_list:
self.retry(
args=(
some_arg_1 + 1, # some_arg_1 increments per retry
new_list, # Failed items
some_arg_2 * 2, # some_arg_2's length doubles per retry
),
kwargs={}, # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.
)
日志(生产者)
>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The above command is just equivalent to the following (just the same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
日志(消费者)
[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess]
[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess]
[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess]
[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess]
[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess]
[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None
- All task arguments are updated per retry:
-
some_arg_1
每次重试从起始值增加 10
到最后一个值4
-
my_list
每次重试都会丢失 1 个项目,从起始值开始[1, 2, 3, 4, 5]
到最后一个值[1]
-
some_arg_2
每次重试的大小从起始值加倍"a"
到最后一个值"aaaaaaaaaaaaaaaa"
解决方案2
只需从任务本身回忆起相同的任务,有点像递归。
tasks.py
from celery import Celery
app = Celery('tasks')
@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
print(f"my_test {some_arg_1} {my_list} {some_arg_2}")
# Filter the failed items. Here, let's say only the last item is successful.
new_list = my_list[:-1]
if new_list:
my_test.apply_async(
args=(
some_arg_1 + 1, # some_arg_1 increments per retry
new_list, # Failed items
some_arg_2 * 2, # some_arg_2's length doubles per retry
),
kwargs={}, # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.
)
日志(生产者和消费者)