教你如何基于Redis来实现高性能延时消息队列!

2023-11-16

最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套MySQL和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。

系统设计

这里参考了有赞的延迟队列设计

数据结构设计

事件消息体

type EventEntity struct {
	EventId    int64
	Topic      string
	Body       string
	EffectTime time.Time
}
复制代码
  • EVENT_POOL: 使用redis的hash,里面存储了任务事件的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
  • EVENT_BUCKET: 使用redis的zset,里面存储了任务事件的有序集合,key=prefix+namespace+topic,score=EffectTime, member=EventId;
  • EVENT_QUEUE: 使用redis的list, list中存储了到期待消费任务的EventId。

延迟队列的执行流程

1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;

2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;

3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。

代码实现

核心代码

发布延时任务

func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
	pipeline := q.redisClient.WithContext(ctx).Pipeline()
	defer pipeline.Close()

    // 向EVENT_POOL中添加任务
	pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
	// 将任务id添加到EVENT_BUCKET中,按生效时间排序
	pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
		Member: strconv.FormatInt(event.EventId, 10),
		Score:  float64(event.EffectTime.Unix()),
	})
	_, err := pipeline.Exec()
	if err != nil {
		logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
		return err
	}
	return nil
}
复制代码

搬运线程扫描到期任务

func (q *DelayQueue) carryEventToQueue(topic string) error {
	ctx := context.Background()
	// 扫描zset中到期的任务
	members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
	if err != nil && err != redis.Nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
		return err
	}
	if len(members) == 0 {
		return nil
	}

	errMap := make(map[string]error)
	// 将任务添加到对应topic的待消费队列里
	for _, m := range members {
		eventId := m.Member.(string)
		err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
			errMap[eventId] = err
		}
	}

	// 从Bucket中删除已进入待消费队列的事件
	var doneMembers []interface{}
	for _, m := range members {
		eventId := m.Member.(string)
		if _, ok := errMap[eventId]; !ok {
			doneMembers = append(doneMembers, eventId)
		}
	}
	if len(doneMembers) == 0 {
		return nil
	}

	err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
	if err != nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
	}
	return nil
}

复制代码

监听线程消费任务

这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。

func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
	for {
		ctx := context.Background()
		kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
			continue
		}
		if len(kvPair) < 2 {
			continue
		}

		eventId := kvPair[1]
		data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
		if err != nil && err != redis.Nil {
			logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
			if q.persistFn != nil {
				_ = q.persistFn(&EventEntity{
					EventId: util.String2Int64(eventId),
					Topic:   topic,
				})
			}
			continue
		}
		event := &EventEntity{}
		_ = jsoniter.UnmarshalFromString(data, event)

		for _, subscriber := range subscriberList {
			util.Retry(3, 0, func() (success bool) {
				err = subscriber.Handle(ctx, event)
				if err != nil {
					logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
					return false
				}
				return true
			})
		}

		err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
		}
	}
}
复制代码

其他

1、优雅关闭

DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。

type DelayQueue struct {
	namespace   string
	redisClient *redis.Client
	once        sync.Once
	wg          sync.WaitGroup
	isRunning   int32
	stop        chan struct{}
	persistFn   PersistFn
}
复制代码
// gracefully shudown
func (q *DelayQueue) ShutDown() {
	if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
		return
	}
	close(q.stop)
	q.wg.Wait()
}
复制代码

2、消费失败后持久化任务

可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。

...

q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil {
	logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
	if q.persistFn != nil {
		_ = q.persistFn(&EventEntity{
			EventId: util.String2Int64(eventId),
			Topic:   topic,
		})
	}
	continue
}

...
复制代码

源码地址

redis_delay_queue: github.com/hudingyu/re…

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

教你如何基于Redis来实现高性能延时消息队列! 的相关文章

随机推荐

  • 乐学python靠谱吗_【乐学100】-乐学100怎么样

    花开梦未蓝nice 今天刚给孩子注册成功 孩子马上被乐学100吸引住了 首先 是看到礼品店里有许多他心仪的奖品 他希望通过努力能够得到礼品 然后 他开始为自己挣学分 他一口气做了有理数单元11个小模块的练习 正确率较高 又做了错题总结 改正
  • C++函数模板基础

    今天给大家带来函数模板的内容 当我们面对 逻辑相同但类型不同 的问题时 比如整型相加和浮点型相加 使用函数模板可以提高代码利用率 起到事半功倍的效果 今天给大家分享蛋类型变量的函数模板 include
  • 关于C++对象模型(下)

    下篇主要讨论调用成员函数 访问成员变量的开销 及其特殊成员函数 数组 异常处理的讨论 这篇文章中出现的对象定义都出现在上篇中 全文在这里下载 文章内容转自 http tb blog csdn net TrackBack aspx PostI
  • 数据链路层六大协议详解

    数据链路层六大协议详解 一些假设 1 无限制的单工协议 乌托邦协议 五点假设 发送方 接收方 接受方 2 单工停 等协议 3 有噪声信道的单工协议 本文图片截取自 学堂在线 华南理工大学的计算机网络课程 一些假设 物理层 数据链路层和网络层
  • 视频格式无损/快速转换——ffmpeg(mkv等转mp4)

    非商业用途转载请务必注明出处 https blog csdn net qq 40491305 article details 103272651 最近用PR发现不支持mkv格式 需要转化为mp4 考虑到快速 无损以及我仅仅需要转码 懒得使用
  • java.lang.ArrayIndexOutOfBoundsException(数组越界)处理方法

    转发自https blog csdn net qq 34646449 article details 76146659 当你使用不合法的索引访问数组时会报数组越界这种错误 数组arr的合法错误范围是 0 arr length 1 当你访问这
  • 使用反射动态校验后台配置某字段是否允许为空

    背景 使用easyExcel读取数据 title不固定 后台可以配置title必填项 title顺序可改变 用户可以自定义title 不存储自定义信息即可 不报错 思路 不能使用easyExcel使用index的方式读取数据 改成value
  • 文件在使用FileChannel.map后不能被删除(Windows上)

    同事发现在Windows上使用FileChannel的map方法之后 不能够删除掉文件 我在Linux上试了一下 发现没这个问题 做个笔记 记录一下 import java io File import java io RandomAcce
  • Unity笔记之获取鼠标停留的UI和删除按键触发后引用、判断鼠标是否在UI上

    需求 鼠标放在UI上 需要获取这个UI物体 以方便进行其他操作 百度学习了半天 最终拿了一个大哥 添加链接描述 的内容 本文仅作为个人笔记 建议大家直接去这大哥的博客看 不过我记得好像也可以通过继承unity内部的鼠标事件接口获取到物体 但
  • java.io.StreamCorruptedException: invalid type code: AC错误解决

    最近做IO时 出现了一个我百思不得其解的错误 虽然经过一番 解决的bug 但是对于这一方面的底层知识还是有待去深入了解 借这个机会 好好学习一下 一般 可以使用ObjectInputStream把对象写出到文件 再使用ObjectOutpu
  • 五分钟告诉你什么是爬虫?

    1 什么是爬虫 把互联网比喻成一张网 那么爬虫就是网上爬行的蜘蛛 把网的节点比喻成一个个网页 爬虫爬取到就相当于访问了该页面 获取了其信息 爬虫可以通过一个节点之后 顺着节点连线 链接 继续爬行到下一个节点 即通过一个网页继续获取后续的网页
  • 直播 RTM 推流在抖音的应用与优化

    动手点关注 干货不迷路 背景 随着互联网技术以及网络基建的快速发展和普及 视频直播已经成为了一种越来越普遍的娱乐和社交方式 无论是个人还是企业 都可以通过视频直播平台进行直播活动 向观众展示自己的生活 工作或者产品 同时 视频直播也成为了一
  • sqlmap自动注入1(Target完整的超级详细 如有错误望指出)

    SQLmap的自动注入学习之路 1 是通过五种sql注入漏洞的检测技术 and select from select sleep 20 a 这是基于时间的盲注检测 看他返回的时间 可以在DVWA试试 sqlmap支持非常全面的 数据库管理系
  • 咬了一口苹果死去的计算机之父——图灵

    艾伦 麦席森 图灵 Alan Mathison Turing 1912年6月23日 1954年6月7日 英国数学家 逻辑学家 被称为计算机科学之父 人工智能之父 1910年左右的伦敦 1912年生于英国伦敦帕丁顿 家族成员里有三位当选过英国
  • 星网宇达-组合导航在ros系统中的使用方法

    1 安装差分天线 两个 组合导航主机 DTU模块 不使用基站GPS的定位精度是米级的 园区里面自己搭建基站用DTU进行数据传输精度可达到厘米级 也可以利用千寻基站进行定位 效果不清楚 2 根据 杆臂以及天线安装说明文档 进行天线安装和杆臂配
  • 数据分析03——矩阵常用计算方法和函数

    0 前言 数组 计算机领域的概念 矩阵 数学领域的概念 对于Numpy而言 矩阵是数组的分支 1 创建矩阵 字符串创建矩阵 mat1 np matrix 1 2 3 4 列表形式创建矩阵 mat2 np matrix 5 6 7 8 通过数
  • QTextDocument和QTextBlock

    QTextDocument QTextDocument是用于结构化富文本文档的容器 为样式文本和各种类型的文档元素 如列表 表格 框架和图像 提供支持 可以创建它们以在QTextEdit中使用 也可以独立使用 每个文档元素均由关联的格式对象
  • 可视化工具Netron介绍

    Netron是一种用于神经网络 深度学习和机器学习模型的可视化工具 它可以为模型的架构生成具有描述性的可视化 descriptive visualization 源码在 https github com lutzroeder netron
  • Taro安装、启动命令、创建项目、修改端口号以及如何在微信开发者工具内运行

    1 安装Taro开发工具 npm install g tarojs cli 或者yarn global add tarojs cli 使用npm可能会有一些报错的信息 建议使用cnpm安装 2 使用命令创建模板 taro init myAp
  • 教你如何基于Redis来实现高性能延时消息队列!

    最近在倒腾自建博客后端系统 需要用到延时任务的功能 但手头只有一套MySQL和Redis 如果搞一套MQ成本有点大 于是想着用redis实现延时消息队列 有些场景用数据库的定时扫表也能简单实现延时消息的功能 不过对于我这边的实际场景 比如计