luffy-11/celery

2023-05-16

这里写目录标题

  • 一.昨日回顾
  • 二.今日内容
    • 1.celery基本使用
    • 2.celery多任务结构
      • 多任务结构小案例
    • 3.高级使用之延时任务
    • 4.高级使用之定时任务
    • 5.django中使用celery
    • 6.首页轮播图定时更新
      • 6.1首页轮播图走redis缓存(不支持定时更新)
      • 6.2使用celery定时更新缓存
        • luffyapi/settings/dev.py
        • celery_task/home_task.py
        • celery_task/celery.py
        • home/views.py
        • 启动worker,启动beat

一.昨日回顾

1 redis的列表操作
	lpush
	llen
	linsert
	lset
	lrem
	lpop
	lrange # 使用它,自定义增量迭代
	blpop # 分布式
	自定义增量迭代
2 redis通用方法
	-delete
	-exist
	-expire
	-rename
	-type
3 redis管道
	-不支持事务,但是通过管道模拟实现
	-批量的多个命令一次性执行
	-pipline,管道--->实现事务
4 django中集成redis
	-通用做法
		-写一个连接池
		-通过模块导入,单例
			-通过模块
			-类装饰器
			-类的绑定方法
			-__new__
			-元类
		-conn=redis.Redis(connection_pool=POOL)
	-第二种:
		-pip install django-redis
		-配置cache
			CACHES = {
                "default": {
                    "BACKEND": "django_redis.cache.RedisCache",
                    "LOCATION": "redis://127.0.0.1:6379",
                    "OPTIONS": {
                        "CLIENT_CLASS": "django_redis.client.DefaultClient",
                        "CONNECTION_POOL_KWARGS": {"max_connections": 100}
                        # "PASSWORD": "123",
                    }
                }
            }
		-直接使用cache
		cache.set(key,value是任意数据类型,10*60)
		-from django_redis import get_redis_connection
		-conn=get_redis_connection(alias='default')
5 celery介绍
	-分布式异步任务框架
	-可以做的事
		-异步任务:(同步任务)
		-延迟任务:一段时间后,执行任务(函数)
		-定时任务:每隔多长时间执行任务
	-架构
		-提交任务(user)
		-消息队列(使用第三方:redis,rabbitmq)
		-任务执行单元(work:工人),可以运行在不同的节点上
		-结果存储:redis
6 python中执行定时任务:APScheduler--->公司里使用比较多
	

二.今日内容

1.celery基本使用

1 写一个py文件
    import celery
    # 消息中间件(redis)
    broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
    # 结果存储(redis)
    backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
    # 实例化得到对象,指定中间件和结果存储
    app=celery.Celery('test',broker=broker,backend=backend)
    @app.task
    def add(a,b):
        return a+b
    @app.task
    def mul(a,b):
        return a*b
    
2 提交任务(在其它文件中)
from t_celery import add, mul

# 同步任务
# res = add(3, 4)
# res =mul(3,4)
# print(res)

# 异步任务,提交任务
res=add.delay(100,4)  # 把计算add的任务提交到了redis中
#
# #celery.result.AsyncResult
# # print(type(res))
print(res)  #dbc32eaf-bb54-4392-94d0-27073b9cdb7f
    
3 启动worker,使用命令启动()
	-4.x版本
	# 非windows平台:celery worker -A t_celery -l info
    # windows装eventlet:celery worker -A t_celery -l info -P eventlet
    -5.x版本
    # 非windows平台:celery -A t_celery worker -l info
    # windows装eventlet:celery -A t_celery worker -l info -P eventlet
    
4 查看执行结果
	from t_celery import app
    from celery.result import AsyncResult
    # 关键字,变量不能定义为关键字
    id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'
    if __name__ == '__main__':
        res = AsyncResult(id=id, app=app)
        if res.successful():
            result = res.get()
            print(result)
        elif res.failed():
            print('任务失败')
        elif res.status == 'PENDING':
            print('任务等待中被执行')
        elif res.status == 'RETRY':
            print('任务异常后正在重试')
        elif res.status == 'STARTED':
            print('任务已经开始被执行')

在这里插入图片描述

2.celery多任务结构

package_celery:     # 项目名
    celery_task     # celery包名
        __init__.py 
        celery.py   # celery 的app,必须叫celery
        order_task.py # 任务
        user_task.py  # 任务
    result.py         # 结果查询
    submit_task.py    # 提交任务
    
    
# 运行worker(在package_celery目录下执行)
	-celery4.x版本
	celery worker -A celery_task -l info -P eventlet
	-celery5.x版本
	celery -A celery_task  worker -l info -P eventlet
# 提交任务
    from celery_task import order_task,user_task
    # 提交一个给用户发短信的任务
    res=user_task.send_sms.delay('18723454566')
    print(res)
    # 提交一个取消订单任务
    res=order_task.cancel_order.delay()
    print(res)

    
# 真实应用场景
	-秒杀系统
    	-不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
        -提高并发量---》把同步做成异步---》使用celery
        	-前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
            	-同步操作
                    -请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状态						
                    -请求返回,告诉前端,秒杀成功
                -异步操作
                	-请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如果够,要生成订单(mysql)
                    -秒杀是否成功的结果还没有,直接返回了(返回任务id-前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完成(带着任务id查)
                    -如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
                    -又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
        	

多任务结构小案例

'''=============>package_celery/celery_task/celery.py'''
import celery
# 消息中间件(redis)
broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
# 结果存储(redis)
backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
# 实例化得到对象,指定中间件和结果存储

app=celery.Celery('test',broker=broker,backend=backend,
                  include=['celery_task.order_task','celery_task.user_task']
                  )


'''=============>package_celery/celery_task/user_task.py'''

from .celery import app

@app.task
def send_sms(phone):
	import time
	time.sleep(1)
	print('%s短信发送成功'%phone)
	return '%s短信发送成功'%phone

'''=============>package_celery/celery_task/order_task.py'''
from .celery import app

@app.task
def process_order(a,b):
	print(a)
	print(b)
	return '订单处理完了'

@app.task
def cancel_order():
	import random
	res=random.choice([1,0])
	if res==0:
		print('订单状态改了,取消订单了')
		return True
	else:
		print('订单取消失败')
		return False
'''=============>package_celery/submit_task.py'''
from celery_task import order_task,user_task

# 提交一个给用户发短信的任务
res=user_task.send_sms.delay('110')
print(res)

# 提交一个取消订单任务
res=order_task.cancel_order.delay()
print(res)

'''=============>package_celery/result.py'''
from celery_task.celery import app
from celery.result import AsyncResult

# 关键字,变量不能定义为关键字
#发短信任务: bbfbb809-71fd-4dd7-bb3b-e84590840819
# 取消订单任务:4595fd2f-1f8d-4546-8176-8a0627803584
id = '4595fd2f-1f8d-4546-8176-8a0627803584'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

3.高级使用之延时任务

'''=============>package_celery/submit_task.py'''

# celery执行延时任务
## 第一种方式:2021年1月7日17点3分12秒发送短信
# from datetime import datetime
# # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
# v1 = datetime(2021, 1, 7, 17, 3, 12)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# res=user_task.send_sms.apply_async(args=['18977654332',],eta=v2)


## 第二种方式:隔几秒后执行
from datetime import datetime
from datetime import timedelta
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
print(task_time)
res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)

4.高级使用之定时任务

# -1.在celery.py中配置

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-msg':{
        'task': 'celery_task.user_task.send_sms',
        # 'schedule': timedelta(hours=24*10),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
        'args': ('18964352112',),
    }
}

-celery4.x版本
# -2.启动beat,负责每隔3s提交一个任务
celery beat -A celery_task -l info
# -3.启动worker
celery worker -A celery_task -l info -P eventlet
-celery5.x版本
# 启动beat,负责每隔3s提交一个任务
celery -A celery_task beat -l info
# 启动worker
celery -A celery_task worker -l info -P eventlet

5.django中使用celery

1 celery是独立的,跟框架没有关系
2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
3 目录
	celery_task
    	__init__.py
        celery.py
        home_task.py
        order_task.py
        user_task.py
	luffyapi

1.在luffyapi根目录下创建一个包celery_task



'''celery_task/celery.py'''
import celery
# 消息中间件(redis)
broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
# 结果存储(redis)
backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
# 实例化得到对象,指定中间件和结果存储

app=celery.Celery('test',broker=broker,backend=backend,
                  include=['celery_task.order_task','celery_task.user_task']
                  )

'''celery_task/order_task.py'''
from .celery import app

@app.task
def send_sms(phone):
	import time
	time.sleep(1)
	print('%s短信发送成功'%phone)
	return '%s短信发送成功'%phone
'''celery_task/user_task.py'''
from .celery import app

@app.task
def process_order(a,b):
	print(a)
	print(b)
	return '订单处理完了'

@app.task
def cancel_order():
	import random
	res=random.choice([1,0])
	if res==0:
		print('订单状态改了,取消订单了')
		return True
	else:
		print('订单取消失败')
		return False
		
'''luffyapi/apps/user/views.py'''
from celery_task import user_task
from celery_task.celery import app
from celery.result import AsyncResult
def test_celery(request):
    res_id=request.GET.get('id')
    if res_id:
        res=AsyncResult(id=res_id,app=app)
        if res.successful():
            result=res.get()
            print(result)
            return HttpResponse('执行完成了,结果是:%s'%result)
    res=user_task.send_sms.delay('110')

    return HttpResponse('任务号是:%s'%str(res))
    
'''luffyapi/apps/user/urls.py'''
path('test_celery/',views.test_celery),

2.启动luffyapi,在浏览器输入以下网址
在这里插入图片描述

在这里插入图片描述
3.启动worker
在这里插入图片描述

6.首页轮播图定时更新

1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
2 首页轮播图加入了缓存
3 以后,如果你的接口,请求慢,第一反应就是先使用缓存
4 使用celery定时更新缓存

6.1首页轮播图走redis缓存(不支持定时更新)

只有第一次访问是没走缓存,后面都是走的redis缓存,减少访问数据库的次数,为数据库减压

在这里插入图片描述

'''luffyapi/apps/home/views.py'''
from django.core.cache import cache
from rest_framework.response import Response

class BannerView(GenericViewSet, ListModelMixin):
    queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('order')[:settings.BANNER_SIZE]
    serializer_class = serializer.BannerModelSerializer

    # 改成先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查

    def list(self,request,*args,**kwargs):
        # 如果缓存中有值,直接取出来返回,速度很快
        banner_data=cache.get('banner_data')
        print(banner_data)
        if banner_data:
            print('走了缓存')
            return Response(data=banner_data)
        # 如果缓存中没有,再走数据,查出来,放到缓存中
        res=super().list(request,*args,**kwargs)
        # 把首页轮播图放到缓存中
        cache.set('banner_data',res.data)
        print('没走缓存')
        return res

'''luffyapi/settings/dev.py'''
# redis的配置
#以后django的缓存,用的就是redis,很方便使用redis的连接
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}

6.2使用celery定时更新缓存

luffyapi/settings/dev.py

# redis的配置
#以后django的缓存,用的就是redis,很方便使用redis的连接
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}

celery_task/home_task.py

from celery_task.celery import app

@app.task
def update_banner():
    from django.core.cache import cache
    from django.conf import settings
    from home import models
    from home import serializer
    banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]

    ser = serializer.BannerModelSerializer(instance=banners,many=True)
    banner_data=ser.data

    # 拿不到request对象,所以头像的连接base_url要自己组装
    for banner in banner_data:
        banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']


    cache.set('banner_data',banner_data)

    return True

celery_task/celery.py

import celery

import os
# 执行django配置文件,环境变量加入
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db

backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db

app=celery.Celery('test',broker=broker,backend=backend,
                  include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
                  )
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {

    # 'send-msg':{
    #     'task': 'celery_task.user_task.send_sms',
    #     # 'schedule': timedelta(hours=24*10),
    #     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
    #     'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
    #     'args': ('18964352112',),
    # }
    'update-banner':{
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=10),
        'args': (),
    }
}

home/views.py

from django.core.cache import cache
from rest_framework.response import Response

class BannerView(GenericViewSet, ListModelMixin):
    queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('order')[:settings.BANNER_SIZE]
    serializer_class = serializer.BannerModelSerializer

    # 改成先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查

    def list(self,request,*args,**kwargs):
        # 如果缓存中有值,直接取出来返回,速度很快
        banner_data=cache.get('banner_data')
        print(banner_data)
        if banner_data:
            print('走了缓存')
            return Response(data=banner_data)
        # 如果缓存中没有,再走数据,查出来,放到缓存中
        res=super().list(request,*args,**kwargs)
        # 把首页轮播图放到缓存中
        cache.set('banner_data',res.data)
        print('没走缓存')
        return res

启动worker,启动beat

# 启动beat
celery -A celery_task beat -l info
# 启动worker
celery -A celery_task worker -l inf
o -P eventlet

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

luffy-11/celery 的相关文章

随机推荐

  • 什么是三层交换机、网关、DNS、子网掩码、MAC地址?

    转载自品略图书馆 http www pinlue com article 2020 08 2313 2511146576256 html 一文讲懂什么是三层交换机 网关 DNS 子网掩码 MAC地址 很多朋友多次问到什么是网关 dns 子网
  • C++类对象共享数据的5种实现方法

    转自 xff1a http www pinlue com article 2020 09 2617 0611262487540 html
  • c语言free的用法

    转自 xff1a http www pinlue com article 2020 03 3100 4610073901713 html
  • Spring Boot 修改默认端口号

    修改配置文件 xff0c 加上参数 xff1a server port 61 8014 或者 xff1a server port 8014 启动后可发现tomcat运行在端口8014上了 实现原因可看以下链接 转载 SpringBoot修改
  • php调用类中的方法

    转自 xff1a http www pinlue com article 2020 06 1219 0410725563037 html
  • 人工智能 : 第三篇”脑机接口“

    本文作者Tim Urban xff1a Wait but Why的作者Tim Urban 是埃隆马斯克 xff08 特斯拉 SpaceX创始人 xff09 强烈推荐的科技博主 他写的AI文章是全世界转发量最高的 他的粉丝还包括 xff1a
  • 如何找回一台丢失的Win10电脑?

    今天说说如何找电脑 为什么小微想到了这个问题 还要从一次关于奇葩办公地点的讨论说起 看到大家的回答 xff0c 小微佩服得五体投地 办公经历还可以如此精jing彩xin绝dong伦po 作为结实靠谱的出行伙伴 ThinkPad陪伴大家出现在
  • 不必再狂按空格键了!Word 里文字对齐推荐这4种方法

    我们在用Word写论文 制作简历的时候 xff0c 通常会遇到把word中某些特定文字对齐的情况 那么问题来了 xff0c 你平时都是怎么对齐文字的 xff1f 傻傻的用空格来对齐吗 xff1f 在字符数不等的情况下 xff0c 加空格不仅
  • AMI主板BIOS菜单图文讲解设置!

    电脑硬件 xff0c 包括电脑中所有物理的零件 xff0c 以此来区分它所包括或执行的数据和为硬件提供指令以完成任务的软件 主要包含 机箱 xff0c 主板 xff0c 总线 xff0c 电源 xff0c 硬盘 xff0c 存储控制器 xf
  • luffy-02

    这里写目录标题 一 昨日回顾二 今日内容1 路飞前台配置 1 重构项目目录 2 文件修订 xff1a 目录中非配置文件的多余文件可以移除router的使用 3 前台配置 全局样式 配置文件 axios vue cookies element
  • luffy-03

    这里写目录标题 一 昨日回顾二 今日内容1 跨域问题1 1后端自己处理跨域简单请求非简单请求中间件处理 1 2前端处理跨域App vuevue config js 2 头部组件 尾部组件components Header vuecompon
  • luffy-04

    这里写目录标题 一 昨日回顾二 今日内容1 路飞项目使用xadmin2 首页轮播图接口 轮播图表 视图类 轮播图数量4 通过配置实现 前端对接 后续 接口缓存 3 git的使用3 1git的工作流程 4 git分支git提交代码出现冲突的2
  • luffy-05

    这里写目录标题 一 首页轮播图接口二 今日内容1 过滤文件2 从远端拉项目3 远程仓库3 1链接远程仓库的两种情况 4 冲突出现的原因及解决 一 首页轮播图接口 span class token number 1 span 首页轮播图接口
  • 手把手教你搭建鸿蒙hi3518开发和运行环境

    前言 学习 C 语言 xff0c C 43 43 语言 xff0c 数据结构和算法 xff0c 操作系统 xff0c 网络 xff0c 驱动 xff0c 设计模式等知识 用鸿蒙来强化就太对了 本文教你一步一步搭建鸿蒙的开发和运行环境 xff
  • luffy-06

    这里写目录标题 一 上节回顾二 今日内容1 ssh链接和https链接2 gitlab3 git远程分支合并4 git冲突出现原因及解决5 首页登录注册vue页面Header vueLogin vue 6 登录注册功能接口分析 一 上节回顾
  • luffy-07

    这里写目录标题 一 昨日回顾二 今日内容1 登录注册前端页面1 1Login vue1 2Register vue1 3Header vue 2 多方式登录接口3 手机号是否存在接口3 1路由层user urls py3 2user ser
  • luffy-08

    这里写目录标题 一 集成了腾讯短信 封装 luffyapi lib t sms settings pyluffyapi lib t sms sms py 二 短信验证码接口2 1路由 配置2 2视图 三 短信登录接口3 1视图3 2序列化类
  • luffy-09/redis

    这里写目录标题 一 昨日回顾二 今日内容1 redis介绍2 redis的Windows安装和配置3 普通链接和连接池3 1代码3 2redis pool py 4 redis之string操作5 redis之hash操作 一 昨日回顾 s
  • luffy-10/redis/celery简单介绍

    这里写目录标题 一 昨日回顾二 今日内容2 1redis之列表操作2 2 redis之其他操作2 3 redis之管道2 4 django中使用redis2 4 1通用方案redis pool pyviews py 2 4 2django提
  • luffy-11/celery

    这里写目录标题 一 昨日回顾二 今日内容1 celery基本使用2 celery多任务结构多任务结构小案例 3 高级使用之延时任务4 高级使用之定时任务5 django中使用celery6 首页轮播图定时更新6 1首页轮播图走redis缓存