如何在任何 MQ 平台上实现这个单一并发分布式队列?

2024-06-24

我目前正在努力寻找实现特定类型队列的解决方案,这需要以下特征:

  1. 所有队列必须遵守作业添加的顺序。
  2. 整个队列的并发度为1,这意味着每个队列一次只会执行一个作业queue,不是工人。
  3. 像这样排队的人会超过几千人。
  4. 它需要分布式并且能够扩展(例如,如果我添加一个工作人员)

基本上它是一个单进程 FIFO 队列,这正是我在尝试不同的消息队列软件(如 ActiveMQ 或 RabbitMQ)时想要的,但是一旦我将其扩展到 2 个工作线程,它就不起作用,因为在这种情况下我想要它扩展并保持单个进程队列的完全相同的功能。下面我附上它如何在具有多个工作人员的分布式环境中工作的描述。

拓扑结构的示例:(请注意,它是多对多关系Queue and Workers)

其运行方式示例:

+------+-----------------+-----------------+-----------------+
| Step | Worker 1        | Worker 2        | Worker 3        |
+------+-----------------+-----------------+-----------------+
| 1    | Fetch Q/1/Job/1 | Fetch Q/2/Job/1 | Waiting         |
+------+-----------------+-----------------+-----------------+
| 2    | Running         | Running         | Waiting         |
+------+-----------------+-----------------+-----------------+
| 3    | Running         | Done Q/2/Job/1  | Fetch Q/2/Job/2 |
+------+-----------------+-----------------+-----------------+
| 4    | Done Q/1/Job/1  | Fetch Q/1/Job/2 | Running         |
+------+-----------------+-----------------+-----------------+
| 5    | Waiting         | Running         | Running         |
+------+-----------------+-----------------+-----------------+

也许这不是最好的代表,但它表明,即使在Queue 1 and Queue 2,工作机会多了,但是Worker 3 在上一个作业完成之前不会开始获取下一个作业。

这就是我努力寻找好的解决办法。

我尝试了很多其他解决方案,例如rabbitMQ、activeMQ、apollo...这些允许我创建数千个队列,但是当我尝试时,所有这些都将使用worker 3来运行队列中的下一个作业。并且并发数是每个工人

是否有任何解决方案可以在任何 MQ 平台(例如 ActiveMQ、RabbitMQ、ZeroMQ 等)中实现这一点?

谢谢 :)


您可以使用 Redis 列表来实现此目的,并带有一个附加的“调度”队列,所有工作人员都可以使用该队列BRPOP继续他们的工作。调度队列中的每个作业都标有原始队列 ID,当工作人员完成作业时,它会转到该原始队列并执行RPOPLPUSH到调度队列中,以使下一个作业可供任何其他工作人员使用。因此,调度队列最多有队列数元素。

您必须处理的一件事是当源队列为空时调度队列的初始填充。这可能只是发布者针对最初设置的每个队列的“空”标志进行的检查,并且当原始队列中没有剩余内容可供调度时也由工作人员设置。如果设置了这个标志,发布者就可以LPUSH第一个作业直接放入调度队列。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在任何 MQ 平台上实现这个单一并发分布式队列? 的相关文章

  • 删除 PriorityQueue 的顶部?

    假设我使用 Java util 中的 PriorityQueue 类 我想从 PriorityQueue pq 中删除最大的数字 我们假设它位于队列的头部 下面的工作会起作用吗 1 int head pq peek pq dequeue h
  • 单台机器最快的 Perl IPC/消息队列是多少?

    我正在开发一个 主要 Perl 项目 并希望使用消息队列来相互隔离进程 我有这样的工作流程 输入 gt 接收器 gt 处理器 gt 输出 我需要每秒处理数百笔交易 所以速度是我最大的动力 对于这种类型的设置来说 最快的消息队列系统是什么 我
  • 检查 Redis 列表中是否已存在某个值

    我想知道是否有办法检查 redis 列表中是否已存在某个键 我无法使用集合 因为我不想强制唯一性 但我确实希望能够检查字符串是否确实存在 Thanks 您的选择如下 Using LREM如果发现则更换它 维护一个单独的SET与您的LIST
  • 使用通配符查找键

    我已经使用分号保存了数据 redis gt keys party 1 party congress president 2 party bjp president 3 party bjp 4 party sena 是否有任何命令可以列出所有
  • redis能完全取代mysql吗?

    简单的问题 我是否可以使用 redis 而不是 mysql 来处理各种 Web 应用程序 社交网络 地理位置服务等 IT 领域没有什么是不可能的 但有些事情可能会变得极其复杂 将键值存储用于全文搜索之类的事情可能会非常痛苦 另外 据我所知
  • Redis 客户端忽略其上设置的配置选项并尝试连接到默认 IP 127.0.01

    在AWS中 我使用ElastiCache Redis服务器并使用节点作为后端和 promise redis 包 这就是我尝试连接到我的 redis 服务器端点的方法 client redis createClient host my red
  • .NET 中 UniqueQueue 和 UniqueReplacementQueue 集合最有效的实现

    考虑到入队和出队操作的速度同样重要 NET 中 UniqueQueue 和 UniqueReplacementQueue 集合最有效 就速度而言 的实现是什么 UniqueQueue是一个不可能出现重复的队列 因此 如果我将一个元素推送到队
  • 如何在Redis中使用HSCAN命令?

    我想在我的作业中使用 Redis 的 HSCAN 命令 但我不知道它是如何工作的 Redis 的官方页面 http redis io commands hscan http redis io commands hscan 这个命令给了我空白
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱
  • 在 C 中使用单个消息队列是否可以实现双向通信

    我希望服务器向客户端发送一些消息 并让客户端确认它 我被分配了这个任务 我可以在 C linux 中使用单个消息队列来完成它还是我需要创建两个 谢谢 是的 可以使用 sysV 消息队列来做到这一点 从您之前的问题来看 您正在使用该队列 您可
  • 执行 SET {Key} 超时,inst: 0,mgr: Inactive,queue: 2, qu=1, qs=1, qc=0, wr=1/1, in=0/0

    我正在尝试使用 StackExchange Redis 客户端将 90 KB pdf 文件保存到 Azure Redis 缓存中 我已将该文件转换为字节数组并尝试使用 stringSet 方法保存它并收到错误 Code byte bytes
  • 如果没有过期的内容,Redis maxmemory-policy volatile-lru 是否会被驱逐?

    我有一个 redis 服务器 设置了maxmemory policy set to volatile lru 文档表明 当达到内存限制时 这将从设置过期的条目集中逐出 在这种情况下 redis 是否只驱逐过期的项目 如果内存中的所有内容都设
  • Redis如何存储关联数组?设置、散列还是列表?

    我对 Redis 的所有可用存储选项有点困惑 我想做一些简单的事情 并且不想过度设计它 我正在与phpredis and Redis v2 8 6 我有一个需要存储的简单关联数组 我还需要能够通过其键检索项目并循环遍历所有项目 a arra
  • 是否有一个好的开源 MongoDB 队列 C# 驱动程序实现

    并不是说编写一个程序不够容易 或有趣 可以说 不重新发明轮子是有道理的 我已经浏览了各种尝试 但我似乎还没有遇到支持这些标准的实现 具有MongoDB持久化的简单队列OSS系统 基于 C 驱动程序 官方 如此完整的 POCO 序列化 可尾游
  • 如何在Redis中存储聚合目录树搜索结果

    我有一个很大的产品目录树 目前包含约 36000 个类别和约 100 万个产品 即叶子 它的结构如下 最大深度为 5 Cat1 Cat11 Cat111 Cat1111 Product1 Cat1112 Product1 Cat1113 P
  • 使用 MongoDB 作为我们的主数据库,我应该使用单独的图数据库来实现实体之间的关系吗?

    我们目前正在为一家专业公司内部实施类似 CRM 的解决方案 由于存储信息的性质以及信息的不同值和键 我们决定使用文档存储数据库 因为它完全适合目的 在本例中我们选择 MongoDB 作为此 CRM 解决方案的一部分 我们希望存储实体之间的关
  • 有没有好的方法支持 Redis 排序集中的 pop 成员?

    有没有好的方法可以像 List 的 api LPOP 一样支持 Redis 排序集中的 pop 成员 我发现从 Redis 排序集中弹出消息是使用 ZRANGE ZREM 但是它不是线程安全性 并且当多个线程从不同主机同时访问它们时需要分布
  • 有没有办法在jedis中传递redis命令,而不使用函数?

    我们正在尝试构建一个控制台来处理 Redis 查询 但是 在后端我们需要使用Jedis 因此 作为输入给出的命令需要使用 Jedis 进行处理 例如 在redis cli中 我们使用 keys 同样 我们在 Jedis 中使用 jedis
  • 跨水平服务器计算 socket.io 用户数

    我有多个使用 redisstore 水平扩展的 socket io 服务器 我已经有效地设置了房间 并且能够成功地跨服务器广播到房间等 现在我正在尝试构建一个状态页面 但我无法弄清楚如何简单地计算跨服务器连接的用户数量所有服务器 io so
  • 检查redis是否正在运行->node js

    我只是想知道 在 NodeJS 进程开始时 Redis 是否启动 因此用户会话是否会被存储 这是我目前所拥有的 var session require express session var RedisStore require conne

随机推荐

  • Android ListView 复选框选择

    我在这里有一个由两部分组成的问题 1 如何填充我的ListView以便显示字符串 但是当选择项目时 不可见的id值 来自手机联系人的联系人id 是实际使用的值 2 我有一个使用 multipleChoice 模式进行项目选择的 ListVi
  • Node.js 中的 Python 多处理 - 在子进程上打印不起作用

    我有一个运行客户端界面的 Node js 应用程序 该界面公开触发机器学习任务的操作 由于在实现机器学习相关内容时 Python 是更好的选择 因此我实现了一个按需运行机器学习任务的 Python 应用程序 现在 我需要集成这两个应用程序
  • MongoDB list集合过滤器

    我正在使用 Node js 我正在尝试过滤必须排除集合 出口 的集合并检索所有其他集合 但我似乎无法弄清楚语法 我试过了 db listCollections filter outlets toArray err docs 有什么建议么 您
  • 如何将位图转换为文件而不压缩

    我需要从服务器下载图像并以其原始质量保存 但显然保存后图像被压缩并且质量下降 我使用 android async http 库来处理 http 请求 我的请求和保存文件的代码 AsyncHttpClient client new Async
  • 从 WPF 窗口中删除图标

    我可以使用 WinApi 从 WPF 窗口中删除窗口图标 但是当我仅运行 WPF 项目的可执行文件时 我会在应用程序窗口中再次获得该图标 如何删除该图标 From WPF教程 http www wpftutorial net RemoveI
  • 构建表达式树

    我正在努力思考如何为更多 lambda 构建表达式树 如下所示 更不用说可能有多个语句的东西了 例如 Func
  • Python语法错误:无法分配给模块中的运算符,但可以在解释器中工作

    我有一根绳子a我想根据它的长度将它分成两半 所以我有 a front len a 2 len a 2 这在解释器中工作正常 但是当我从命令行运行模块时 python 给了我一个SyntaxError can t assign to oper
  • 如何使 Cucumber 功能作为本地单元测试在 Android 项目中运行?

    我有一个用 Java 编写的 Android 项目 正在 Android Studio 中进行 我想使用 Cucumber 对一些内部组件进行集成测试 注意 我知道这不是 BDD 方式 但对我来说很有用 我希望测试运行为本地单元测试 htt
  • WSO2 Identity Server 5.0.0 无法从辅助用户存储中为用户返回 SAMLResponse 中的用户声明

    我在使用 SAML SSO 身份验证时遇到此问题 我已经成功设置了 WSO2IS 5 0 0 身份服务器 我还成功设置了 至少我希望如此 辅助用户存储 我使用 JDBCUserStoreManager 实现 我已将此商店设置为 DOMAIN
  • 微服务中的事务

    我读过一些关于微服务架构的文章 但没有人涉及事务的主题 他们都说这很难做到 也许有人可以描述如何处理这个问题 但不是从领域方面 而是从技术方面 假设我们有一个业务案例 我们需要调用两个不同的服务 并且它们都对数据库进行一些更改 但是如果第二
  • 如何从存储在 char* 指针中的 name 调用 c 函数?

    我想通过函数的名称动态调用函数 例如 假设有以下函数和字符串 void do fork printf Fork called n char pFunc do fork 现在我需要打电话do fork 就在 pFunc 那么这可能吗 欢迎 C
  • SQL Server 2005 中的分层查询

    早在我在 Oracle 商店工作时 我就认为 CONNECT BY 是理所当然的 现在我一直在使用 SQL Server 2005 并且有一些令人讨厌的对象层次结构 具体来说 我们有一个自引用表 其中所有子记录都有一个包含其父记录 ID 的
  • 如何为复杂的印度尼西亚电话号码格式构建正则表达式?

    最近 我正在使用regexpal http regexpal com构建这个自定义正则表达式 我正在处理印度尼西亚电话号码的几个测试用例 这是一个简单的例子08xx 3456 7890 or 08xx34567890 但如果我得到以下格式
  • 像 String#replace 一样替换 Ruby 中引用的 Integer 值

    我有以下代码 def mymethod a a replace a end mystring b mymethod mystring p mystring gt a 但我想用 Integer 执行相同的操作 那可能吗 简短的回答 不 长答案
  • 尝试在另一个线程wxpython中创建一个对话框

    我正在另一个线程中运行一个函数 该函数应该填写一个对话框 然后显示它 但只要我尝试以任何方式更改对话框 它就会出现段错误 我读到这是 WxPython 的一个常见问题 并且开发人员无意直接更改另一个线程中的对话框 我该如何解决这个问题 我可
  • 如何从类型中省略属性?

    我对 Typescript 中的 Omit 类型有疑问 所以我知道 Omit 类型与Pick并且是这样构建的 type Omit
  • 与 jUnit 相比 TestNG 的缺点? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我熟悉 jUnit 并且听说 TestNG 可能是解决 jUnit 的一些烦恼的方法 例如它坚持为每个测试创建一个单独的测试类实例 从而迫使我对
  • ASP.NET MVC 使用自定义模型绑定程序时从客户端检测到潜在危险的 Request.Form 值

    在这里得到错误 ValueProviderResult value bindingContext ValueProvider GetValue ConfirmationMessage 如何仅允许选择值 IE ValidateInput fa
  • 将基于材质的对话框主题与 AppCompat 结合使用

    我的清单中有一项活动 我曾经使用对话框主题进行样式设置 我找不到如何替换它AppCompat图书馆
  • 如何在任何 MQ 平台上实现这个单一并发分布式队列?

    我目前正在努力寻找实现特定类型队列的解决方案 这需要以下特征 所有队列必须遵守作业添加的顺序 整个队列的并发度为1 这意味着每个队列一次只会执行一个作业queue 不是工人 像这样排队的人会超过几千人 它需要分布式并且能够扩展 例如 如果我