如何让 celery Worker 停止接收新任务 (Kubernetes)

2024-04-19

因此,我们有一个 kubernetes 集群,运行一些带有 celery 工作线程的 pod。我们使用 python3.6 来运行这些工作程序,celery 版本是 3.1.2(我知道,真的很旧,我们正在努力升级它)。我们还设置了一些自动缩放机制来动态添加更多的 celery 工作人员。

问题如下。假设在任何给定时间我们都有 5 个工人。然后会出现大量任务,从而增加 Pod 的 CPU/RAM 使用率。这会触发一个自动缩放事件,比方说,添加两个 celery 工作单元。所以现在这两个新的 celery 工人承担了一些长时间运行的任务。在完成运行这些任务之前,kubernetes 会创建一个缩减事件,杀死这两个工作人员,并杀死那些长时间运行的任务。

此外,由于遗留原因,如果任务未完成,我们没有重试机制(并且我们现在无法实现)。

所以我的问题是,有没有办法告诉 kubernetes 等待 celery 工作线程运行完所有待处理的任务?我想解决方案必须包括某种方法来通知芹菜工作人员以使其停止接收新任务。现在我知道 Kubernetes 有一些脚本可以处理这种情况,但我不知道在这些脚本上写什么,因为我不知道如何让 celery Worker 停止接收任务。

任何想法?


我写了一个博客文章 https://blog.dy.engineering/hpa-for-celery-workers-6efd82444aee正是关于那个主题 - 看看吧。

当 Kubernetes 决定杀死一个 pod 时,它首先发送 SIGTERM 信号,以便您的应用程序有时间正常关闭,之后如果您的应用程序没有结束 - Kubernetes 将通过发送 SIGKILL 信号来杀死它。

SIGTERM 到 SIGKILL 之间的这个周期可以通过以下方式调整terminationGracePeriodSeconds(更多关于它here https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/#hook-handler-execution).

换句话说,如果最长的任务需要 5 分钟,请确保将此值设置为高于 300 秒。

如您所见,芹菜为您处理这些信号here https://docs.celeryq.dev/en/stable/userguide/workers.html#stopping-the-worker(我想这也与您的版本相关):

应使用 TERM 信号来完成关闭。

当启动关闭时,工作人员将完成当前的所有工作 在实际终止之前执行任务。如果这些任务是 重要的是,你应该等待它完成后再做任何事情 激烈的,比如发送 KILL 信号。

正如文档中所解释的,您可以设置acks_late=True 配置 https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late因此,如果任务意外停止,它将再次运行。

我没有找到文档的另一件事(几乎可以肯定我在某个地方看到过它) - Celery 工作人员在收到 SIGTERM 后不会收到新任务 - 所以你应该安全地终止工作人员(可能需要设置worker_prefetch_multiplier = 1以及)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何让 celery Worker 停止接收新任务 (Kubernetes) 的相关文章

随机推荐

  • 通过 unix shell 命令查找给定文件扩展名的首选应用程序

    这可能不是strictly关于编程 但如果我找不到现成的解决方案 它可能会成为一个编程任务 在 UNIX 上 用于确定给定文件类型的用户首选应用程序的命令行方法是什么 我理想的解决方案是一个命令 让我不必执行以下操作 okular foo
  • OnItemCLickLIstener 不适用于 ListView

    我有一个带有 ListView 的活动 具有自定义视图的 ListView 我将 OnItemClickLIstener 添加到 ListView 中 当我点击项目时 结果我什么也没看到 ListView 的活动
  • 从日期中减去时间 - 时刻 js

    例如我有这个日期时间 01 20 00 06 26 2014 我想减去这样的时间 00 03 15 之后我想将结果格式化为这样 3 hours and 15 minutes earlier 我怎样才能做到这一点使用moment js edi
  • useReducer Action 调度两次

    Scenario 我有一个返回操作的自定义挂钩 父组件 Container 利用自定义钩子并将操作作为 prop 传递给子组件 Problem 当从子组件执行操作时 实际调度会发生两次 现在 如果子级直接使用钩子并调用操作 则调度仅发生一次
  • 当我不知道它是否是临时的时,C++ 返回类型

    假设Foo是一个相当大的数据结构 我应该怎样写一个const返回实例的虚函数Foo 如果我不知道继承的类是否会存储Foo内部 因此 允许通过引用返回 如果我无法在内部存储它 我的理解是我无法返回const引用它 因为它将是临时的 它是否正确
  • 如何使用 Laravel + JavaScript 创建搜索过滤器?

    我最近创建了一个 JavaScript 过滤器来过滤产品表中的数据 我有 5 个字段可以输入搜索 它们是 描述 型号 经销商和库存 我将表与另一个视图中的产品分开 并将字段保留在索引中 我需要该表返回我在字段中输入的值 我举了一个用 描述
  • WP7 检查互联网是否可用

    我的应用程序 WP7 未被接受 因为如果互联网不可用 它无法加载 我寻找一种方法来检查它并找到了这个命令 NetworkInterface GetIsNetworkAvailable 但它无法在模拟器上运行 而且我没有任何设备来测试它 有人
  • 在 git-svn 中克隆主干后克隆分支的最佳方法是什么?

    给定一个包含许多分支的大型 Subversion 存储库 我想开始使用git svn通过克隆trunk首先 然后添加特定分支 我看到至少三种方法可以做到这一点 但是其中任何一种都是 官方的 还是有最好的方法 假设以下布局 https svn
  • Delphi 6:在缺少抽象类方法时强制编译器错误?

    我使用的是 Delphi Pro 6 现在 了解类是否缺少基类抽象方法的唯一方法是等待 IDE 发出 包含抽象方法 base class 抽象方法名称 警告或在尝试调用缺少的方法时等待运行时抽象错误方法 前者是不够的 因为它只查找当前项目中
  • 如何在 log4j 中启用包级别日志记录

    谁能告诉我 log4j 中的包级别日志记录是什么 以及如何实现这一点 今天我的面试问题无法回答 即使我在谷歌中也没有找到好的解决方案 太感谢了 包级别日志记录是 log4j 的标准日志记录 使用 log4j 配置 您可以指定包和关联的级别
  • 将 std::wstring 转换为 int

    我认为这非常简单 但我无法让它发挥作用 我只是想将 std wstring 转换为 int 到目前为止我已经尝试了两种方法 第一种是将 C 方法与 atoi 一起使用 如下所示 int ConvertedInteger atoi OrigW
  • 如何让传单地图画布具有 100% 的高度?

    我的传单画布目前如下所示 高度为 700 像素 不过我希望它的高度为 100 以便占据整个空白区域 高度 100 在地图画布的 CSS 属性中不起作用 我找到了一些解决方案 但它们只适用于谷歌地图 有没有人有解决方案 即使这只是一个解决方法
  • 在 Matplotlib 3D 绘图中获取观察/相机角度?

    当我用鼠标旋转 Matplotlib 3D 图时 如何保存视角 相机位置 并在下次运行脚本时使用这些值以编程方式设置视角 TL DR 视角存储在图形的轴对象中 名称为elev and azim 并且视图可以设置为plt gca view i
  • jQuery AJAX 参数未传递给 MVC

    我有点陷入可能是常见的情况 但找不到太多解决方案 我将单个 int 参数传递给 MVC 控制器方法 期望返回 Json 响应 问题是 该参数虽然在客户端填充 但在服务器端无法识别并被解释为空 这是代码 function getBatches
  • 无法在react-native中获取iOS推送通知设备令牌

    我提到这个问题 https stackoverflow com questions 35387227 get device token with react native获取设备令牌以便将推送通知发送到我的应用程序 我使用创建了我的应用程序
  • 如何在 Swift 中获取由整数表示的 Unicode 代码点?

    所以我知道如何将字符串转换为utf8格式 如下所示 for character in strings utf8 for example A will converted to 65 var utf8Value character 我已经阅读
  • “栅栏已经激活——来不及添加写入”

    下面的错误信息是什么意思 栅栏已经激活 来不及添加写入 以下是如何获取它的示例 环境 Mac OS X http en wikipedia org wiki Mac OS X Lion 流星0 3 8 项目创建 meteor create
  • 熊猫到D3。将数据帧序列化为 JSON

    我有一个包含以下列且没有重复项的 DataFrame region type name value 可以看作是一个层次结构 如下所示 grouped df groupby region type name 我想将此层次结构序列化为 JSON
  • 任何无需 GUI/X 会话即可使用 GreaseMonkey 脚本运行 Firefox 的方法

    我需要为第三方网站构建一个小型 监控 抓取工具 这是一个外部网站 其中包含有关我们访问者的统计信息 不幸的是 这个网站很难通过正常的 wget 机制 因为它使用了大量复杂的 JS 其中一部分是由 GWT 生成的 所以我的解决方法是创建一个
  • 如何让 celery Worker 停止接收新任务 (Kubernetes)

    因此 我们有一个 kubernetes 集群 运行一些带有 celery 工作线程的 pod 我们使用 python3 6 来运行这些工作程序 celery 版本是 3 1 2 我知道 真的很旧 我们正在努力升级它 我们还设置了一些自动缩放