C# 实现rabbitmq 延迟队列功能(不堵塞)

2023-11-19

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉。当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训 数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了。除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式。但如果系统的 架构中本身就有RabbitMQ的话,那么选择RabbitMQ来实现类似的功能也是一种选择。 我们项目中用到了rabbitmq,可以做一个延迟队列完美的解决这个问题。

 rabbitmq本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用rabbitmq的这种特性,应该有了一个大概的思路。、

网上搜了一下 rabbitmq-delayed-message-exchange 这个插件也可以实现延迟队列的功能。今天介绍的是如何用C#来实现。

首先了解一下TTL和DLX

消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

Dead Letter Exchanges
Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

  1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

  2. 上面的消息的TTL到了,消息过期了。

  3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

首先我建了两个控制台项目一个是生产者,一个是消费者。

生产者代码如下

 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "test", Password = "test" };            using (var connection = factory.CreateConnection())
            {                while (Console.ReadLine() != null)
                {                    using (var channel = connection.CreateModel())
                    {

                        Dictionary<string, object> dic = new Dictionary<string, object>();
                        dic.Add("x-expires", 30000);
                        dic.Add("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间  
                        dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由  
                        dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey  
                        //创建一个名叫"zzhello"的消息队列
                        channel.QueueDeclare(queue: "zzhello",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: dic);                        var message = "Hello World!";                        var body = Encoding.UTF8.GetBytes(message);                        //向该消息队列发送消息message
                        channel.BasicPublish(exchange: "",
                            routingKey: "zzhello",
                            basicProperties: null,
                            body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                }
            }

            Console.ReadKey();

消费者代码如下:

var factory = new ConnectionFactory() { HostName = "127.0.01", UserName = "test", Password = "test" };            using (var connection = factory.CreateConnection())
            {                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct");                    string name = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay");                    //回调,当consumer收到消息后会执行该函数
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {                        var body = ea.Body;                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(ea.RoutingKey);
                        Console.WriteLine(" [x] Received {0}", message);
                    };                    //Console.WriteLine("name:" + name);                    //消费队列"hello"中的消息                    channel.BasicConsume(queue: name,
                                         autoAck: true,
                                         consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }

            Console.ReadKey();

效果 :
在等待了12秒后消费者等到了消息。
这样我们就实现了延迟队列的功能了。

参考:https://blog.51cto.com/kiujyhgt/1917303

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C# 实现rabbitmq 延迟队列功能(不堵塞) 的相关文章

  • C# 锁(mylocker) 不起作用

    我有很多 Web 服务调用 异步 在回调中 我会将结果绘制到 Excel 中 我想同步绘图方法 所以我使用以下内容 但是 从我在 Visual Studio 中追踪到 每次 lock locker 都会成功 并且有许多线程运行clearco
  • 了解 VerQueryValue

    在 MSDN 上 我注意到 VerQueryValue 函数的以下内容 lplp缓冲区 输出 低电压空洞当此方法返回时 包含指向 pBlock 指向的缓冲区中所请求版本信息的指针的地址 当关联的 pBlock 内存被释放时 lplpBuff
  • C 语言中的套接字如何工作?

    我对 C 中的套接字编程有点困惑 You create a socket bind it to an interface and an IP address and get it to listen I found a couple of
  • C++11 中具有 C 链接的复杂类型

    我需要将 C 库的标头包含到我的 C 11 代码中 现在 标头提供了涉及大量的例程和数据结构double complex到处都是 例如 include
  • 如何在 C++ 中对四元结构进行有效排序?

    我有一个包含 x y z 和 w 成员的结构 如何高效排序 在 C 中首先按 x 然后按 y 按 z 最后按 w 如果你想实现字典排序 那么最简单的方法是使用std tie实现小于或大于比较运算符或函子 然后使用std sort http
  • 确保 unsigned int/long 始终在 C# 中的检查上下文中执行

    有没有人觉得奇怪 uint 和 ulong 的默认上下文是未检查的 而不是检查的 因为它们旨在表示永远不能为负的值 因此 如果某些代码试图违反该约束 在我看来 自然且首选的行为是抛出异常 而不是返回最大值 这很容易使重要数据处于无效状态并且
  • 如何在方法模板中使用模板类型的引用传递参数?

    我目前正在努力编译以下代码 首先是包含带有方法模板的类的头文件 ConfigurationContext h class ConfigurationContext public template
  • 如何获取 PropertyGrid 的单元格值 (c#)?

    如何在 C 中获取属性网格项和项的值 例如 Name Ali LastName Ahmadi Name 和 LastName 是 propertygrid 的 2 个属性 PropertyGrid只是对象的组件模型表示的视图 我会说 查看组
  • 用于轻松动态反射的 C# 库

    是否有任何库 例如开源项目等 可以更轻松地使用复杂的反射 例如动态创建对象或类 检查实例等 Thanks 有一个LinFu http www codeproject com KB cs LinFuPart1 aspx可用的库除了反射之外还可
  • C++在子类中调用虚方法

    我有以下课程 class A protected A inner public virtual void doSomething 0 class B public A void doSomething if inner NULL inner
  • gcc 删除内联汇编代码

    看起来 gcc 4 6 2 删除了它认为函数中未使用的代码 test c int main void goto exit handler asm volatile jmp 0x0 exit return 0 拆解main 0x0804840
  • 如何使用 Linq to Sql 修剪值?

    在数据库中 我有一个名为 联系人 的表 名字和其他此类字符串字段设计为使用 Char 数据类型 不是我的数据库设计 我的对象 Contact 映射到属性中的字符串类型 如果我想做一个简单的测试 通过 id 检索 Contact 对象 我会这
  • 如何使用 HttpClient 验证 Pardot API

    我花了大约一天的时间尝试对 Pardot API 进行身份验证 它不喜欢我尝试发布消息正文的方式 所以我想发布对我有用的解决方案 如果您有任何建议或替代方案 我想听听 ServicePointManager SecurityProtocol
  • 获取RFC返回的嵌套结构的值?

    我是 C 新手 我有 rfc 它以嵌套结构的形式从 SAP 系统返回数据 但是当我使用以下方式获取该数据时 IrfcTable table rfc getTable exporting parameter et customer 它仅返回第
  • 随机排列

    我无法找到一种随机洗牌元素的好方法std vector经过一些操作后 恢复原来的顺序 我知道这应该是一个相当简单的算法 但我想我太累了 由于我被迫使用自定义随机数生成器类 我想我不能使用std random shuffle 无论如何这没有帮
  • 当一种语言是另一种语言的平行超集时,这意味着什么?

    我正在阅读关于实时并发 C 的期刊文章 http link springer com article 10 1007 2FBF00365999 并且它在摘要中提到 因此你们中的任何人都可以通过该链接查看上下文 Concurrent C 是
  • 如何查明我的字符串是否包含“micro”Unicode 字符?

    我有一个包含实验室数据的 Excel 电子表格 如下所示 g L ppb 我想测试希腊字母 是否存在 如果发现我需要做一些特别的事情 通常 我会写这样的东西 if cell StartsWith matchSequence lt unive
  • 通过开源 PCL 使用 API 查看 3D 点云

    我使用 ToF 飞行时间 相机来获取 XYZ 格式的深度数据 为了实现 3D 点云的可视化目的 我想使用开源 PCL 提供的 API 网址为http pointclouds org documentation tutorials pcl v
  • 有没有办法将复选框列表绑定到 asp.net mvc 中的模型

    我在这里寻找一种快速简便的方法来在模型中发生回发时绑定复选框列表项的列表 显然现在常见的做法似乎是这样的form GetValues checkboxList 0 Contains true 这看起来很痛苦而且不太安全 有没有一种方法可以绑
  • 错误:C# 尝试读取或写入受保护的内存

    我很难纠正这个错误 该应用程序在 4 台不同的机器上进行了测试 在其中 3 台上运行良好 但一台 Vista PC 在尝试通过 WebBrowser1 打开页面时出现此错误 解决这个问题的任何帮助对我都会非常有帮助 System Acces

随机推荐

  • Jetson TX2刷JetPack3.3(方法二)

    官网上的刷机教程都是需要一个路由器才能刷机 这篇博客介绍如何不要路由器 只需要一个宿主机就可以刷机的教程 1 刷机准备工作 JetPack是一个x86二进制文件 不能在基于ARM的机器上运行 因此需要借助一个宿主机来刷JetPack 因此
  • 数据结构模板

    链接
  • Github-Copilot初体验-Pycharm插件的安装与测试

    引言 80 代码秒生成 AI神器Copilot大升级 最近copilot又在众多独角兽公司的合力下 取得了重大升级 GitHub Copilot发布还不到两年 就已经为100多万的开发者 编写了46 的代码 并提高了55 的编码速度 据官博
  • Quartus导出网表文件:.qxp和.vqm

    当项目过程中 不想给甲方源码时 该如何 我们可以用网表文件qxp或者vqm对资源进行保护 下面讲解这两个文件的具体生成步骤 一 基本概念 QuartusII的qxp文件为QuartusII Exported Partition 用于创建综合
  • c#float取小数点后两位_C# 保留小数点后两位(方法总结)

    最简单使用 float i 1 6667f string show i ToString 0 00 结果1 67 四舍五入 其他类似方法 string show i ToString F F2 f 不区分大小写 string show St
  • SpringBoot集成Swagger4

    Swagger是一种工具 可以帮助开发人员设计 构建 文档化和测试 RESTful Web 服务 Swagger提供了一种交互式文档格式 可以使用它来了解 API 的用法 参数 返回值等等 Spring Boot提供了集成Swagger的简
  • Servlet接口实现类

    JavaWeb 03 Servlet 02 Servlet接口实现类 1 什么是Servlet接口 有什么用 Servlet接口来自于Servlet规范中的一个接口 这个接口存在于Http服务器所提供的jar包中 Servlet接口的具体位
  • 【NVMe2.0b 14-6】Format NVM、Keep Alive、Lockdown command

    目录 5 14Format NVM command 5 14 1Command Completion 5 18Keep Alive command 5 18 1Command Completion 5 19Lockdown command
  • 啪啪动物城 源码

    游戏地址 http www 4399 com flash 192131 htm 啪啪动物城源码 https files cnblogs com files gamedaybyday E5 95 AA E5 95 AA E5 8A A8 E7
  • Metasploit 提权篇

    声明 文中所涉及的技术 思路和工具仅供以安全为目的的学习交流使用 任何人不得将其用于非法用途以及盈利等目的 否则后果自行承担 文章目录 内核漏洞提权 enum patches模块 Windows Exploit suggester Wind
  • web服务器响应的端口号,web服务器端口号

    web服务器端口号 内容精选 换一换 Nginx Web Server场景是以Nginx作为Web Server的场景 Nginx作为Web Server 可以被配置部署为静态资源Web Server 在该配置下可以高效的进行静态资源的请求
  • python学习笔记——条件判断

    上篇 https blog csdn net qq 42489308 article details 89388218 条件判断 条件判断是通过一条或多条判断语句的执行结果 True或者False 来决定执行的代码块 在Python语法中
  • uboot分析之Makefile

    Uboot分析之Makefile 1 uboot根目录下执行 make smdk2410 config smdk2410 config unconfig MKCONFIG config arm arm920t smdk2410 samsun
  • 数据集下载OTB,VOT,UAV,鸢尾花

    OTB数据集下载百度网盘链接 链接 https pan baidu com s 1snsJF 7Sw EbKtzdvLO1nw 提取码 ls23 VOT数据集下载百度网盘链接 链接 https pan baidu com s 1UiTG1z
  • AI顶级会议列表 & ACL相关

    The First Class tier 1的conferences 其实基本上就是AI里面大家比较公认的top conference 下面同分的按字母序排列 IJCAI 1 AI最好的综合性会议 1969年开始 每两年开一次 奇数年开 因
  • 基于互补搜索技术和新颖架构设计,结合MobileNetV3主干网络,打造不同的目标检测器

    基于互补搜索技术和新颖架构设计 结合MobileNetV3主干网络 打造不同的目标检测器 目标检测是计算机视觉中的一个重要任务 随着深度学习技术的发展和神经网络的不断优化 YOLOv5已成为目前最流行的目标检测框架之一 然而 为了进一步提高
  • opengl shader 使用札记

    一 shader的使用步骤 创建shader 1 创建一个shader对象 GLuint glCreateShader GLenum shaderType 2 将shader源代码传入前面创建的shader对象 void glShaderS
  • 老嫂子的保姆级科普 选择视频剪辑软件就从阅读本文开始

    选错一款视频剪辑软件 是种什么样的体验 就好像新婚当晚 发现老婆是人妖一样 浪费了感情 又错付了青春 新手在学习视频剪辑的初期 需要花费大量精力去熟悉剪辑软件的基础功能 而软件挑选本身没有对错可言 适合自己的才是最好的 因此 本文仅从事实与
  • 初识Java(一)

    Java开发语言 前言 一 Java是什么 二 应用领域 特点及核心机制 1 应用领域 2 特性及特点 特性 特点 3 两种核心机制 三 JDK JRE JVM的关系 四 Java环境变量配置 五 编写我的第一个程序 总结 前言 计算机语言
  • C# 实现rabbitmq 延迟队列功能(不堵塞)

    最近在研究rabbitmq 项目中有这样一个场景 在用户要支付订单的时候 如果超过30分钟未支付 会把订单关掉 当然我们可以做一个定时任务 每个一段时间来扫描未支付的订单 如果该订单超过支付时间就关闭 但是在数据量小的时候并没有什么大的问题