用redis实现支持优先级的消息队列

2023-11-01

用redis实现支持优先级的消息队列

为什么需要消息队列

系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验。   有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈用户与否。   另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的处理掉高并发的任务。   消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来作。  

rabbitmq的优先级做法 

目前成熟的消息队列产品有很多,著名的例如rabbitmq。它使用起来相对还是比较简单的,功能也相对比较丰富,一般场合下是完全够用的。但是有个很烦人的就是它不支持优先级。 例如一个发邮件的任务,某些特权用户希望它的邮件能够更加及时的发送出去,至少比普通用户要优先对待。默认情况下rabbitmq是无法处理掉的,扔给rabbitmq的任务都是FIFO先进先出。但是我们可以使用一些变通的技巧来支持这些优先级。创建多个队列,并为rabbitmq的消费者设置相应的路由规则。   例如默认情况下有这样一个队列,我们拿list来模拟 [task1, task2, task3],消费者轮流按照FIFO的原则一个个拿出task来处理掉。如果有高优先级的任务进来,它也只能跟在最后被处理[task1, task2, task3, higitask1]. 但是如果使用两个队列,一个高优先级队列,一个普通优先级队列。 普通优先级[task1, task2, task3], 高优先级[hightask1 ] 然后我们设置消费者的路由让消费者随机从任意队列中取数据即可。   并且我们可以定义一个专门处理高优先级队列的消费者,它空闲的时候也不处理低优先级队列的数据。这类似银行的VIP柜台,普通客户在银行取号排队,一个VIP来了他虽然没有从取号机里拿出一个排在普通会员前面的票,但是他还是可以更快地直接走VIP通道。   使用rabbitmq来做支持优先级的消息队列的话,就像是上面所述同银行VIP会员一样,走不同的通道。但是这种方式只是相对的优先级,做不到绝对的优先级控制,例如我希望某一个优先级高的任务在绝对意义上要比其他普通任务优先处理掉,这样上面的方案是行不通的。因为rabbitmq的消费者只知道再自己空闲的情况下从自己关心的队列中“随机”取某一个队列里面的第一个数据来处理,它没法控制优先取找哪一个队列。或者更加细粒度的优先级控制。或者你系统里面设置的优先级有10多种。这样使用rabbitmq也是很难实现的。   但是如果使用redis来做队列的话上面的需求都可以实现。  

使用redis怎么做消息队列

首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。   试想一下在”数据库解决所有问题“的思路下,不使用消息队列也是可以完成你的需求的。我们把任务全部存放在数据库然后通过不断的轮询方式来取任务处理。这种做法虽然可以完成你的任务但是做法很粗劣。但是如果你的数据库接口提供一个阻塞的方法那么就可以避免轮询操作了,你的数据库也可以用来做消息队列,只不过目前的数据库还没有这样的接口。 另外做消息队列的其他特性例如FIFO也很容易实现,只需要一个List对象从头取数据,从尾部塞数据即可实现。 redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,所以可以用来做消息队列。      

redis消息队列优先级的实现

一些基础redis基础知识的说明
1. redis> blpop tasklist 0
2. "im task 01"
这个例子使用blpop命令会阻塞方式地从tasklist列表中取头一个数据,最后一个参数就是等待超时的时间。如果设置为0则表示无限等待。另外redis存放的数据都只能是string类型,所以在任务传递的时候只能是传递字符串。我们只需要简单的将负责数据序列化成json格式的字符串,然后消费者那边再转换一下即可。   这里我们的示例语言使用python,链接redis的库使用redis-py. 如果你有些编程基础把它切换成自己喜欢的语言应该是没问题的。  

1.简单的FIFO队列

01. import redis, time<br>
02. def handle(task):
03. print task
04. time.sleep(4)<br>
05. def main():
06. pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
07. r = redis.Redis(connection_pool=pool)
08. while 1:
09. result = r.brpop('tasklist'0)
10. handle(result[1])<br>
11. if __name__ == "__main__":
12. main()
上例子即使一个最简单的消费者,我们通过一个无限循环不断地从redis的队列中取数据。如果队列中没有数据则没有超时的阻塞在那里,有数据则取出往下执行。 一般情况取出来是个复杂的字符串,我们可能需要将其格式化后作为再传给处理函数,但是为了简单我们的例子就是一个普通字符串。另外例子中的处理函数不做任何处理,仅仅sleep 用来模拟耗时的操作。   我们另开一个redis的客户端来模拟生产者,自带的客户端就可以。多往tasklist 队列里面塞上一些数据。
1. redis> lpush tasklist 'im task 01'
2. redis> lpush tasklist 'im task 02'
3. redis> lpush tasklist 'im task 03'
4. redis> lpush tasklist 'im task 04'
5. redis> lpush tasklist 'im task 05' 
随后在消费者端便会看到这些模拟出来的任务被挨个消费掉。  

2.简单优先级的队列

  假设一种简单的需求,只需要高优先级的比低优先级的任务率先处理掉。其他任务之间的顺序一概不管,这种我们只需要在在遇到高优先级任务的时候将它塞到队列的前头,而不是push到最后面即可。 因为我们的队列是使用的redis的 list,所以很容易实现。遇到高优先级的使用rpush 遇到低优先级的使用lpush
1. redis> lpush tasklist 'im task 01'
2. redis> lpush tasklist 'im task 02'
3. redis> rpush tasklist 'im high task 01'
4. redis> rpush tasklist 'im high task 01'
5. redis> lpush tasklist 'im task 03'
6. redis> rpush tasklist 'im high task 03'
随后会看到,高优先级的总是比低优先级的率先执行。但是这个方案的缺点是高优先级的任务之间的执行顺序是先进后出的。  

3.较为完善的队列

例子2中只是简单的将高优先级的任务塞到队列最前面,低优先级的塞到最后面。这样保证不了高优先级任务之间的顺序。 假设当所有的任务都是高优先级的话,那么他们的执行顺序将是相反的。这样明显违背了队列的FIFO原则。 不过只要稍加改进就可以完善我们的队列。   跟使用rabbitmq一样,我们设置两个队列,一个高优先级一个低优先级的队列。高优先级任务放到高队列中,低的放在低优先队列中。redis和rabbitmq不同的是它可以要求队列消费者从哪个队列里面先读。
1. def main():
2. pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
3. r = redis.Redis(connection_pool=pool)
4. while 1:
5. result = r.brpop(['high_task_queue''low_task_queue'], 0)
6. handle(result[1])
上面的代码,会阻塞地从'high_task_queue', 'low_task_queue'这两个队列里面取数据,如果第一个没有再从第二个里面取。 所以只需要将队列消费者做这样的改进便可以达到目的。
1. redis> lpush low_task_queue low001
2. redis> lpush low_task_queue low002
3. redis> lpush low_task_queue low003
4. redis> lpush low_task_queue low004
5. redis> lpush high_task_queue low001
6. redis> lpush high_task_queue low002
7. redis> lpush high_task_queue low003
8. redis> lpush high_task_queue low004
通过上面的测试看到,高优先级的会被率先执行,并且高优先级之间也是保证了FIFO的原则。 这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。  

4.优先级级别很多的情况

假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别。那么我们第三种方案将不太合适了。 虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。   有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。  例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。   因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。   回想我们第三种方案,把第三种方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

用redis实现支持优先级的消息队列 的相关文章

  • 如何使用 PHP 解释 HTML5 输入日期值

    我需要让用户选择一个日期 最好采用 dd mm yy 格式 我决定尝试新的 HTML5 输入日期类型 但是我不知道如何解释它在服务器端给出的值 我得到的值是 yyyy mm dd 我怎样才能做到这一点 如果用户使用不支持它的旧版浏览器怎么办
  • R 语言与 php 集成以获取 R 的结果

    我有以下 R 脚本 assign data path data path lt C Users Owner Desktop R work assign valus to the following three percent train p
  • 在产品页面上显示最近浏览过的产品

    magento 当前的默认功能是在类别页面的右侧显示最近查看的产品 现在我想在产品页面底部显示相同的内容 使用的 phtml 文件在位置命名为 frontend base default template reports product v
  • 将 Woocommerce 购物车项目自定义数据保存为订单项目元数据,将其显示在订单和电子邮件上

    关于 Woocommerce 我有要添加到购物车的自定义数据 在functions php 文件中 我有以下函数 Display cart item custom data in cart and checkout pages add fi
  • 如何禁用 Yii2 中的按钮

    我正在尝试禁用创建项目 Button当用户未登录时 该按钮将Hide or disable 这是我的条件 p p 它正在工作 但是 当用户登
  • mysql 数据库更新和 codeigniter 行受影响的功能[重复]

    这个问题在这里已经有答案了 我在更新查询后使用此功能此 codeigniter 功能 this gt db gt affected rows 如果我更新值 它会返回受影响的行数 但如果我更新与之前相同的值 它会返回 0 受影响的行 任何帮助
  • 通过 preg_match_all PHP 函数从 html 代码字符串中提取 img 标签

    我有一些 html 代码并提取了img src来自它的属性 html 字符串中有一些像这样的 img img src http www pecso it wp content uploads 2016 12 10 WRAS png 我尝试使
  • 回退到正则表达式中字符串的开头

    是否可以让正则表达式退回到字符串的开头并再次开始匹配 这就是我问的原因 给定下面的字符串 我想捕获子字符串black red blue and green按照该顺序 无论主题字符串中出现的顺序如何 并且仅当所有子字符串都存在于主题字符串中时
  • 如何实现嵌套注释?

    我想在我的网站上显示评论 如下所示 li Parent ul li child one li li child two ul li grandchild li li other grandchild li ul li ul li li An
  • 使用 nuxtjs 登录 laravel sainttum 后出现 401(未经身份验证)错误

    我已经安装了 laravel v 7 30 4 nuxtjs v 2 15 7 当我使用 nuxtjs auth 模块登录我的 laravel sainttum 后 当 nuxt 尝试获取用户时 laravel 响应 401 错误 未经身份
  • str_ireplace 与 str_replace 一样工作

    我需要用 url 字符串中的小 ascii 字符替换所有本地字符 包括大写字母 str echo str ireplace array array c c d s z str 结果 c c d s z 我期望 c c d s z c c d
  • 从 mysql 检索数据并通过电子邮件发送

    我有一个 php 页面 它显示 mysql 数据库中每个用户的课程表数据 如下所示 result mysql query sql echo table border 0 thead tr th Class Link th th Studen
  • 发送带有附件的 PHP HTML 邮件

    我遇到了一个问题 直到今天 我使用 PHP 发送 HTML 电子邮件 其中包含的标头 Content type text html 现在 我添加了添加附件的功能 为此 我必须将此行更改为 Content Type multipart mix
  • 清理 php 中的句子

    标题可能听起来很奇怪 但我有点尝试设置这个 preg replace 来处理文本区域的混乱写入者 它必须 如果有感叹号 则不应连续出现另一个感叹号 如果有 则逗号胜出 并且必须是 当昏迷前有一个 空格时 应将其减少到零 该句子不能以逗号开头
  • 最佳实践:在 PHP 中导入 mySQL 文件;分割查询

    我遇到了一种情况 我必须更新共享托管提供商上的网站 该网站有一个 CMS 使用 FTP 上传 CMS 文件非常简单 我还必须导入一个大的 相对于 PHP 脚本的范围 数据库文件 未压缩时大约 2 3 MB Mysql 已关闭 无法从外部访问
  • 随机字体颜色

    我需要用 2 或 3 种随机颜色为文本着色 我如何在 PHP 或 JavaScript 中执行此操作 color str pad sprintf x x x rand 0 255 rand 0 255 rand 0 255 6 rand 0
  • 从数据库结果生成多维数组的递归函数

    我正在编写一个函数 它接受页面 类别数组 来自平面数据库结果 并根据父 ID 生成嵌套页面 类别项目数组 我想递归地执行此操作 以便可以完成任何级别的嵌套 例如 我在一个查询中获取所有页面 这就是数据库表的样子 id parent id t
  • 在 PHP 扩展中,推荐从 std::string 返回值的方法

    我们有一个简单的 PHP 函数 其目的是调用 C 自由函数std string callLibrary std string 并返回其std string返回值 目前看起来是这样的 PHP FUNCTION call library cha
  • PHP 指针与引用

    在 PHP 中 使用指针有什么区别 例如 function foo var var 3 a 0 foo a 以及参考 function foo var var 3 a 0 foo a 它们都修改了原始变量的值 但是它们内部的表示方式不同吗
  • PHP - 将文件系统路径转换为 ​​URL

    我经常发现项目中的文件需要从文件系统和用户浏览器访问 一个例子是上传照片 我需要访问文件系统上的文件 以便可以使用 GD 来更改图像或移动它们 但我的用户还需要能够从类似以下的 URL 访问文件example com uploads myp

随机推荐

  • Oracle 数据库升级

    转载来源 Oracle 数据库升级 https mp weixin qq com s LIDIsmeZRRfZmOVtOkeznQ 一 环境准备 本次测试尽量按照生产环境升级进行模拟 故而使用2台主机进行测试 注意 源库为生产环境 linu
  • 【Java编程】JavaSE基础总结(六):多线程

    JavaSE基础总结 六 进程是程序执行的实体 每一个进程都是一个应用程序 比如我们运行QQ 浏览器 LOL 网易云音乐等软件 都有自己的内存空间 CPU 一个核心同时只能处理一件事情 当出现多个进程需要同时运行时 CPU一般通过 时间片轮
  • [精通Objective-C]块(block)

    精通Objective C 块 block 参考书籍 精通Objective C 美 Keith Lee 目录 精通Objective C块block 目录 块的语法 块的词汇范围 块的内存管理 块的使用 使用块为数组排序 使用块的并行编程
  • 让你真正明白cinder与swift、glance的区别

    http www aboutyun com thread 10060 1 1 html 问题导读 1 你认为cinder与swift区别是什么 2 cinder是否存在单点故障 3 cinder是如何发展而来的 在openstack中 我们
  • 计算机辅助绘图考试题,计算机辅助设计绘图考试题(A)(大学期末复习试题).doc...

    教师试做时间出题教师 取题时间审核教研室主任出题单位使用班级考试日期院 部 主任考试成绩期望值印刷份数规定完成时间交教务科印刷日期 学号 姓名 班级 密 封 线 专业 年级 班 学年 第 学期 计算机辅助设计绘图 A 课试卷 题号一二三四五
  • Swagger 的简介和使用

    文章目录 Swagger 的简介和使用 什么是Swagger 简介 Swagger页面 Swagger快速上手 pom xml文件中引入依赖 构建Swagger配置类 Swagger使用 常用注解说明 注解的使用 总结 Swagger 的简
  • Spark jar包加载顺序及冲突解决

    一 spark jar包加载顺序 1 SystemClasspath Spark安装时候提供的依赖包 通常是spark home目录下的jars文件夹 SystemClassPath 2 Spark submit jars 提交的依赖包 U
  • Java-Map集合

    基本使用 public class Demomap public static void main String args Map
  • 关于VMware workstation Player的虚拟网络编辑器没有的情况

    VMware workstation Player 是没有 虚拟网络编辑器的 如果要按照韦东山老师的方法去配置NAT网络 可以再下载VMware workstation pro 尽管不在试用期 依然会给你虚拟网络编辑器的应用安装
  • 光照 (5) 光照贴图

    物体在不同的部件上都有不同的材质属性 1 1 漫反射 允许我们对物体的漫反射分量 以及间接地对环境光分量 它们几乎总是一样的 和镜面光分量有着更精确的控制 漫反射贴图 Diffuse Map 使用一张覆盖物体的图像 让我们能够逐片段索引其独
  • 十大排序算法:快速排序算法

    一 快速排序算法思想或步骤 分解 数组A p r 被划分为两个子数组A p q 1 和A q 1 r 使得A q 为大小居中的数 左侧A p q 1 中的每个元素都小于等于它 而右边A q 1 r 每个元素都大于等于它 解决 通过递归调用快
  • linux shell oracle脚本_分享一个shell脚本--统计Oracle最消耗资源的SQL语句

    概述 This project meant to provide useful scripts for DB maintance and management to make work easier and interesting 今天主要
  • Unity Shader Graphs无法代码动态赋值的问题解决

    起因 给一个材质球更换图片 动态更换了很久 换不上去 解决 Reference unity给的是随机的名字只需要改成自己的名字就可以了 完美解决 不需要下划线 只是自己定义的名字 box EndTarget image material S
  • sqli-labs通关攻略教程六(less26~less28a)

    文章目录 less 26 方法1 方法2 补充知识 less 26a less 27 less 27a less 28 less 28a less 26 方法1 由题目可知 本题绕过了空格和注释 注释符用 1 1或者 00绕过 空白符绕过
  • android intent深入解析

    一 Intent的显示调用 1 intent setClass this OtherActivity class 2 intent setClassName this com zizhu activitys OtherActivity 3
  • Linux chown命令

    Linux Unix 是多人多工操作系统 所有的文件皆有拥有者 利用 chown 将指定文件的拥有者改为指定的用户或组 用户可以是用户名或者用户ID 组可以是组名或者组ID 文件是以空格分开的要改变权限的文件列表 支持通配符 一般来说 这个
  • java解决Exception in thread “main“ java.lang.OutOfMemoryError: GC overhead limit exceeded

    这个就是内存占用超过了限制 解决方案 加载文件的容量太大 这个只能切分文件 使用BufferedInputStream一行行读取 BufferedInputStream bufferedReader new BufferedInputStr
  • 输入三角形的3个边长,a,b,c求出三角形的面积。(C语言)

    代码 define CRT SECURE NO WARNINGS 1 include
  • requestBody注解转化json报错

    RequestBody ResponseBody 注解详解 转 解决方法 不要用modelMap 新建一个hashMap类即可 进来给app写接口比较多 遇到一个bug requestBody会自动往modelMap里加解决办法 清空map
  • 用redis实现支持优先级的消息队列

    用redis实现支持优先级的消息队列 为什么需要消息队列 系统中引入消息队列机制是对系统一个非常大的改善 例如一个web系统中 用户做了某项操作后需要发送邮件通知到用户邮箱中 你可以使用同步方式让用户等待邮件发送完成后反馈给用户 但是这样可