Redis的延时队列

2023-05-16

延时队列

redis的list数据结构用来做一部消息队列,使用rpush/lpush操作入队列。 使用lpop/rpop来出队列。

> rpush notify-queue apple banana pear 
(integer) 3 
> llen notify-queue 
(integer) 3 
> lpop notify-queue 
"apple"
> llen notify-queue 
(integer) 2 
> lpop notify-queue 
"banana"
> llen notify-queue 
(integer) 1 
> lpop notify-queue 
"pear"
> llen notify-queue 
(integer) 0 
> lpop notify-queue 
(nil)

如果队列空了,客户端会陷入pop的死循环,不停的pop,没有数据,接着pop,又没有数据。空轮询会拉高客户端的CPU,redisde QPS也会被拉高。

阻塞读在队列没有数据时,会立即进入休眠状态,一旦数据到来,则立刻醒来,消息的延迟几乎为0,用blpop/brpop替代前面的lpop/rpop。可解决上面问题。

如果线程一直阻塞,redis客户端链接成了闲置链接,久了服务器会自动断开链接,以减少资源的占用。此时blpop/brpop会抛出异常

实现

演示队列可以通过redis的zset(有序列表)来实现。我们将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset活到期任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。

import java.lang.reflect.Type; 
import java.util.Set; 
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; 
import redis.clients.jedis.Jedis; 
public class RedisDelayingQueue<T> { 
static class TaskItem<T> { 
public String id; 
public T msg; 
} 
// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference 
private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); 
private Jedis jedis; 
private String queueKey; 
public RedisDelayingQueue(Jedis jedis, String queueKey) { 
this.jedis = jedis; 
this.queueKey = queueKey; 
} 
public void delay(T msg) { 
TaskItem task = new TaskItem(); 
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid 
task.msg = msg; 
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
} 
public void loop() { 
while (!Thread.interrupted()) {
// 只取一条
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 
if (values.isEmpty()) { 
try { 
Thread.sleep(500); // 歇会继续
} 
catch (InterruptedException e) { 
break;
} 
continue; 
} 
String s = values.iterator().next(); 
if (jedis.zrem(queueKey, s) > 0) { // 抢到了
TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg); 
} 
} 
} 
public void handleMsg(T msg) { 
System.out.println(msg); 
} 
public static void main(String[] args) { 
Jedis jedis = new Jedis(); 
RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo"); 
Thread producer = new Thread() { 
public void run() { 
for (int i = 0; i < 10; i++) { 
queue.delay("codehole" + i); 
} 
} 
}; 
Thread consumer = new Thread() { 
public void run() { 
queue.loop(); 
} 
}; 
producer.start(); 
consumer.start(); 
try { 
producer.join(); 
Thread.sleep(6000); 
consumer.interrupt(); 
consumer.join(); 
} 
catch (InterruptedException e) { 
} 
} 
}

Redis 作为消息队列为什么不能保证 100% 的可靠性?

因为没有ask机制,当消费端崩溃后消息丢失。pop出消息后,list 中就没这个消息了,如果处理消息的程序拿到消息还未处理就挂掉了,那消息就丢失了,所以是不可靠队列

Redis延时队列的优势

1.Redis zset支持高性能的 score 排序。

2.Redis是在内存上进行操作的,速度非常快。

3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

Redis延时队列劣势

  1. 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
  2. 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
  3. 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了,如果对消息可靠性要求较高, 推荐使用 MQ 来实现.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Redis的延时队列 的相关文章

  • ubuntu20.04 桌面图标显示异常及解决方法

    前言 更新至ubuntu20 04后 xff0c 出现了一些以前没有的问题 桌面上有些图标不显示 文章目录 前言一 具体表现二 原因三 解决方法总结 一 具体表现 例如有一次我在做备忘录时 我习惯地打开终端 span class token
  • Java类名的命名规则

    1 类名必须使用有意义的名字 xff1b 2 类名的每个单词的首字母必须大写 帕斯卡命名法 xff1b 3 类名不能使用数字 除了 和 之外的任何符号 xff0c 中间不能添加空格 xff0c 不能使用java关键字 xff1b 如 xff
  • firewalld高级配置

    1 IP地址伪装 masquerade xff1a 伪装 通过地址伪装 xff0c NAT设备将经过设备的包转发到指定接收方 xff0c 同时将通过的数据包的原地址更改为NAT的接口地址转发到不同步目的地 当是返回数据包是 xff0c 会将
  • Java中关于JSON格式数据的操作

    对于java格式数据的处理 xff1a 1 xff1a 先创建java实体类 xff0c 例如 xff1a public class Brand private String id private String brandName publ
  • 线程常用调度方法

    目录 一 线程等待 二 线程通知 三 线程休眠 四 请求让出CPU执行权 五 线程中断 一 线程等待 1 wait xff08 xff09 xff1a 当一个线程调用了wait xff08 xff09 方法后 xff0c 这个线程会被阻塞挂
  • centos7 安装jdk详细教程

    一 前言 本文主要介绍的是Centos7 Linux环境下安装jdk 8u333的详细图文教程 xff0c 用过linux服务器的开发人员都知道 xff0c JDK是作为日常开发常用的基础环境 xff0c 所以安装jdk是必要的 xff0c
  • KDE 美化(Manjaro)-记录

    KDE 美化 Manjaro 要想在不同的工具包之间获得相似的外观 xff0c 你很可能需要修改以下内容 xff1a 主题 包含一套风格 图标主题和颜色主题 风格 图形布置 xff0c 观感 图标主题 一套整体的图标 颜色主题 一套连接风格
  • spring容器对Bean组件的管理

    spring容器对Bean组件的管理 1 Bean对象创建时机 默认是随着容器创建 xff0c 可以使用lazy init 61 true xff08 在调用getBean创建 xff09 延迟创建 xff0c 也可以使用 lt beans
  • nginx平滑升级(添加echo功能)配置和状态监控

    添加echo模块 配置 1 先去github或者gitee中找到nginx module echo master zip包 2 将原来的ngin 1 20 1删除 重新编译安装 span class token punctuation sp
  • 字节对齐的原理和方法

    Pragma是什么 小发猫的博客 CSDN博客 pragma是什么 Pragma是什么 Pragma是什么 翻译 SkyJacker后附英文原文 译者注 一句话 xff0c pragma就是为了让编译器编译出的C或C 43 43 程序与机器
  • 【Android】Banner2.1的使用

    com youth banner Banner 2 1的使用 与前版本不同的是 xff0c 2 1版本是用的适配器 设置适配器和点击事件 banner span class token punctuation span span class
  • linux系统中安装Java环境

    Ubuntu安装Java环境 步骤1 xff1a 下载jdk 我选择的jdk版本文件 xff1a jdk 8u131 linux x64 tar gz 步骤2 xff1a 创建单独的目录 sudo mkdir usr local java
  • SpringMVC --01.2023Idea搭建全注解式开发的SpringMVC

    1 创建项目 打开Idea xff0c 并点击新建项目 注 xff1a 使用的是2022 2的商业版 xff0c 该版本跟2021 2的商业版创建Maven项目不一样 点击右侧的新建项目 gt 取名 gt 创建 这样我们就创建了一个空依赖的
  • Java中的异常及异常处理

    目录 1 什么是异常 2 为什么要处理异常 3 异常分类 4 如何进行异常处理 4 1 捕获异常 4 2 手动抛出异常 4 3 自定义异常 4 4 debug调试模式 5 其他异常 1 什么是异常 程序中 在代码编译或运行过程中 xff0c
  • Spring之配置文件

    目录 什么是配置文件 配置文件作用 配置文件的格式 properties 配置文件说明 properties 基本语法 三种读取properties的方法 yml 配置文件 yml 基本语法 总结 什么是配置文件 首先我们知道我们的程序是
  • 已解决:前、后端打包部署至服务器后,背景图片不显示并且一些图标都变成了方块

    将打包好的jar包部署至服务器后 xff0c 输入项目网址后 xff0c 发现背景图片没有显示出来并且一些图标变成了方块 解决办法 xff1a 在前端找到bulid文件目录下的utils js文件 xff0c 添加以下语句 xff1a pu
  • 分布式 Redis & RabbitMQ 终极秒杀

    本篇文章记录的为RabbitMQ知识中企业级项目中秒杀相关内容 xff0c 适合在学Java的小白 帮助新手快速上手 也适合复习中 xff0c 面试中的大佬 x1f649 x1f649 x1f649 如果文章有什么需要改进的地方还请大佬不吝
  • Ubuntu字符界面输入密码始终提示错误 login incorrect 解决办法

    首先要明确自己的用户名 xff0c 可以在设置中看到 其次是密码 xff1a 如果密码有数字 xff0c 只能使用字母上面的数字输入 xff0c 数字键盘无法识别 xff0c 会导致login incorrect
  • Unix环境高级编程代码(实时更新)

    实例1 3 列出一个目录中所有文件 xff08 ls c xff09 include 34 apue h 34 include lt dirent h gt int main int argc char argv DIR dp struct
  • nginx-rewrite和if使用

    rewrite 常见的flag flag作用last基本上都用这个flag xff0c 表示当前的匹配结束 xff0c 继续下一个匹配 xff0c 最多匹配10个到20个 一旦此rewrite规则重写完成后 xff0c 就不再被后面其它的r

随机推荐

  • ZYNQ7020 FPGA 如何生成从Flash和SD卡启动的镜像文件

    ZYNQ7020 FPGA 生成从Flash和SD卡启动的镜像文件 xff08 BOOT bin xff09 创建BOOT bin 工具vivado 2017 4 1 创建工程 包括创建工程 xff0c 编写程序 xff0c 添加约束 2
  • 解析Android自带的SettingActivity——>Proference

    1 在res xml中 的代码解析 Preference是采用SharedPreference保存数据的 这里的属性key表示默认存储的键 xff0c defaultValue表示默认存储的值 如果是使用preference的话 xff0c
  • Android中使用lottie资源

    lottie资源的使用 1 下载lottie文件的网址 xff1a https lottiefiles com xff0c 下载的文件为 json的文件 2 存放在Android的位置为 3 在应用级别的 build gradle 文件中添
  • 【实验七】Linux生产者消费者问题(线程)

    目录 一 问题介绍 二 代码 1 prod cons cpp 2 producer h 3 producer cpp 4 consumer h 5 consumer cpp 6 mq h 7 mq cpp 8 message h 9 mes
  • HTTP-响应数据格式及常见地状态响应码(403,404,405)

    HTTP 响应数据格式 响应数据分为3部分 1 响应行 响应数据的第一行 其中HTTP 1 1表示协议版本 xff0c 200表示响应状态码 xff0c OK表示状态码描述 2 响应头 第二行开始 xff0c 格式为键值对的形式 3 响应体
  • 【Linux】TigerVNC安装指导

    1 以单一用户远程访问桌面 1 1 服务端中启用桌面共享 在统信服务器操作系统V20 1020a 上配置为启用单一客户端的远程桌面连接 1 2 配置远程桌面服务端 1 配置防火墙规则来启用对服务端的VNC访问或关闭防火墙 xff1a fir
  • 使用console.log输出特殊字符图案或自定义图片

    最近看到一篇比较有趣的文章 程序员的浪漫 console log 在浏览器控制台输出特殊字符编码的图案 想自己动手试一试 xff0c 很明显我做的效果不好 xff0c 弄了很久还是没弄出来 下面介绍另外一种方法 xff0c 方法来自 使用c
  • IDEA中添加了vue.js插件后setting就打不开;添加vue.js报错Requires plugin ‘intellij.webpack‘ to be installed

    IDEA版本要和vue js版本对应 查看IDEA版本 xff0c help about 然后再去官网查找和自己IDEA版本对应的vue js Versions Vue js IntelliJ IDEs Plugin Marketplace
  • yml配置文件简单语法及小坑

    yml文件使用方法 1 语法 K 空格 V 表示一对键值对 xff0c 以空格缩进来控制层级关系 xff0c 只要左对齐的一列数据 xff0c 都是一个层级的 属性和值是大小写敏感 2 写法 普通值 字符串默认不加单引号或者双引号 xff1
  • 抽象工厂模式

    工厂模式 工厂方法模式 xff08 Fatory Method Pattern xff09 提供一个接口 xff0c 一个可创建一系列相关对象的 无需指定他们的具体类 一个抽象工厂类 xff0c 不同的具体工厂产生不同的对象实体 eg 冰箱
  • docker简介--01

    官方文档 xff1a https docs docker com engine reference commandline docker 官方仓库 xff1a https hub docker com docker基本组成 image 镜像
  • docker网络配置和名称空间管理

    docker容器虚拟化 虚拟化网络 Network Namespace 是 Linux 内核提供的功能 xff0c 是实现网络虚拟化的重要功能 xff0c 它能创建多个隔离的网络空间 xff0c 它们有独自网络栈信息 不管是虚拟机还是容器
  • CentOS系统下安装docker简易步骤

    docker 官网地址 https www docker com docker 开发文档 https docs docker com manuals 手册 gt install gt Linux xff08 centos xff09 环境为
  • linux环境安装jdk

    linux环境安装jdk 1 查看本环境下是否java环境 java version 不存在 已存在 2 如果不存在 xff0c 先去下载jdk 到官网下载jdk 注 xff1a 一定要根据具体的linux系统按需下载对应的安装包 我的是l
  • linux环境下安装tomcat

    配置tomcat 到官网下载tar包 将tar包上传到服务器 并解压 span class token function tar span zxvf apache tomcat 9 0 65 tar gz 重命名tomcat9 span c
  • docker基础命令以及常用命令

    docker 基本命令 1 其他命令 span class token comment 查看版本 span docker version span class token comment 查看信息 span docker info span
  • Redis基础数据结构及其使用

    Redis 一 xff0c docker方式安装redis span class token comment 拉取 redis 镜像 span span class token operator gt span span class tok
  • 设计模式之--原型模式

    原型模式 原型实例指定创建对象的种类 xff0c 并且通过复制这些原型创建新对象 使用场景 xff1a 类初始化消耗资源比较多 使用new生成一个对象需要非常繁琐的过程 构造函数比较复杂 在循环体中产生大量对象 浅克隆 浅克隆只是完整复制了
  • redis分布式锁使用方式

    为什么使用锁 xff1f 锁的作用是要解决多线程对共享资源的访问而产生的线程安全问题 当多个线程并发操作某个对象时 xff0c 可以通过synchronized来保证同一时刻只能有一个线程获取到对象锁进而处理synchronized关键字修
  • Redis的延时队列

    延时队列 redis的list数据结构用来做一部消息队列 xff0c 使用rpush lpush操作入队列 使用lpop rpop来出队列 span class token operator gt span rpush notify que