celery的用法

2023-10-29

Celery是一个基于消息传递的分布式任务队列,它通过将任务发送到消息队列中并由工作进程进行消费来实现任务的异步处理。以下是Celery的实现和用法的详细描述:

1.创建Celery应用:
在Python代码中,首先需要创建一个Celery应用。这可以通过实例化Celery类来完成,指定应用的名称以及消息队列的连接信息。

from celery import Celery

app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='rpc://')
  1. 定义任务:
    在应用中,需要定义具体的任务函数。使用@app.task装饰器将函数标记为Celery任务。
@app.task
def add(x, y):
    return x + y

3.启动Celery Worker:
启动Celery Worker进程来处理任务的执行。Worker会监听消息队列中的任务,并在有任务到达时进行消费执行。

在终端中执行以下命令:

celery -A yourmodule worker --loglevel=info

任务提交和执行:

4.提交任务:
在代码中,可以调用任务函数并将任务提交给Celery进行异步处理。可以使用.delay()方法来提交任务,或使用.apply_async()方法来设置任务参数和其他选项。

result = add.delay(4, 6)

5.获取任务结果:
可以使用.get()方法获取任务的执行结果。

result = result.get()

监控和管理任务:

监控任务状态和结果:
Celery提供了一系列的状态和结果相关的方法,用于查询任务的状态、获取任务的结果、检查任务是否完成等。

result = add.delay(4, 6)
if result.ready():
    print("任务已完成")
else:
    print("任务还在执行中")

使用Flower进行监控:
Celery还提供了Flower工具,用于实时监控任务队列的状态和统计信息。可以使用Flower来查看任务队列的监控界面。

在终端中执行以下命令启动Flower:

celery flower -A yourmodule

然后在浏览器中访问http://localhost:5555即可查看任务队列的监控界面。

定时任务:
要使用 Celery 实现定时任务,您可以使用 Celery Beat 调度器来安排任务在特定时间间隔内执行。以下是使用 Celery 和 Celery Beat 实现定时任务的步骤:

1). 配置 Celery Beat

首先,您需要确保 Celery Beat 已正确配置。创建一个名为 celery.py 的文件,并在其中定义 Celery 和 Celery Beat 的配置。

# celery.py

from celery import Celery
from celery.schedules import crontab

app = Celery('your_project')

app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现并注册任务
app.autodiscover_tasks()

# 配置定时任务
app.conf.beat_schedule = {
    'my-task': {
        'task': 'your_app.tasks.my_task',  # 任务路径
        'schedule': crontab(minute=0, hour=0),  # 每天零点执行
    },
}

在上面的示例中,我们配置了一个名为 my-task 的定时任务,它将在每天的午夜执行。

2). 创建任务

在您的应用程序中创建任务,这些任务将在定时任务中执行。例如:

# your_app/tasks.py

from celery import shared_task

@shared_task
def my_task():
    # 在此处编写任务的具体逻辑
    return "Task completed."

上述代码中的 my_task 函数定义了一个任务,您可以在其中编写任务的具体逻辑。

3). 运行 Celery Beat

运行 Celery Beat 以启动任务调度器:

celery -A your_project_name beat

现在,您已经设置了一个每天定时执行的任务。您可以根据需要配置更多的定时任务,只需在 app.conf.beat_schedule 中添加更多的任务定义。

并发和扩展性:
Celery支持多个工作进程并发地处理任务,可以通过启动多个Worker进程来实现任务的并行处理和提高系统的吞吐量。

celery -A yourmodule worker --concurrency=4

在上面的示例中,启动了4个并发的Worker进程。

结果存储后端
Celery允许将任务的执行结果存储在不同的后端中,如数据库、消息队列等。可以根据需要选择适合的结果存储后端。

示例:

app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='db+sqlite:///results.db')

在上面的示例中,使用SQLite数据库作为任务结果的存储后端。

错误处理和重试:
在任务执行过程中可能会发生错误,Celery提供了错误处理和任务重试的机制,以增加任务的稳定性和可靠性。

from celery import Task

class MyTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}

    def run(self, *args, **kwargs):
        # 执行任务的代码
        pass

在上面的示例中,任务继承自Task类,并设置了自动重试的配置。

celery的常用命令:
celery -A your_project_name worker --loglevel=info:
启动Celery Worker进程,以便处理异步任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。

celery -A your_project_name beat --loglevel=info:
启动Celery Beat进程,用于调度周期性任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。

celery -A your_project_name flower:
启动Flower,一个用于监控和管理Celery集群的Web界面。-A选项后跟着你的项目名称。

celery -A your_project_name purge:
清除所有已完成的任务结果。

celery -A your_project_name inspect active:
查看当前活动任务的信息。

celery -A your_project_name inspect scheduled:
查看所有计划任务的信息。

celery -A your_project_name inspect reserved:
查看所有已预订的任务。

celery -A your_project_name inspect revoked:
查看已撤销的任务。

celery -A your_project_name status:
查看Celery Worker的状态。

celery -A your_project_name control shutdown:
关闭Celery Worker。

celery -A your_project_name control pool_grow n:
增加工作池中的工作进程数。

celery -A your_project_name control pool_shrink n:
减少工作池中的工作进程数。

celery -A your_project_name control rate_limit task_name rate:
限制任务的执行速率。

celery -A your_project_name control cancel_consumer worker_name:
取消指定Worker的所有任务。

celery -A your_project_name control enable_events:
启用Celery事件。

celery -A your_project_name control disable_events:
禁用Celery事件。

异步任务
1). 创建一个 Celery 任务

首先,创建一个 Celery 任务。在 Django 项目中,您可以在任何地方创建这个任务,通常将它放在一个独立的文件中,如 tasks.py:

# tasks.py

from celery import shared_task

@shared_task
def my_task(param):
    # 这是一个示例任务,您可以在这里执行您的异步操作
    result = param * 2
    return result

在这个示例中,我们创建了一个名为 my_task 的 Celery 任务,它接受一个参数 param,并返回 param 的两倍。

2). 在视图中调用 Celery 任务

在 Django 视图中,您可以像调用普通函数一样调用 Celery 任务,并将其推送到 Celery worker 进程执行。下面是一个示例视图:

# views.py

from django.shortcuts import render
from .tasks import my_task  # 导入任务

def my_view(request):
    param = 42  # 参数值,您可以根据需要设置
    result = my_task.delay(param)  # 异步执行任务
    return render(request, 'my_template.html', {'result': result.id})

在这个示例中,我们导入了之前创建的 my_task 任务,并使用 delay 方法异步执行它。任务的执行不会阻塞当前视图,这样可以加快响应速度。

3). 处理任务结果

要处理任务的结果,您可以使用任务返回的 AsyncResult 对象。通常,您可以将该对象的 id 存储在数据库中,然后在需要时获取结果:

# views.py

from celery.result import AsyncResult
from .tasks import my_task  # 导入任务

def get_task_result(request, task_id):
    result = AsyncResult(task_id)
    if result.ready():
        # 任务已完成,您可以获取结果
        result_value = result.get()
    else:
        # 任务尚未完成,您可以采取其他操作或等待
        result_value = None
    return render(request, 'result_template.html', {'result_value': result_value})

在这个示例中,我们导入了 AsyncResult 类,并使用 task_id 获取任务结果。如果任务已完成,我们可以使用 result.get() 获取结果值。

周期性任务
要使用 Celery 实现周期性任务,您需要配置 Celery 定时任务调度器(Celery Beat)。Celery Beat 是 Celery 的一个组件,它允许您安排定时任务,并在指定的时间间隔内运行任务。以下是如何实现周期性任务的步骤:

1). 配置 Celery Beat

首先,确保您的 Celery 项目中已经配置了 Celery Beat。通常,您需要创建一个名为 celery.py 的文件,其中包含 Celery 和 Celery Beat 的配置。确保在配置中设置 CELERY_BROKER_URL 和 CELERY_RESULT_BACKEND,以及指定调度器:

# celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

# 设置 Django 的环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

app = Celery('your_project_name')

# 配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从所有注册的应用中发现异步任务
app.autodiscover_tasks()

# 定义定时任务
app.conf.beat_schedule = {
    'my-periodic-task': {
        'task': 'your_app.tasks.my_periodic_task',  # 任务路径
        'schedule': crontab(minute=0, hour=0),  # 设置定时规则,每天零点执行
    },
}

在上面的示例中,我们配置了一个名为 my-periodic-task 的周期性任务,它将每天在午夜执行(每天零点)。

2). 创建周期性任务

在 your_app/tasks.py 文件中创建周期性任务的实际代码:

# your_app/tasks.py

from celery import shared_task

@shared_task
def my_periodic_task():
    # 这是一个周期性任务的示例
    # 在这里执行您的周期性任务代码
    result = "Periodic task executed."
    return result

在这个示例中,my_periodic_task 函数是一个周期性任务的示例,您可以在其中执行任何周期性操作。

3). 运行 Celery Beat

最后,运行 Celery Beat 以启动定时任务调度器:

celery -A your_project_name beat

现在,my-periodic-task 将在每天零点执行。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

celery的用法 的相关文章

随机推荐

  • chainWebpack之optimization.splitChunks的cacheGroups缓存组代码分块实践案

    研究了好几天webpack打包 我们项目是vue的高版本 已经没有了webpack config js文件了 是直接在vue config js里的chainWebpack方法直接配置 这样做法的好处是用户既可以保留webpack的默认配置
  • sql 列求和_比较几种条件求和的方法——推荐PowerBI

    比较几种条件求和的方法 1 Excel鼠标框选 配合使用筛选功能 界面右下角显示求和结果 优点 可以快速 直观地满足一次简单的业务需求 缺点 只能快速地满足一次比较简单的查询需求 且求和结果无法被记录 2 Excel公式 sum sumif
  • 不支持请求方法POST或GET的一种解决方法

    Request method POST not supported 已解决 该错误一般是请求类型对不上导致的 比如PostMapping和GetMapping请求 一般错误发生在下图所示位置 我把Post和Get搞错了 RequiresAu
  • 线程的几种状态

    目录 前言 一 线程是什么 二 线程状态 1 新建状态 New 2 就绪状态 Runnable 3 运行状态 Running 4 阻塞状态 Blocked 5 等待状态 超时等待 Waiting Timed Waiting sleep 和
  • HTTP1.0、HTTP1.1 和 HTTP2.0 的区别

    原文 https mp weixin qq com s GICbiyJpINrHZ41u 4zT A 一 HTTP的历史 早在 HTTP 建立之初 主要就是为了将超文本标记语言 HTML 文档从Web服务器传送到客户端的浏览器 也是说对于前
  • 域名简单认识

    什么是域名 域名 Domain Name 是由一串用 点 分隔的字符组成的Internet上某一台计算机或计算机组的名称 用于在数据传输时标识计算机的电子方位 有时也指地理位置 地理上的域名 指代有行政自主权的一个地方区域 域名是一个IP地
  • SQL学习(五)查询结果过滤和排序

    如果初学 看看基础语法直接结合例子来看更容易理解 基础语法 DISTINCT 选取出唯一的结果的语法 SELECT DISTINCT column another column FROM mytable WHERE condition s
  • Retrofit上传/下载文件 (一)

    Retrofit是Square公司开源的简化 HTTP 请求的库 这篇文章主要介绍用Retrofit实现文件的上传与下载的功能 本文使用的是Retrofit 2 0 2版本 1 文件上传 api service接口 public inter
  • vue项目登录页面实现记住用户名和密码

    vue项目登录页面实现记住用户名和密码 记录一下实现的逻辑 应该分两步来理解这个逻辑 首次登录 页面没有用户的登录信息 实现逻辑如下 用户输入用户名和密码登录 用户信息为名为form的响应式对象 v model分别对应两个输入框 用户点击登
  • 八大排序算法-希尔排序

    希尔的定义 希尔排序是插入算法的一种 也叫缩小增量排序 是直接插入排序算法的一种改良版 希尔算法是把数据序列按下标的一定增量分组 对每组使用直接插入排序算法进行排序 然后依次缩减增量再进行排序 待整个序列中的元素基本 注 没有全部完成排序
  • WM_MOUSELEAVE和WM_MOUSEHOVER使用

    默认情况下 窗口是不响应WM MOUSEHOVER 和 WM MOUSELEAVE 这个消息上次测试是默认下也响应的 消息的 所以需要调用 TrackMouseEvent 函数来激活这两个消息 WM MOUSEHOVER 是调用 Track
  • STM32 Keil静态全局变量值被修改

    基于freertos进行开发 1 在a c文件中定义了char类型静态变量s chA 8 在b c文件中定义了int类型静态变量s intB 2 定义函数int GpioGet unsigned int id unsigned int va
  • 记一次修复mysq启动/usr/local/mysql/bin/mysqld: Can't create/write to file '/home/data/logs/mysql/mysqld.pid

    重启系统后 mysql启动不了 root MyCloudServer systemctl start mysqld Job for mysqld service failed because the control process exit
  • AD20设计规则检查设置(DRC检查设置)

    AD设计规则检查设置 通过以下设置能保证绝大部分的用户的使用不会在这个问题上出错 工具 设计规则检查 快捷键 T D 进入后界面如下所示 然后一次按图片所示进行配置 最后点击运行DRC检查
  • 类中的静态类成员变量

    MyObj h pragma once include
  • 与MySQL的零距离接触(一.约束修改数据表)

    八月中秋薄露 路上行人凄凉 小桥流水桂花香 日夜千思万想 心中不得宁静 清早览罢文章 十年寒苦在书房 方显才高志广 初涉MySQL MySQL的安装与配置 Mac系统安装MySQL及Navicat Premium http blog csd
  • mysql中字段到底该怎么指定长度?

    相信很多小伙伴在创建表的时候都给字段的类型指定过长度 但是大家可能都是随便指定的长度 或者照着别人的长度来给 比如一个int类型的字段 我们到底该如何指定长度呢 其实这里有个误区 mysql中的基本类型都是定长的 比如int 就是4个字节
  • 双一流博士整理的计算机视觉学习路线(深度学习+传统图像处理)

    因工作需要 年初花了4个月左右时间学习了机器学习 神经网络相关的知识 工作日每天大概学习4 6个小时 周末每天大概10个小时 工作中的需求应对也得心应手了 想快速入门的话 从自己的经验看 可以先不看高等数学和线性代数 因为机器学习和深度学习
  • ROS常用命令(个人备忘录)

    ROS常用命令 个人备忘录 可视化话题节点与话题 rosrun rqt graph rqt graph 新建功能包 catkin create pkg XXXXXX std msgs rospy roscpp 查看tf树 rosrun rq
  • celery的用法

    Celery是一个基于消息传递的分布式任务队列 它通过将任务发送到消息队列中并由工作进程进行消费来实现任务的异步处理 以下是Celery的实现和用法的详细描述 1 创建Celery应用 在Python代码中 首先需要创建一个Celery应用