这里写目录标题
- 一.昨日回顾
- 二.今日内容
- 1.celery基本使用
- 2.celery多任务结构
-
- 3.高级使用之延时任务
- 4.高级使用之定时任务
- 5.django中使用celery
- 6.首页轮播图定时更新
- 6.1首页轮播图走redis缓存(不支持定时更新)
- 6.2使用celery定时更新缓存
- luffyapi/settings/dev.py
- celery_task/home_task.py
- celery_task/celery.py
- home/views.py
- 启动worker,启动beat
一.昨日回顾
1 redis的列表操作
lpush
llen
linsert
lset
lrem
lpop
lrange
blpop
自定义增量迭代
2 redis通用方法
-delete
-exist
-expire
-rename
-type
3 redis管道
-不支持事务,但是通过管道模拟实现
-批量的多个命令一次性执行
-pipline,管道--->实现事务
4 django中集成redis
-通用做法
-写一个连接池
-通过模块导入,单例
-通过模块
-类装饰器
-类的绑定方法
-__new__
-元类
-conn=redis.Redis(connection_pool=POOL)
-第二种:
-pip install django-redis
-配置cache
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}
}
}
}
-直接使用cache
cache.set(key,value是任意数据类型,10*60)
-from django_redis import get_redis_connection
-conn=get_redis_connection(alias='default')
5 celery介绍
-分布式异步任务框架
-可以做的事
-异步任务:(同步任务)
-延迟任务:一段时间后,执行任务(函数)
-定时任务:每隔多长时间执行任务
-架构
-提交任务(user)
-消息队列(使用第三方:redis,rabbitmq)
-任务执行单元(work:工人),可以运行在不同的节点上
-结果存储:redis
6 python中执行定时任务:APScheduler--->公司里使用比较多
二.今日内容
1.celery基本使用
1 写一个py文件
import celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test',broker=broker,backend=backend)
@app.task
def add(a,b):
return a+b
@app.task
def mul(a,b):
return a*b
2 提交任务(在其它文件中)
from t_celery import add, mul
res=add.delay(100,4)
print(res)
3 启动worker,使用命令启动()
-4.x版本
-5.x版本
4 查看执行结果
from t_celery import app
from celery.result import AsyncResult
id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get()
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
2.celery多任务结构
package_celery:
celery_task
__init__.py
celery.py
order_task.py
user_task.py
result.py
submit_task.py
-celery4.x版本
celery worker -A celery_task -l info -P eventlet
-celery5.x版本
celery -A celery_task worker -l info -P eventlet
from celery_task import order_task,user_task
res=user_task.send_sms.delay('18723454566')
print(res)
res=order_task.cancel_order.delay()
print(res)
-秒杀系统
-不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
-提高并发量---》把同步做成异步---》使用celery
-前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
-同步操作
-请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状态
-请求返回,告诉前端,秒杀成功
-异步操作
-请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如果够,要生成订单(mysql)
-秒杀是否成功的结果还没有,直接返回了(返回任务id)
-前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完成(带着任务id查)
-如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
-又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
多任务结构小案例
'''=============>package_celery/celery_task/celery.py'''
import celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test',broker=broker,backend=backend,
include=['celery_task.order_task','celery_task.user_task']
)
'''=============>package_celery/celery_task/user_task.py'''
from .celery import app
@app.task
def send_sms(phone):
import time
time.sleep(1)
print('%s短信发送成功'%phone)
return '%s短信发送成功'%phone
'''=============>package_celery/celery_task/order_task.py'''
from .celery import app
@app.task
def process_order(a,b):
print(a)
print(b)
return '订单处理完了'
@app.task
def cancel_order():
import random
res=random.choice([1,0])
if res==0:
print('订单状态改了,取消订单了')
return True
else:
print('订单取消失败')
return False
'''=============>package_celery/submit_task.py'''
from celery_task import order_task,user_task
res=user_task.send_sms.delay('110')
print(res)
res=order_task.cancel_order.delay()
print(res)
'''=============>package_celery/result.py'''
from celery_task.celery import app
from celery.result import AsyncResult
id = '4595fd2f-1f8d-4546-8176-8a0627803584'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get()
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
3.高级使用之延时任务
'''=============>package_celery/submit_task.py'''
from datetime import datetime
from datetime import timedelta
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
print(task_time)
res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)
4.高级使用之定时任务
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-msg':{
'task': 'celery_task.user_task.send_sms',
'schedule': crontab(hour=8, day_of_month=1),
'args': ('18964352112',),
}
}
-celery4.x版本
celery beat -A celery_task -l info
celery worker -A celery_task -l info -P eventlet
-celery5.x版本
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
5.django中使用celery
1 celery是独立的,跟框架没有关系
2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
3 目录
celery_task
__init__.py
celery.py
home_task.py
order_task.py
user_task.py
luffyapi
1.在luffyapi根目录下创建一个包celery_task
'''celery_task/celery.py'''
import celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test',broker=broker,backend=backend,
include=['celery_task.order_task','celery_task.user_task']
)
'''celery_task/order_task.py'''
from .celery import app
@app.task
def send_sms(phone):
import time
time.sleep(1)
print('%s短信发送成功'%phone)
return '%s短信发送成功'%phone
'''celery_task/user_task.py'''
from .celery import app
@app.task
def process_order(a,b):
print(a)
print(b)
return '订单处理完了'
@app.task
def cancel_order():
import random
res=random.choice([1,0])
if res==0:
print('订单状态改了,取消订单了')
return True
else:
print('订单取消失败')
return False
'''luffyapi/apps/user/views.py'''
from celery_task import user_task
from celery_task.celery import app
from celery.result import AsyncResult
def test_celery(request):
res_id=request.GET.get('id')
if res_id:
res=AsyncResult(id=res_id,app=app)
if res.successful():
result=res.get()
print(result)
return HttpResponse('执行完成了,结果是:%s'%result)
res=user_task.send_sms.delay('110')
return HttpResponse('任务号是:%s'%str(res))
'''luffyapi/apps/user/urls.py'''
path('test_celery/',views.test_celery),
2.启动luffyapi,在浏览器输入以下网址
3.启动worker
6.首页轮播图定时更新
1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
2 首页轮播图加入了缓存
3 以后,如果你的接口,请求慢,第一反应就是先使用缓存
4 使用celery定时更新缓存
6.1首页轮播图走redis缓存(不支持定时更新)
只有第一次访问是没走缓存,后面都是走的redis缓存,减少访问数据库的次数,为数据库减压
'''luffyapi/apps/home/views.py'''
from django.core.cache import cache
from rest_framework.response import Response
class BannerView(GenericViewSet, ListModelMixin):
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('order')[:settings.BANNER_SIZE]
serializer_class = serializer.BannerModelSerializer
def list(self,request,*args,**kwargs):
banner_data=cache.get('banner_data')
print(banner_data)
if banner_data:
print('走了缓存')
return Response(data=banner_data)
res=super().list(request,*args,**kwargs)
cache.set('banner_data',res.data)
print('没走缓存')
return res
'''luffyapi/settings/dev.py'''
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}
}
}
}
6.2使用celery定时更新缓存
luffyapi/settings/dev.py
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}
}
}
}
celery_task/home_task.py
from celery_task.celery import app
@app.task
def update_banner():
from django.core.cache import cache
from django.conf import settings
from home import models
from home import serializer
banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
ser = serializer.BannerModelSerializer(instance=banners,many=True)
banner_data=ser.data
for banner in banner_data:
banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']
cache.set('banner_data',banner_data)
return True
celery_task/celery.py
import celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test',broker=broker,backend=backend,
include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
)
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'update-banner':{
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=10),
'args': (),
}
}
home/views.py
from django.core.cache import cache
from rest_framework.response import Response
class BannerView(GenericViewSet, ListModelMixin):
queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('order')[:settings.BANNER_SIZE]
serializer_class = serializer.BannerModelSerializer
def list(self,request,*args,**kwargs):
banner_data=cache.get('banner_data')
print(banner_data)
if banner_data:
print('走了缓存')
return Response(data=banner_data)
res=super().list(request,*args,**kwargs)
cache.set('banner_data',res.data)
print('没走缓存')
return res
启动worker,启动beat
celery -A celery_task beat -l info
celery -A celery_task worker -l inf
o -P eventlet
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)