应用场景:
发布系统:让他晚上2点执行发布的一串任务 定时任务
更新访问量: 晚上2点定时将数据从redis更新到mysql中去
商城类的抢购工作:大批量的用户涌入,承载不了一次性处理这么多的活儿,用这个方式也可以
目录结构
1.让woker运行
注意要pip install selery==3.1.1??
windos下如何启动
2.运行flask
3.创建任务
创建任务是立即创建成功的 对用户友好,意思是能马上看到我的任务被创建了 如下
4.查看详情
(拿到随机字符串去检测是否成功)
终止功能的参数
创建一个终止的功能
如何配置定时任务
代码:见老师的 01_celery(技术分享)
目录结构
代码
celery.py
from celery import Celery
from celery.schedules import crontab
cel = Celery(
'tasks',
broker='redis://127.0.0.1:6379',
backend='redis://127.0.0.1:6379',
include=['celery_tasks.s1','celery_tasks.s2']
)
View Code
s1.py
from .celery import cel
@cel.task
def hello1(*args, **kwargs):
return "hello1"
View Code
s2.py
from .celery import cel
@cel.task
def hello2(*args, **kwargs):
return "hello2"
View Code
app.py 调用celery中的内容,处理请求
import datetime
from flask import Flask,request,render_template,redirect
from celery.result import AsyncResult
from celery_tasks.celery import cel
from celery_tasks.s1 import hello1
app = Flask(__name__)
TASK_LIST = [
]
@app.route('/index')
def index():
"""
任务列表
:return:
"""
return render_template('index.html',tasks= TASK_LIST)
@app.route('/add_task',methods=['GET','POST'])
def add_task():
if request.method == 'GET':
return render_template('add_task.html')
else:
title = request.form.get('title')
# 在celery中添加一个任务
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
ctime_x = utc_ctime + datetime.timedelta(seconds=10)
# 在celery中添加任务 执行一个定时任务,
result = hello1.apply_async(args=[1, 3], eta=ctime_x)
TASK_LIST.append({'id':result.id,'title':title})
return redirect('/index')
@app.route('/status')
def status():
#查看任务状态的功能
id = request.args.get('id')
# 根据id查看任务状态
try:
async = AsyncResult(id=id, app=cel)
# async.revoke(terminate=True) # 无论现在是什么时候,都要终止
# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
if async.successful():
result = async.get()
return "执行完成,结果是:%s" %(result,)
# async.forget() # 将结果删除
elif async.failed():
return '执行失败'
elif async.status == 'PENDING':
return '任务等待中被执行'
elif async.status == 'RETRY':
return '任务异常后正在重试'
elif async.status == 'STARTED':
return '任务已经开始被执行'
except Exception as e:
return "执行异常"
@app.route('/status')
def revoke():
#终止功能
id = request.args.get('id')
# 根据id查看任务状态
async = AsyncResult(id=id, app=cel)
async.revoke(terminate=True) # 无论现在是什么时候,都要终止
return '已经终止'
if __name__ == '__main__':
app.run()