如何让 Celery 工作人员返回任务结果

2024-04-21

我有一个调用任务的烧瓶应用程序。该任务从数据库中提取数据,绘制折线图并返回在 html 页面上呈现的 html 内容。如果没有 Celery,Flask 应用程序可以正常工作并在客户端呈现折线图,但现在我想委托 celery 通过以下方式运行任务RabbitMQBroker,它运行时我可以在 Celery shell 中看到日志输出,但生成的 html 内容永远不会发送回 Flask 服务器应用程序。怎么做?

这是后续http://stackoverflow.com/questions/37839551/how-to-restart-flask-process-by-refreshing-page/38146003#38146003 http://stackoverflow.com/questions/37839551/how-to-restart-flask-process-by-refreshing-page/38146003#38146003.

#server-celery.py

app = Flask(__name__)

@app.route('/',methods=['GET'])
def index():
   return render_template("index.html")

@app.route('/', methods=['GET', 'POST'])
def plotdata():
   form = InputForm(request.form)
   if request.method == 'POST' and form.validate():
       lineChart = task.main.delay(form.startdate.data, form.enddate.data) 
       return render_template('view.html', form=form, result= lineChart)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

并且任务运行为Celery worker:

#task.py
from celery import Celery

broker = 'amqp://localhost:5672'
app = Celery(__name__, broker=broker)

def queryDatabase(param1, param2):
   #query database, return results
   return data

def plotLineChart(data):
   #plot data as line Chart
   return LineChartHtmlcontent

@app.task
def main(param1,param2):
    data = queryDatabase(param1,param2)
    return plotLineChart(data)

和客户端 html 页面:

<!--view.html-->

<form method=post action="">
<table>
  {% for field in form %}
    <tr>
    <td>{{ field.label }}</td>
        <td>{{ field }}</td>

    </tr>
  {% endfor %}
</table>
<p><input type=submit value=Submit></form></p>

<p>
{% if result != None %}
<img src="{{ result }}" width="500">
{% endif %}
</p>

更好的解决方案是简单地让任务使用 celery 异步运行,就像它的预期用途一样,并使用页面上的 javascript 定期轮询 celery 任务以查看状态。

首先,当您创建 celery 任务时,请使用 bind=True 参数。这可以让你通过self进入函数。

@app.task(bind=True)
def main(self, param1, param2):
    self.update_state(state='PENDING')
    data = queryDatabase(param1,param2)
    self.update_state(state='COMPLETE')
    return plotLineChart(data)

您可以看到我们现在可以更新任务的状态。您可以修改代码以允许元数据进入update_state方法也是如此,例如:self.update_state(state='PROGRESS', meta={'current': 0, 'total': 12})

现在您需要轮询 celery 任务以查看它的运行情况。

@blueprint.route('/status/<task_id>', methods=['GET'])
def taskstatus(task_id=None):
    task = main.AsyncResult(task_id)
    response = {
        'state': task.state,
    }
    return jsonify(response)

当您使用任务 ID 轮询 URL 时,这将返回一个 json 对象。您可以使用以下方式获取任务 IDlineChart.id当你打电话之后.delay()

请参阅 miguel grinberg 的精彩教程:http://blog.miguelgrinberg.com/post/using-celery-with-flask http://blog.miguelgrinberg.com/post/using-celery-with-flask

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何让 Celery 工作人员返回任务结果 的相关文章

  • 在 pandas 中单独打印一列的原始值?

    我有一个数据框 df pd DataFrame name george age 23 name anna age 26 现在我想检索乔治的年龄 df df name george age 但这会输出一些额外的信息以及原始值 0 23 Nam
  • 如何使用 conda 在一行中安装多个包?

    我需要使用 conda 安装以下多个软件包 我不确定 conda forge 是什么 有些使用 conda forge 有些不使用它 是否可以将它们安装成一行而不需要一一安装 谢谢 conda install c conda forge d
  • 如何使用 Python 裁剪图像中的矩形

    谁能给我关于如何裁剪两个矩形框并保存它的建议 我已经尝试过这段代码 但效果不佳 import cv2 import numpy as np Run the code with the image name keep pressing spa
  • 如何让 Angular-Flask 应用加载 html 部分?

    我试图让我的 Angular Flask 应用程序在基本 html 文件中渲染部分 HTML 文件 应用程序加载基本 html 窗口标题和页脚加载 但 ng view 没有加载任何内容 也许我到局部的角度路由不正确 文件结构 gt flas
  • Pandas 连接问题:列重叠但未指定后缀

    我有以下数据框 print df a mukey DI PI 0 100000 35 14 1 1000005 44 14 2 1000006 44 14 3 1000007 43 13 4 1000008 43 13 print df b
  • 无法使用 BeautifulSoup 和 Requests 抓取下拉菜单

    我想抓取百年灵网站上的产品页面以获取各种信息 示例页面 https www breitling com gb en watches navitimer b01 chronograph 46 AB0127211C1A1 https www b
  • 使用 pandas 将字符串对象转换为 int/float

    import pandas as pd path1 home supertramp Desktop 100 life 180 data csv mydf pd read csv path1 numcigar Never 0 1 5 Ciga
  • Perl 是否有相当于 Python 的 `if __name__ == '__main__'` 的功能?

    有没有一种方法可以确定当前文件是否是 Perl 源中正在执行的文件 在 Python 中 我们使用以下结构来做到这一点 if name main This file is being executed raise NotImplemente
  • 如何在Python代码中查找列号

    简短问题 当按上述方式调用函数时 我可以找到行号here https stackoverflow com questions 3056048 filename and line number of python script 同样 如何找到
  • 张量流和线程

    下面是来自 Tensorflow 网站的简单 mnist 教程 即单层 softmax 我尝试通过多线程训练步骤对其进行扩展 from tensorflow examples tutorials mnist import input dat
  • 如何使用 PyMongo 在重复键错误后继续插入

    如果我需要在 MongoDB 中插入尚不存在的文档 db stock update one document set document upsert True 将完成这项工作 如果我错了 请随时纠正我 但是 如果我有一个文档列表并想将它们全
  • Python 3在for循环中更改字典键的值不起作用

    我的 python 3 代码没有按预期工作 def addFunc x y print x y def subABC x y z print x y z def doublePower base exp print 2 base exp d
  • Pandas groupby apply 执行缓慢

    我正在开发一个涉及大量数据的程序 我正在使用 python pandas 模块来查找数据中的错误 这通常工作得非常快 然而 我当前编写的这段代码似乎比应有的速度慢得多 我正在寻找一种方法来加快速度 为了让你们正确测试它 我上传了一段相当大的
  • 如何使用 Celery 多工作人员启用自动缩放?

    命令celery worker A proj autoscale 10 1 loglevel info启动具有自动缩放功能的工作人员 当创建多个工人时 me mypc projects x celery multi start mywork
  • 在 scipy 中创建新的发行版

    我试图根据我拥有的一些数据创建一个分布 然后从该分布中随机抽取 这是我所拥有的 from scipy import stats import numpy def getDistribution data kernel stats gauss
  • Python 导入非常慢 - Anaconda python 2.7

    我的 python import 语句变得非常慢 我使用 Anaconda 包在本地运行 python 2 7 导入模块后 我编写的代码运行得非常快 似乎只是导入需要很长时间 例如 我使用以下代码运行了一个 tester py 文件 imp
  • 如何在Tensorflow中保存估计器以供以后使用?

    我按照教程 TF Layers 指南 构建卷积神经网络 以下是代码 https github com tensorflow tensorflow blob r1 1 tensorflow examples tutorials layers
  • 如何从 nltk 下载器中删除数据/模型?

    我在 python3 NLTK 中安装了一些 NLTK 包 通过nltk download 尝试过它们 但不需要它们 现在想删除它们 我怎样才能删除例如包large grammars来自我的 NLTK 安装 我不想删除完整的 NLTK 安装
  • Java/Python 中的快速 IPC/Socket 通信

    我的应用程序中需要两个进程 Java 和 Python 进行通信 我注意到套接字通信占用了 93 的运行时间 为什么通讯这么慢 我应该寻找套接字通信的替代方案还是可以使其更快 更新 我发现了一个简单的修复方法 由于某些未知原因 缓冲输出流似
  • 使用 SERVER_NAME 时出现 Flask 404

    在我的 Flask 配置中 我将 SERVER NAME 设置为 app example com 之类的域 我这样做是因为我需要使用url for with external网址 如果未设置 SERVER NAME Flask 会认为服务器

随机推荐