如何通过celery任务访问orm?

2024-02-15

我正在尝试使用 sqlalchemy+celerybeats 翻转数据库中特定类型对象的布尔标志。但是如何从tasks.py 文件访问我的orm?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()

我正在使用 celerybeat 命令来运行它:

celery -A 任务worker -l INFO --beat

但我收到以下错误:

raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

它指向 with current_app.app_context() 行

如果我删除 current_app.app_context() 行,我将收到以下错误:

RuntimeError: application not registered on db instance and no application bound to current context

有没有一种特殊的方法可以访问flask-sqlalchemy orm来执行celery任务?或者对于我想做的事情会有更好的方法吗?

到目前为止,唯一有效的解决方法是在后面添加以下行db.init_app(app)在我的应用程序工厂模式中:

db.app = 应用程序

我正在按照这个存储库来创建我的芹菜应用程序https://github.com/mattupstate/overholt/blob/master/overholt/factory.py https://github.com/mattupstate/overholt/blob/master/overholt/factory.py


你收到这个错误是因为current_app需要应用程序上下文才能工作,但您正在尝试使用它来设置应用程序上下文。您需要使用实际的应用程序来设置上下文,然后您可以使用current_app.

with app.app_context():
    # do stuff that requires the app context

或者您可以使用一种模式,例如中描述的模式Flask 文档 http://flask.pocoo.org/docs/dev/patterns/celery/子类化celery.Task所以它默认知道应用程序上下文。

from celery import Celery

def make_celery(app):
     celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
     celery.conf.update(app.config)
     TaskBase = celery.Task

     class ContextTask(TaskBase):
         abstract = True

         def __call__(self, *args, **kwargs):
             with app.app_context():
                 return TaskBase.__call__(self, *args, **kwargs)

     celery.Task = ContextTask
     return celery

 celery = make_celery(app)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何通过celery任务访问orm? 的相关文章

随机推荐