Django高级扩展之celery使用

2023-11-04

Celery是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。是一个专注于实时处理的任务队列,同时还支持任务调度。

目录

应用场景

问题

解决

celery架构图

安装

配置celery

Settings.py配置

创建celery

修改__init__

开启celery

异步执行

创建任务文件

视图

路由

演示

启动django

重启celery

浏览器访问

获取执行结果

AsyncResult属性和方法

演示

视图

路由

浏览器访问

Celery报错

kombu.async.timer

cannot import name 'current_app'

AttributeError

定时执行

增加配置

多个任务执行

创建执行方法

启动celery

绑定任务

实现绑定

实现错误重试

celery管理和监控

安装

运行命令

运行flower

浏览器访问

总结

参考文章


应用场景

问题

1.用户发起请求,需要等待响应返回;但是在视图中如果有一些耗时操作,可能导致用户会等待很长时间,这样用户体验非常不好。

2.网站隔一段时间要同步一次数据,但是http请求是需要触发的。

解决

使用celery来解决:耗时的操作方法在celery中异步执行;还可以使用celery定时执行。

 

celery架构图

celery由以下四部分构成:

任务(Task)、代理(Broker)、任务执行(Worker)、结果存储(Backend)。

 

安装

命令如下:

pip install celery
pip install redis

# window下安装,linux下不需要
pip install eventlet

 

配置celery

Settings.py配置

Settings.py中在最下面配置

# celery配置
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/1'
# 若有密码这样配置
# BROKER_URL = 'redis://:pwd@127.0.0.1:6379/1'

# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 若有密码这样配置
# CELERY_RESULT_BACKEND = 'redis://:pwd@127.0.0.1:6379/1'

# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'

# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定导入的任务模块,可以指定多个
# CELERY_IMPORTS = (
#    'other_dir.tasks',
# )

 

创建celery

在工程目录下的project目录下创建celery.py文件。

内容如下:

import os
from celery import Celery
from django.conf import settings


# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错

# project 是当前工程目录名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

celery_app = Celery('project')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

修改__init__

在工程目录下project的__init__文件中添加

from .celery import celery_app

__all__ = ['celery_app']

开启celery

Window命令如下:

celery  -A project worker  -l debug -P eventlet

pip 安装eventlet,在启动celery的参数加上eventlet,

原理是windows不支持celery的进程执行。

Linux 命令如下:

celery  -A project worker  -l

成功截图如下:

Redis数据库 

 

异步执行

创建任务文件

在子应用目录下创建tasks.py来执行任务(写耗时异步执行)

内容如下:

from celery import shared_task
import time



@shared_task
def add(x, y):
    print('添加一个事件')
    time.sleep(10)
    print(x, y)
    print('事件执行完成')

视图

应用视图views.py中增加引入和视图方法

from .tasks import *


def task_add(request):
    add.delay(100, 200)
    return HttpResponse(f'调用函数结果')

路由

urlpatterns = [
    # celery
    path(r'task_add', views.task_add, name='task_add'),
]

演示

启动django

python manage.py runserver

重启celery

celery  -A project worker  -l debug -P eventlet

浏览器访问

然后浏览器访问task路由。

 通过worker的控制台,可以看到任务被worker处理。

获取执行结果

可通过AsyncResult对象通过返回的事件id来获取事件信息。

AsyncResult属性和方法

state: 返回任务状态,等同status;

task_id: 返回任务id;

result: 返回任务结果,同get()方法;

ready(): 判断任务是否执行以及有结果,有结果为True,否则False;

info(): 获取任务信息,默认为结果;

wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;

successful(): 判断任务是否成功,成功为True,否则为False;

演示

视图

from celery import result
from django.http import JsonResponse


def task_info(request):

    task_id = request.GET.get('task_id')

    res = result.AsyncResult(task_id)
    if res.ready():
        return JsonResponse({'status': res.state, 'result': res.get()})
    else:
        return JsonResponse({'status': res.state, 'result': ''})

路由

path(r'task_info/<str:id>', views.task_info, name='task_info'),

浏览器访问

地址栏传入新增事件时,返回的事件id

 

Celery报错

kombu.async.timer

开启celery,提示错误Kombu.async.timer import Entry......

解决方法:

卸载celery,重装固定版本

命令如下:

pip install celery==4.3

 

cannot import name 'current_app'

cannot import name 'current_app' from 'celery'

解决方法:

打开D:\python3.7\lib\site-packages\celery\local.py"

原方法

def getappattr(path):
    """Get attribute from current_app recursively.

    Example: ``getappattr('amqp.get_task_consumer')``.

    """
    from celery import current_app
    return current_app._rgetattr(path)

修改为:

def getappattr(path):

    """Get attribute from current_app recursively.

    Example: ``getappattr('amqp.get_task_consumer')``.


    """
    # from celery import current_app
    from celery._state import current_app
    return current_app._rgetattr(path)

AttributeError

AttributeError: 'EntryPoints' object has no attribute 'get'

解决方法:

安装固定版本的importlib-metadata

命令如下:

pip install importlib-metadata==4.13.0

 

定时执行

增加配置

工程目录下/settings.py

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每5秒执行一次
        'schedule': 5,
        # 设置参数
        'args': (14, 6, 5)
    }
}

多个任务执行

可在原来基础上继续增加

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每5秒执行一次
        'schedule': 5,
        # 设置参数
        'args': (14, 6, 5)
    },
    'every_10_seconds': {
        # 任务路径 应用目录/任务文件/方法
        'task': 'myapp.tasks.schedule_execute',
        # 每10秒执行一次
        'schedule': 10,
        # 设置参数
        'args': (18, 10, 10)
    },
}

创建执行方法

在task.py中设置执行方法并记录日志

from celery import shared_task
import logging

logger = logging.getLogger(__name__)


@shared_task
def schedule_execute(x, y, s):
    logger.info('_____' * 20)
    logger.info('每%d秒执行一次' % s)
    logger.info('%d + %d = %d' % (x, y, (x+y)))
    logger.info('执行结束')
    logger.info('_____' * 20)

启动celery

(两个cmd)分别启动worker和beat

celery -A project worker -l debug -P eventlet
celery beat -A project -l debug

可通过控制台查看执行结果。

绑定任务

Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等

实现绑定

修改应用任务文件内容:在装饰器中加入参数 bind=True,在add函数中的第一个参数设置为self。

内容如下:

@shared_task(bind=True)
def add(self, x, y):

    print('添加一个事件')
    time.sleep(2)
    print(x, y)
    logger.info(self.name)
    logger.info(dir(self))
    print('事件执行完成')

实现错误重试

self对象是myapp.tasks.add实例,可用于实现重试。

@shared_task(bind=True)
def add(self, x, y):

    try:
        print('添加一个事件')
        time.sleep(2)
        print(x, y)
        logger.info(self.name)
        # 没有state属性 只是为了验证重试
        logger.info(self.state)
        logger.info(dir(self))
        print('事件执行完成')

    except Exception as e:

        # 出错每5秒尝试一次,尝试3次
        self.retry(exc=e, countdown=5, max_retries=3)

 

celery管理和监控

celery通过flower组件实现管理和监控功能。

Flower官网

Getting started — Flower 2.0.0 documentation

安装

pip install flower

运行命令

celery -A project flower

Or

celery -A project flower --port=5001

参数说明

-A 项目名称

--port 端口号,默认5555

运行flower

运行命令,显示如下:

浏览器访问

http://127.0.0.1:port

 

总结

本文主要介绍了celery的应用场景;

如何安装及安装哪些类库;

异步和定时执行实现以及任务可视化管理。

参考文章

https://www.cnblogs.com/chunyouqudongwuyuan/p/16892475.html

Django 中celery的使用_django celery_宠乖仪的博客-CSDN博客

在django中使用celery_哔哩哔哩_bilibili

一文读懂 Python 分布式任务队列 celery

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Django高级扩展之celery使用 的相关文章

  • minAreaRect OpenCV 返回的裁剪矩形 [Python]

    minAreaRectOpenCV 中返回一个旋转的矩形 如何裁剪矩形内图像的这部分 boxPoints返回旋转矩形的角点的坐标 以便可以通过循环框内的点来访问像素 但是在 Python 中是否有更快的裁剪方法 EDIT See code在
  • 如何在seaborn热图标签中使用科学计数法?

    我正在尝试在 python 中使用seaborn 获取热图 不幸的是 即使数字非常大 它也没有使用科学记数法 我想知道是否有任何简单的方法可以转换为科学记数法或任何其他合理的格式 这是显示问题的一段代码 import seaborn as
  • Python函数组成

    我尝试使用良好的语法来实现函数组合 这就是我所得到的 from functools import partial class compfunc partial def lshift self y f lambda args kwargs s
  • 在 PhotoImage 下调整图像大小

    我需要调整图像大小 但我想避免使用 PIL 因为我无法使其在 OS X 下工作 不要问我为什么 无论如何 因为我对 gif pgm ppm 感到满意 所以 PhotoImage 类对我来说没问题 photoImg PhotoImage fi
  • Python 中 time.sleep 和多线程的问题

    我对 python 中的 time sleep 函数有疑问 我正在运行一个脚本 需要等待另一个程序生成 txt 文件 虽然 这是一台非常旧的机器 所以当我休眠 python 脚本时 我遇到了其他程序不生成文件的问题 除了使用 time sl
  • 如何在动态执行的代码字符串中使用inspect.getsource?

    如果我在文件中有这段代码 import inspect def sample p1 print p1 return 1 print inspect getsource sample 当我运行脚本时 它按预期工作 在最后一行 源代码sampl
  • 烧瓶 - 404 未找到

    我是烧瓶开发的新手 这是我在烧瓶中的第一个程序 但它向我显示了这个错误 在服务器上找不到请求的 URL 如果您输入了网址 请手动检查拼写并重试 这是我的代码 from flask import Flask app Flask name ap
  • 从字典中绘制直方图

    我创建了一个dictionary计算 a 中出现的次数list每个键的内容 我现在想绘制其内容的直方图 这是我想要绘制的字典的内容 1 27 34 1 3 72 4 62 5 33 6 36 7 20 8 12 9 9 10 6 11 5
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • 将 Python Selenium 输出写入 Excel

    我编写了一个脚本来从在线网站上抓取产品信息 目标是将这些信息写入 Excel 文件 由于我的Python知识有限 我只知道如何在Powershell中使用Out file导出 但结果是每个产品的信息都打印在不同的行上 我希望每种产品都有一条
  • 如何处理 Tkinter 中的窗口关闭事件?

    如何在 Python Tkinter 程序中处理窗口关闭事件 用户单击 X 按钮 Tkinter 支持一种称为协议处理程序 http web archive org web 20201111215134 http effbot org tk
  • 使用 Django Rest Framework 自定义验证错误消息

    DRF 给出的默认验证错误消息是密钥和消息的列表 将此格式自定义为文本格式的最佳方法是什么 例如 这是默认格式 message phone customer with this phone already exists email cust
  • 更改用作函数全局作用域的字典

    我想做一个 purePython 的装饰器 其中一部分是能够有选择地禁止访问函数的全局范围 有没有一种方法可以以编程方式更改哪个字典事物充当函数的全局 外部作用域 因此 例如在下面我希望能够拦截对f in h并抛出错误 但我想允许访问g因为
  • 通过新数据更新绘图,而不是在 Jupyter 笔记本中制作新绘图

    我有一些问题 希望你能帮我解决 我需要使用下拉小部件创建交互式绘图 我可以在其中选择并绘制感兴趣的数据 我通过以下方式做到这一点 import plotly graph objects as go import ipywidgets as
  • Python RE(总之检查第一个字母是否区分大小写,其余部分不区分大小写)

    在下面的情况下 我想匹配字符串 Singapore 其中 S 应始终为大写 其余单词可能为小写或大写 但在下面的字符串 s 是小写的 它在搜索条件中匹配 任何人都可以让我知道如何实施吗 import re st Information in
  • 在不同的 GPU 上同时训练多个 keras/tensorflow 模型

    我想在 Jupyter Notebook 中同时在多个 GPU 上训练多个模型 我正在使用 4GPU 的节点上工作 我想将一个 GPU 分配给一个模型并同时训练 4 个不同的模型 现在 我通过 例如 为一台笔记本选择 GPU import
  • 旧版本的 spaCy 在尝试安装模型时抛出“KeyError: 'package'”错误

    我在 Ubuntu 14 04 4 LTS x64 上使用 spaCy 1 6 0 和 python3 5 为了安装 spaCy 的英文版本 我尝试运行 这给了我错误消息 ubun ner 3 NeuroNER master src pyt
  • 如何在 Qt 中以编程方式制作一条水平线

    我想弄清楚如何在 Qt 中制作一条水平线 这很容易在设计器中创建 但我想以编程方式创建一个 我已经做了一些谷歌搜索并查看了 ui 文件中的 xml 但无法弄清楚任何内容 ui 文件中的 xml 如下所示
  • 正则表达式 - 匹配不包含字符串的模式

    我对正则表达式很陌生 并且一直在寻找方法来做到这一点 但没有成功 给定一个字符串 我想删除以 abc 开头 以 abc 结尾且中间不包含 abc 的任何模式 如果我做 abc abc abc 它将匹配以 b 开头 以 abc 结尾并且中间包
  • 用 Beautiful Soup 进行抓取:为什么 get_text 方法不返回该元素的文本?

    最近我一直在用 python 开发一个项目 其中涉及抓取一些网站的一些代理 我遇到的问题是 当我尝试抓取某个知名代理站点时 当我要求 Beautiful Soup 查找 IP 在代理表中的位置时 它并没有按照我的预期执行操作 我将尝试查找每

随机推荐

  • 智能手机普及游戏 国内外巨头上演GPU芯片争霸

    转自 http tech sina com cn t 2014 02 12 16139155996 shtml 新浪科技讯 2月12日下午消息 随着近日国家解除游戏机禁令以及游戏向手机终端转移 国内外移动通信芯片厂商高通 75 62 0 9
  • umi-request设置请求头_scrapy_splash 设置随机请求头

    本文为 霾大 scrapy splash 爬取 js 加载网页初体验 zhuanlan zhihu com 的补充 在上面的文章中我们仅仅是初步完成了 scrapy splash 的简单使用 接下来我们将介绍如何是使得 splash 在 r
  • 时间序列模型(二):AR模型

    全文共8000余字 预计阅读时间约18 30分钟 满满干货 建议收藏 介绍 在时间序列分析中 我们经常遇到一种强大而灵活的模型 即ARIMA模型 这个模型已经在各种领域 如经济学 气候学 股票市场分析等 发挥了巨大的作用 尽管ARIMA模型
  • 掌握react,这一篇就够了

    react众所周知的前端3大主流框架之一 由于出色的性能 完善的周边设施风头一时无两 本文就带大家一起掌握react jsx语法 前端MVVM主流框架都有一套自己的模板处理方法 react则使用它独特的jsx语法 在组件中插入html类似的
  • vue 的指令

    目录 一 vue 的指令 1 v text 2 v html 3 v show 4 v if v esle if v else 1 v if 2 v if 与 v show 5 v for 1 v for 渲染一个数组 2 v for 渲染
  • node-sass安装后,启动本地环境爆出,Error: Node Sass does not yet support your current environment: Windows 64-bit

    最近有个很老的项目各种依赖库都很老 在本地环境中出现 Error Node Sass does not yet support your current environment Windows 64 bit with Unsupported
  • springboot运行出现 错误: 找不到或无法加载主类 com.xxxx.xxxx.Application

    项目打成jar包放在服务器上之后就未在使用 今天打开一运行居然报错 错误 找不到或无法加载主类 com fdway omui OmUiApplication 解决办法 1 项目 右键 Debug As 或 Run As Maven inst
  • 一些有趣的 js 功能函数

    一些有趣的 js 功能函数 数组 生成数组 打乱数组 数组简单数据去重 数组唯一值数据去重 多数组取交集 查找最大值索引 查找最小值索引 找到最接近的数值 压缩多个数组 拉链函数 矩阵交换行和列 数字转换 进制转换 正则 手机号格式化 去除
  • Docx:docx.opc.exceptions.PackageNotFoundError: Package not found at

    Docx docx opc exceptions PackageNotFoundError Package not found at https blog csdn net python reported article details 1
  • java finalize方法总结、GC执行finalize的过程

    注 本文的目的并不是鼓励使用finalize方法 而是大致理清其作用 问题以及GC执行finalize的过程 1 finalize的作用 finalize 是Object的protected方法 子类可以覆盖该方法以实现资源清理工作 GC在
  • 记一次计算机网络工程实验(1) 利用VLAN划分不同网段

    一学期没上过计算机网络工程的课 今天是第一次去做实验 把经验记在这里 免得过几天又忘了 安装Cisco Packet Tracer 首先需要下载和安装这次实验的工具 Cisco Packet Tracer 这是一个模拟路由器 交换机和各种终
  • Tomcat遇到闪退和Using CATALINA_OPTS:问题如何解决

    Tomcat遇到闪退和Using CATALINA OPTS 问题如何解决 最快的方法直接重新下载tomcat 链接 https pan baidu com s 1h12kdt5ZESJDdxY4AkcVjQ pwd oqsz 提取码 oq
  • 关于日期的正则表达式

    QTP是quicktest Professional的简称 是一种自动测试工具 QTP自带教程中有关于日期的正则表达式的例子 即对时间 月 日 年采用正则表达式方法进行检查 但经常是测试失败 例子中提供的表达式为 0 1 0 9 0 3 0
  • Vue3-ElemenPlu,全栈开发后台系统-JWT方案讲解第三章-Koa架构设计接口方面实现mongdb安装配置工具函数的封装前台首页实现

    第三章 Koa架构设计 usr bin env node Module dependencies var app require app var debug require debug
  • elasticsearch script实战

    写在前面 大家在开发elasticsearch的时候都会遇到很多去怪的需求 如果我们已知的RestAPI无法帮助我们完成搜索 是就需要我们自己动手写脚本来辅助搜索 完成需求 浅谈elasticsearch script脚本使用机制 通过阅读
  • angular:angular重用策略与ionic重用策略浅谈

    angular默认重用策略 同一个路由地址互相跳转会复用 否则会重新创建component 无任何重用 ionic默认重用策略 同一个路由地址会复用 在离开当前路由时会缓存路由地址对应的组件 当再次遇到相同路由地址时会恢复 但是复用后 如果
  • Vue移动框鼠标拖拽自定义指令

    在Vue中通过自定义指令 实现指定的模块带有鼠标拖拽移动效果 移动框自定指令 Vue directive drag bind el gt let initX null let initY null el style cursor move
  • 3-3 OneHot编码

    3 3 OneHot编码 请参考 数据准备和特征工程 中的相关章节 调试如下代码 基础知识 import pandas as pd g pd DataFrame gender man woman woman man woman g gend
  • 【华为机试刷题笔记】HJ16-购物单

    王强决定把年终奖用于购物 他把想买的物品分为两类 主件与附件 附件是从属于某个主件的 下表就是一些主件与附件的例子 主件 附件 电脑 打印机 扫描仪 书柜 图书 书桌 台灯 文具 工作椅 无 如果要买归类为附件的物品 必须先买该附件所属的主
  • Django高级扩展之celery使用

    Celery是一个简单 灵活 可靠的分布式系统 用于处理大量消息 同时为操作提供维护此类系统所需的工具 是一个专注于实时处理的任务队列 同时还支持任务调度 目录 应用场景 问题 解决 celery架构图 安装 配置celery Settin