Golang RabbitMQ实现的延时队列

2023-11-14


前言

之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。

一、延时队列与应用场景

延迟队列是一种特殊类型的消息队列,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景。使用延迟队列可以灵活地控制消息的发送和处理时间,适用于很多场景,如订单超时处理、提醒任务等

具体应用场景有如下:

  1. 订单取消:当订单生成时,将订单消息发送到延迟队列中,并设置延迟时间为十分钟。消费者在十分钟后接收到订单消息并进行关闭操作。

  2. 店铺商品提醒:在店铺创建时,将提醒消息发送到延迟队列中,并设置延迟时间为十天。消费者在十天后接收到消息并发送提醒通知。

  3. 用户登录提醒:用户注册成功后,将提醒消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并发送短信提醒。

  4. 退款通知:当用户发起退款时,将通知消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并通知相关运营人员。

  5. 会议提醒:在会议预定时,将提醒消息发送到延迟队列中,并设置延迟时间为预定时间前十分钟。消费者在指定时间点前十分钟接收到消息并发送会议参加通知。

通过使用延迟队列,可以在指定的时间点触发任务,避免了轮询的低效方式,并且能够满足大量数据和时效性的需求。这种方法提供了更高的性能和实时性,并有效减轻了系统的负载。
下图是订单超时处理的流程图。
在这里插入图片描述

二、RabbitMQ如何实现延时队列

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的。
通过设置消息的 TTL 和 DLX 等参数,可以将消息转发到一个指定的队列中,以便在一定的时间后再进行处理。

实现延时队列的基本要素

1、存在一个倒计时机制:Time To Live(TTL)
2、当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
**基于第二点,**是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

整体的实现原理如下

发送者将消息发送到延时队列上并设置过期时间,当过期时间到达时,消息会被自动转发到指定的交换机和队列中供接收者消费。
1、建立与 RabbitMQ 服务器的连接并创建通道。
2、发送者通过 ch.Publish 方法将消息发送到延时队列(“test_delay”)上,设置消息的过期时间。
3、延时队列中的消息在到达过期时间后会自动被发送到 “logs” 交换机,由交换机将消息广播给所有绑定的队列。
4、接收者通过监听 “test_logs” 队列接收并处理消息。当有消息到达时,会触发回调函数进行处理。
也就是说要实现延时队列,消费者必须试实现两个队列。
一个是延时队列(“test_delay”),另一个是接收延时消息的队列(“test_logs”)。

这两个队列的作用如下:
延时队列(“test_delay”):这个队列用于接收需要延时发送的消息。发送者通过将消息发送到延时队列,设置消息的过期时间。当消息过期时,RabbitMQ 会自动将消息转发到指定的交换机和队列中。
接收延时消息的队列(“test_logs”):这个队列用于接收延时消息。在示例中,这个队列是通过将 “test_logs” 队列绑定到 “logs” 交换机上来实现的。交换机会将消息广播给所有绑定的队列,因此当延时消息到达过期时间后,会被发送到这个队列中供消费者进行处理。
通过使用两个队列,消息可以被延时发送到指定的队列,并在过期后自动转发到接收队列,实现了延时发送和消费的功能。

三、Go语言实战

生产者

首先建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,通过 ch.Publish 方法将消息发送到延时队列上。这里使用的是空字符串作为交换机(exchange),表示不选择任何交换机,只将消息发送到指定的队列(“test_delay”)。在消息的属性中,设置了消息的过期时间为 5 秒。


func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	body := "hello"
	// 将消息发送到延时队列上
	err = ch.Publish(
		"", 				// exchange 这里为空则不选择 exchange
		"test_delay",     	// routing key
		false,  			// mandatory
		false,  			// immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Expiration: "5000",	// 设置五秒的过期时间
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

消费者

同样建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,声明了一个名为 “logs” 的交换机,类型为 “fanout”,并且可持久化,表示该交换机会将消息广播给所有绑定的队列。接着,声明了一个常规的队列 “test_logs”,并将其绑定到 “logs” 交换机上。之后,声明了一个延时队列 “test_delay”,并设置了该队列的 x-dead-letter-exchange 参数为 “logs”,即当消息过期时将消息发送到 “logs” 交换机。最后,通过 ch.Consume 方法监听 “test_logs” 队列,并在回调函数中处理接收到的消息。


func main() {
	// 建立链接
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明一个主要使用的 exchange
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
	q, err := ch.QueueDeclare(
		"test_logs",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

    /**
     * 注意,这里是重点!!!!!
     * 声明一个延时队列, ß我们的延时消息就是要发送到这里
     */
	_, errDelay := ch.QueueDeclare(
		"test_delay",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		amqp.Table{
			// 当消息过期时把消息发送到 logs 这个 exchange
			"x-dead-letter-exchange":"logs",
		},   // arguments
	)
	failOnError(errDelay, "Failed to declare a delay_queue")

	err = ch.QueueBind(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // routing key
		"logs", // exchange
		false,
		nil)
	failOnError(err, "Failed to bind a queue")

	// 这里监听的是 test_logs
	msgs, err := ch.Consume(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Golang RabbitMQ实现的延时队列 的相关文章

随机推荐

  • 批处理替换修改文件名

    echo off echo echo title 批量替换文件名中的部分字符串 color 3f echo Note echo 本批处理可批量替换本文件所在文件夹下的所有文件名的相同字符 echo echo echo echo echo e
  • CentOS 8 通过二进制安装 MySQL

    需求 CentOS8下采用二进制安装包的形式安装MySQL 并且指定数据库文件存放的路径地址 步骤如下 在 MySQL下载地址 中下载 MySQL 二进制安装包 注意 在版本选择的时候 版本号在8 0 11及以下包后缀都是 tar gz 但
  • Python中from from __future__ import *的用法

    from future import 参考 https blog csdn net zzc15806 article details 81133045 我们在读代码的时候 总是会看到代码开头会加上from future import 这样的
  • NoClassDefFoundError/ClassNotFoundException 到底从哪引用到了这个类?排查思路

    1 背景 公司内网登录改造升级 使用方需要配合升级 jar 包 本以为很简单的事情 升级版本上线就 OK 了 没想到升级头一个服务 部署到测试环境就有问题 2 表象 访问所有页面报 404 3 排查思路 3 1 排除法 确定是不是升级 ja
  • cmd简单游戏代码_制作一个猜数字的游戏

    十一节假日 我在敲代码 外甥女突然问我 舅舅 你能不能给我编个游戏啊 看着外甥女期盼的眼神 我当然不好拒绝啊 而且如果写一点代码 能让小朋友对编程有个简单的了解 甚至激发她对编程学习的兴趣 那也是极好的啊 话不多说 我立马敲一个猜数字的文字
  • api-ms-win-core-path-l1-1-0.dll丢失怎么解决?

    api ms win core path l1 1 0 dll文件可以帮助用户快速的启动一些相关的应用程序 让应用程序可以正常的使用 但是近期有用户在电脑的使用中 遇到了系统提示提示 api ms win core path l1 1 0
  • Mysql的分布式(XA)真面目

    Mysql XA 一 XA是什么 二 MySQL中XA实现 1 内部XA事务 两阶段提交PC 2 外部XA事务 总结 一 XA是什么 XA 协议本就是为一个分布式事务协议 它规定了 XA PREPARE XA COMMIT XA ROLLB
  • [React] markdown以及markdown-navbar实现方案

    React markdown以及markdown navbar实现方案 1 前言 心血来潮 想在自己的项目中实现 Markdown 文件的渲染 以下是我当前的实现方式以及遇到的一些问题的记录 本人水平很拉 有更好的方法欢迎在下面讨论 2 m
  • C++ Primer阅读笔记--万能引用和引用折叠

    目录 1 万能引用 1 1 万能引用的实现 1 2 万能引用与右值引用的区别 2 引用折叠 1 万能引用 1 1 万能引用的实现 万能引用可以向其传递任何类型的参数 其会自动进行参数类型的推断 万能引用的两种实现如下 基于模板实现 temp
  • 【Ribbon路由规则器】服务筛选,过滤服务基础组件AbstractServerPredicate

    前言 Ribbon在进行Server过滤的时候 用到了一个重要的基础组件 AbstractServerPredicate 它的作用就是在众多Server的列表中通过一定的过滤策略踢除不合格的Server 留下来合格的Server列表 负载均
  • Matlab中值滤波

    medfilt2 是 MATLAB 中的一个函数 用于对二维图像进行中值滤波 中值滤波是一种非线性滤波方法 它将每个像素的值替换为该像素周围邻域内像素的中值 该函数语法如下 B medfilt2 A m n padopt 其中 A 是需要进
  • 三菱指令大全

    一 顺控指令 1 触点指令 00 LD 逻辑操作开始 01 LDI 逻辑非操作开始 02 AND 逻辑乘 03 ANI 逻辑乘非 04 OR 逻辑加 05 ORI 逻辑加非 2 连接指令 06 ANB AND逻辑块与 07 ORB OR逻辑
  • 嵌入式数据库Sqlite3.3.6移植教程

    本文介绍的内容都是基于Fedora10平台的 一 PC机编译安装 请阅读在安装包里的 INSTALL 文件 或者使用PEAR installer with pear install sqlite SQLite已经内置了 你不需要安装任何附加
  • 字符串数组中,strlen与sizeof的比较

    char str1 15 hello 用 赋值系统会在字符串结尾自动添 0 printf strlen str1 ld n strlen str1 printf sizeof str1 ld n sizeof str1 gt gt gt s
  • Ubuntu下,dpkg安装出错的修复

    参考 http www khattam info 2009 08 04 solved subprocess pre removal script returned error exit status 2 error 我在ubuntu上安装l
  • 【数学建模集训系列】公交查询系统的matlab实现-公交站点和线路对应矩阵

    功能 求站点S和路线L矩阵 表示通过S的所有线路 日期 8 9 2011 clear clc fid fopen Bus txt r 打开数据 if fid gt 0 disp 数据文件打开成功 else disp 打开失败 return
  • 【目标检测】yolov5模型详解

    文章目录 一 Yolov5网络结构 1 1 Input 1 2 Backbone 1 2 1 Conv模块 1 2 2 C3模块 1 2 3 SPPF模块 1 3 Neck 1 4 Head 1 4 1 head 1 4 2 目标框回归 1
  • react函数组件

    react 创建组件有三种方式 函数式定义的无状态组件 es5原生方式React createClass定义的组件 es6形式的extends React Component定义的组件 这篇我们主要讲函数组件的使用 在函数组件里面是没有生命
  • python selenium爬虫自动登录实例

    一 概述 我们要先安装selenium这个库 使用pip install selenium 命令安装 selenium这个库相当于机器模仿人的行为去点击浏览器上的元素 这时我们要用到一个浏览器的驱动 这里我用的是谷歌浏览器 二 安装驱动 确
  • Golang RabbitMQ实现的延时队列

    文章目录 前言 一 延时队列与应用场景 二 RabbitMQ如何实现延时队列 实现延时队列的基本要素 整体的实现原理如下 三 Go语言实战 生产者 消费者 前言 之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题 本博客就总结一下G