使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询

2024-05-12

我正在开发一个 Web 应用程序,该应用程序将接收用户的请求,并且必须调用许多外部 API 来编写对该请求的答案。这可以直接从主 Web 线程使用 gevent 之类的东西来扇出请求来完成。

或者,我在想,我可以将传入的请求放入队列中,并使用工作人员来分配负载。我们的想法是尽量保持实时性,同时将请求分配给多个工作人员。这些工作人员中的每一个都只会查询众多外部 API 之一。然后,他们收到的响应将经过一系列转换,保存到数据库中,转换为通用模式并保存在通用数据库中,最终组合成一个大响应,通过 Web 请求返回。 Web 请求很可能一直被阻塞,用户正在等待,因此请保持 尽可能快地排队和出队很重要。

外部 API 调用可以轻松转化为单独的任务。我认为链接 从一个 api 任务到到数据库保存任务的转换可以使用链等来完成,并且使用弦将所有结果组合起来的最终结果返回到 Web 线程。

一些问题:

  • 这可以(并且应该)使用芹菜来完成吗?
  • 我正在使用 django。我应该尝试使用 django-celery 而不是普通芹菜吗?
  • 这些任务中的每一项都可能衍生出其他任务 - 例如记录刚刚发生的事情 发生或其他类型的分支。这可能吗?
  • 任务是否可以返回它们获得的数据 - 即通过 celery(在本例中为底层的 redis)可能返回 Kb 的数据,或者它们应该写入数据库,然后只传递指向该数据的指针?
  • 每个任务主要受 I/O 限制,最初只是要使用 Web 线程中的 gevent 来扇出请求并跳过整个排队设计,但事实证明它将被重用于不同的组件。试图保持 Q 的整个往返过程是实时的,可能需要许多工作人员来确保队列大部分是空的。或者是吗?运行 gevent 工作池对此有帮助吗?
  • 我是否必须编写 gevent 特定任务,或者使用 gevent 池自动处理网络 IO?
  • 是否可以为某些任务分配优先级?
  • 让它们保持秩序怎么样?
  • 我应该跳过芹菜而只使用昆布吗?
  • 芹菜似乎更适合那些可以推迟的“任务” 对时间不敏感。我为了保持实时性而疯了吗?
  • 我还应该关注哪些其他技术?

Update: Trying to hash this out a bit more. I did some reading on Kombu and it seems to be able to do what I'm thinking of, although at a much lower level than celery. Here is a diagram of what I had in mind. It's a simplified version, i.e. skipping the DB saving steps done by worker_2.

使用 Kombu 可以访问的原始队列似乎可以让许多工作人员订阅广播消息。如果使用队列,发布者不需要知道类型和数量。使用 Celery 可以实现类似的效果吗?似乎如果你想制作和弦,你需要在运行时知道和弦中将涉及哪些任务,而在这种情况下,你可以简单地将听众添加到广播中,并简单地确保他们宣布他们在将响应添加到最终队列的运行。

更新2:我看到有广播能力 http://docs.celeryproject.org/en/latest/userguide/routing.html#broadcast你能把它和和弦结合起来吗?一般来说,芹菜可以和生昆布一起吃吗?这听起来像是一个关于冰沙的问题。


我将尽力回答尽可能多的问题。

这可以(并且应该)使用芹菜来完成吗?

是的你可以

我正在使用 django。我应该尝试使用 django-celery 而不是普通芹菜吗?

Django 对 celery 有很好的支持,可以让开发过程变得更加轻松

这些任务中的每一项都可能衍生出其他任务 - 例如日志记录 刚刚发生了什么或其他类型的分支。这可能吗?

您可以使用ignore_result = true从任务中启动子任务,仅产生副作用

任务是否可以返回他们获得的数据 - 即可能是 Kb 的数据 通过 celery 获取数据(本例中以 redis 为基础)或者应该 写入数据库,然后只传递指向该数据的指针?

我建议将结果放入数据库,然后传递 id 会让您的经纪人和工作人员感到高兴。减少数据传输/酸洗等。

每个任务主要受 I/O 限制,最初只是要使用 来自网络线程的 gevent 以扇出请求并跳过整个请求 排队设计,但事实证明它会被重用 不同的组件。试图保持整个往返行程 Qs 实时可能需要许多工作人员来确保 队列大多是空的。或者是吗?会运行 gevent 工作线程 泳池帮忙解决这个问题吗?

由于进程是 io 绑定的,那么 gevent 肯定会在这里提供帮助。然而,gevent pool'd worker 的并发性应该是多少,这也是我正在寻找答案的问题。

我是否必须编写 gevent 特定任务或将使用 gevent 池 自动处理网络IO?

当您在池中使用 Gevent 时,它会自动进行猴子修补。但是您使用的库应该能够与 gevent 很好地配合。否则,如果您使用 simplejson (用 c 编写)解析一些数据,那么这将阻止其他 gevent greenlet。

是否可以为某些任务分配优先级?

您不能为某些任务分配特定的优先级,而是将它们路由到不同的队列,然后让不同数量的工作人员监听这些队列。特定队列的工作人员越多,该队列上任务的优先级就越高。

让它们保持秩序怎么样?

连锁是维持秩序的一种方式。和弦是一个很好的总结方式。 Celery 会照顾它,所以你不必担心。即使使用 gevent 池,最终也可以推断任务执行的顺序。

我应该跳过芹菜而只使用昆布吗?

如果您的用例不会随着时间的推移而变得更复杂,并且您愿意自己通过 celeryd +supervisord 管理您的流程,那么您可以。另外,如果你不关心celerymon、flower等工具自带的任务监控。

芹菜似乎更适合“任务”,可以 延迟且对时间不敏感。

Celery 也支持计划任务。如果这就是你那句话的意思。

我为了保持实时性而疯了吗?

我不这么认为。只要你的消费者足够快,它就会像实时一样好。

我还应该关注哪些其他技术?

对于芹菜,您应该明智地选择结果存储。我的建议是使用 cassandra。它对于实时数据(写入和查询方面)都有好处。您还可以使用 redis 或 mongodb。他们在结果存储方面有自己的一系列问题。但配置上的一些调整可能会大有帮助。

如果您的意思与 celery 完全不同,那么您可以研究 asyncio (python3.5) 和 Zeromq 来实现相同的目的。但我无法对此发表更多评论。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询 的相关文章

随机推荐

  • 如何为 Mac OS X 制作可拖动的菜单栏图标

    我正在为我正在开发的应用程序编写菜单栏图标 但是 NSStatusBar 类没有可以通过 cmd 鼠标左键拖动来使图标可拖动的方法 如何使用 Objective C 代码使菜单栏图标可拖动 谢谢 目前您无法使用 NSStatusBar 来完
  • 无法检索用户角色

    我正在尝试从 WL 服务器将角色列表返回到移动客户端设备 在我的 LoginModule 的 createIdentity 方法中 我添加了以下代码 HashMap
  • 鼠标输入时反应显示按钮

    我有一个反应组件 它包含如下方法 mouseEnter console log this is mouse enter render var album list const albums this props if albums user
  • 我在 apache 中使用乘客 for Rails 时收到 403 错误

    我已经安装了所需的工具 并遵循了几个教程 试图让乘客做出回应 我可以访问公共文件夹 public 500 html 或 422 hml 中的静态文件 昨天我通过虚拟主机进入 发现一些乘客错误 但一段时间后 托管重新启动了服务 从那时起我就无
  • Keycloak:获取 JSON 格式的授权代码?

    我们正在开展一个学生项目 我们的目标是实现用户可以通过Keycloak使用x509证书进行授权 实际上 我们不能继续接收授权代码以将其交换为令牌请求 基本上 我们发送授权码请求并通过 URL 参数接收授权码 但我们更希望接收 JSON 格式
  • 分配函数后如何删除 onmouseout 事件?

    我有一个问题 我正在为 onmouseout 事件分配一个函数 但运行该事件后 我需要将其删除 将非常感谢您的帮助 这取决于你的代码 如果你用 d3 这样做 那么你可以说 在 onmouseout 事件函数中 element on mous
  • java中wav文件转换为字节数组

    我的项目是 阿塞拜疆语音的语音识别 我必须编写一个程序来转换wav文件到字节数组 如何将音频文件转换为byte 基本上如第一个答案中的片段所描述 但不是BufferedInputStream use AudioSystem getAudio
  • Qt:将拖放委托给子级的最佳方式

    我在 QWidget 上使用拖放 我重新实现了 DragEnterEvent dragLeaveEvent dragMoveEvent 和 dropEvent 效果很好 在我的 QWidget 中 我有其他 QWidget 子级 我希望它们
  • Python:Factory Boy 生成对象创建时指定长度的列表

    我正在尝试使用 Factoryboy 在创建时指定长度的对象中创建一个列表 我可以创建列表 但由于提供的长度 大小的惰性性质 每次尝试创建具有指定长度的列表都会导致问题 这是我到目前为止所拥有的 class FooFactory facto
  • 何时引发 Window.SourceInitialized 事件

    我能保证Window SourceInitialized事件总是会在Window Loaded事件 我需要HwndSource在我的中进一步处理的对象Window Loaded 事件处理程序我不确定到那时这是否总是可用 以下是您可以预期的事
  • 如何在多个工作表上运行脚本,Google Sheets

    我有一个脚本 想在 Google 工作表中的特定选项卡上运行 但不一定在所有选项卡上运行 我尝试执行两个不同名称的脚本 但只有最后一个保存的脚本会运行 如何编写此脚本以在特定选项卡上运行 这是我的开始脚本 function onEdit v
  • SignTool 错误:访问被拒绝

    我尝试在安装了 VS2010 的 Windows Server 2008 R2 x64 上使用新的代码签名证书对 NET 应用程序进行authenticode 签名 但 SignTool 始终响应访问被拒绝 SignTool exe sig
  • mat-tab-group 不是 Angular 9 中的已知元素

    我正在使用 Angular 9 和 Angular Material 9 2 4 我正在尝试使用mat tab 组在我的 component html 中 但我不断收到错误 mat tab group is not a known elem
  • Trello API - 未经授权的权限请求

    我正在尝试编写一个小脚本来更新卡的当前列表中的时间量 以便我们可以优化吞吐量 我在 jsfiddle 上写了一个小脚本 几乎可以工作 但我得到了一个 请求未经授权的卡许可 当尝试使用时 Trello post cards card id a
  • 将 JSON 数组转换为 bash 数组并保留空格

    我想将 JSON 文件转换为 bash 字符串数组 稍后我可以对其进行迭代 我的JSON结构如下 USERID TMCCP CREATED DATE 31 01 2020 17 52 USERID TMCCP CREATED DATE 31
  • 如何从代码隐藏中向我的 div 添加点击事件?

    如何从代码隐藏中向我的 div 添加点击事件 当我点击 div 时 会出现一个消息框 其中显示 您想删除它吗 并在框中显示 是 或 否 全部来自后面的代码 while reader Read System Web UI HtmlContro
  • 指定 body CSS 宽度安全吗?

    现在通常满足于以一定的尺寸居中 div Blah div 不使用包装器进行对齐和大小并直接将其应用到body tag 像这样 Blah 尺寸是
  • 动态显示/隐藏 Xamarin.Forms.ListView 的页眉或页脚

    有没有一种方法可以根据运行时的条件动态显示 隐藏 ListView 的标题
  • 获取列的 [0, x] 元素的最小值

    我需要计算一列 其中值是对其他列进行矢量化运算的结果 df new col df col1 min 0 df col2 然而 事实证明我不能像上面的语法一样使用 min 那么 获得 pandas 列的零和给定值之间的最小值的正确方法是什么
  • 使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询

    我正在开发一个 Web 应用程序 该应用程序将接收用户的请求 并且必须调用许多外部 API 来编写对该请求的答案 这可以直接从主 Web 线程使用 gevent 之类的东西来扇出请求来完成 或者 我在想 我可以将传入的请求放入队列中 并使用