项目目录结构:
my_celery
course # app 应用
admin.py
__init__.py
migrations
models.py
tests.py
views.py # 处理url请求类视图
tasks.py # 执行任务模块
manage.py
my_celery
__init__.py
celeryconfig.py # celery配置文件
settings.py # django 的配置文件
urls.py # url请求地址
wsgi.py
1.celeryconfi.py:
# -*- coding: utf-8 -*-
import djcelery
from celery import Celery, platforms
from datetime import timedelta
djcelery.setup_loader()
# 设置队列
CELERY_QUEUES = {
'beat_tasks': {
'exchage': 'beat_tasks',
'exchage_type': 'direct',
'binding_key': 'beat_tasks'
},
'work_queue': {
'exchage': 'work_queue',
'exchage_type': 'direct',
'binding_key': 'work_queue'
},
}
# 解决celery不能用root用户启动问题
platforms.C_FORCE_ROOT = True
# 设置默认的队列
CELERY_DEFAULT_QUEUE = 'work_queue'
# 导入指定的任务模块
CELERY_IMPORTS = (
'course.tasks',
)
# 有些情况可以防止死锁
CELERYD_FORCE_EXECV = True
# 设置并发的worker数量(一般情况下根据CUP核数设置)
CELERYD_CONCURRENCY = 4
# 任务失败了, 允许重试
CELERY_ACKS_LATE = True
# 每个worker最多执行100个任务被销毁,可以防止内存泄露
CELERYD_MAX_TASKS_PER_CHILD = 100
# 单个任务的最大运行时间
CELERD_TASK_TIME_LIMIT = 12 * 30
# 设置定时任务
CELERYBEAT_SCHEDULE = {
'task1': {
'task': 'course-task',
'schedule': timedelta(seconds=5),
'options': { # 指定任务队列
'queue': 'beat_tasks'
}
}
}
2.settings.py:
INSTALLED_APPS = (
.........
'djcelery', # 将djcelery 添加应用中
'course' # 新创建的course应用
)
# Celery 连接Redis
from .celeryconfig import *
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
3.urls.py:
from django.conf.urls import include, url
from django.contrib import admin
urlpatterns = [
# Examples:
# url(r'^$', 'my_celery.views.home', name='home'),
# url(r'^blog/', include('blog.urls')),
url(r'^admin/', include(admin.site.urls)),
url(r'^do/$', 'course.views.do', name='do') # 配置主url
4.views.py:
# -*- coding: utf-8 -*-
from django.http import JsonResponse
from course.tasks import CourseTask
def do(request):
print 'start do request'
CourseTask.delay()
# 如果指定参数的话, 建议使用apply_async
# CourseTask.apply_async(args=('hello',), queue='work_queue')
print 'end do request'
return JsonResponse({'result': 'ok'})
5.tasks.py:
# -*- coding: utf-8 -*-
import time
from celery.task import Task
# 执行任务
class CourseTask(Task):
def run(self, *args, **kwargs):
print 'start course task'
time.sleep(5)
print 'end course task'
-
在manage.py 当前模块执行启动django服务器命令:python manage.py runserver
-
在mange.py 当前模块启动celery worker 命令:python manage.py celery worker -l INFO
-
如果遇到以下错误:
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
-
解决办法:
from celery import Celery, platforms
# 解决celery不能用root用户启动问题
platforms.C_FORCE_ROOT = True
-
在mange.py 当前模块启动celery beat 命令:python manage.py celery beat -l INFO
-
访问:127.0.0.1:8000, 会显示:{'result': 'ok'}