redis 过期消息订阅实现(java实现)

2023-10-31

一、开启redis消息通知功能

方法1: 修改conf文件

编辑/etc/redis/redis.conf文件,添加或启用以下内容(key过期通知):

notify-keyspace-events Ex

方法2: 使用命令

  1. 登陆redis-cli
  2. 输入下列命令
config set notify-keyspace-events Ex

关键字介绍:

上面Ex就是其中的关键字之一
  • K:keyspace事件,事件以__keyspace@__为前缀进行发布
  • E:keyevent事件,事件以__keyevent@__为前缀进行发布
  • g:一般性的,非特定类型的命令,比如del,expire,rename等
  • $:字符串特定命令
  • l:列表特定命令
  • s:集合特定命令
  • h:哈希特定命令
  • z:有序集合特定命令
  • x:过期事件,当某个键过期并删除时会产生该事件
  • e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
  • A:g$lshzxe的别名,因此AKE意味着所有事件

订阅者介绍

  • onMessage: 收到消息回调
  • onSubscribe: 订阅频道(channel)
  • onUnsubscribe: 取消订阅频道(channel)
  • onPMessage: 收到消息回调-p模式
  • onPSubscribe: 订阅频道(channel)p模式
  • onPUnsubscribe: 取消订阅频道(channel)p模式

带P的就是可以在订阅的时候支持表达式, 一次性订阅多个频道,

例如:

__keyevent@*__:expired ```

其中的*标识订阅所有db的key过期事件

二、在pom文件中引入需要的redis依赖

        <!--添加redis依赖-->
         <dependency>
           <groupId>org.springframework.data</groupId>
           <artifactId>spring-data-redis</artifactId>
           <version>1.8.4.RELEASE</version>
         </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

三、编写基本配置文件 redis.properties

redis.hostName=192.168.6.138
redis.port=6379
redis.password=123321
# 连接超时时间
redis.timeout=10000

#最大空闲数
redis.maxIdle=300
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=1000
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000
#连接的最小空闲时间 默认1800000毫秒(30分钟)
redis.minEvictableIdleTimeMillis=300000
#每次释放连接的最大数目,默认3
redis.numTestsPerEvictionRun=1024
#逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
redis.timeBetweenEvictionRunsMillis=30000
#是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
redis.testOnBorrow=true
#在空闲时检查有效性, 默认false
redis.testWhileIdle=true

四、编写配置类 RedisConfig

package com.lanqiaobei.ssm.yjk.config;


import com.lanqiaobei.ssm.yjk.util.RedisKeyExpirationListener;
//import com.liuyanzhao.ssm.blog.util.RedisLockUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.jcache.config.JCacheConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.StandardCharsets;

@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends JCacheConfigurerSupport {
    @Autowired
    private Environment environment;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        JedisConnectionFactory fac = new JedisConnectionFactory();
        fac.setHostName(environment.getProperty("redis.hostName"));
        fac.setPort(Integer.parseInt(environment.getProperty("redis.port")));
        fac.setPassword(environment.getProperty("redis.password"));
        fac.setTimeout(Integer.parseInt(environment.getProperty("redis.timeout")));
        fac.getPoolConfig().setMaxIdle(Integer.parseInt(environment.getProperty("redis.maxIdle")));
        fac.getPoolConfig().setMaxTotal(Integer.parseInt(environment.getProperty("redis.maxTotal")));
        fac.getPoolConfig().setMaxWaitMillis(Integer.parseInt(environment.getProperty("redis.maxWaitMillis")));
        fac.getPoolConfig().setMinEvictableIdleTimeMillis(
                Integer.parseInt(environment.getProperty("redis.minEvictableIdleTimeMillis")));
        fac.getPoolConfig()
                .setNumTestsPerEvictionRun(Integer.parseInt(environment.getProperty("redis.numTestsPerEvictionRun")));
        fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(
                Integer.parseInt(environment.getProperty("redis.timeBetweenEvictionRunsMillis")));
        fac.getPoolConfig().setTestOnBorrow(Boolean.parseBoolean(environment.getProperty("redis.testOnBorrow")));
        fac.getPoolConfig().setTestWhileIdle(Boolean.parseBoolean(environment.getProperty("redis.testWhileIdle")));
        return fac;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
        // 创建RedisTemplate对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(connectionFactory);
        // 创建JSON序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        // 设置Key的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        // 设置Value的序列化
        template.setValueSerializer(jsonRedisSerializer);
        template.setHashValueSerializer(jsonRedisSerializer);
        template.setDefaultSerializer(stringRedisSerializer);
        // 返回
        return template;
    }
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            RedisKeyExpirationListener redisKeyExpirationListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);

        // 监听 __keyevent@0__:expired 频道,这里的0指数据库编号为 0;
        container.addMessageListener(redisKeyExpirationListener,
                new PatternTopic("__keyevent@0__:expired"));

        return container;
    }

    @Bean
    public RedisKeyExpirationListener redisKeyExpirationListener() {
        return new RedisKeyExpirationListener();
    }
    // 其他 Bean 定义
}

五、实现监听类 RedisKeyExpirationListener

package com.lanqiaobei.ssm.yjk.util;

import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static cn.hutool.core.util.IdUtil.randomUUID;
@Component
public class RedisKeyExpirationListener implements MessageListener {
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
    @Autowired
    private PublisherMQ publisherMQ;

    //分布式锁过期时间 s  可以根据业务自己调节
    private static final Long LOCK_REDIS_TIMEOUT = 2000L;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        // 获取过期的 Key,需要利用byte[]录入和接收,不然会出现中文乱码
        byte[] body = message.getBody();
        String allKey = redisTemplate.getStringSerializer().deserialize(body);
        String expiredKey = StrUtil.removePrefix(allKey, "todo:");//hutool工具里面去掉首位字符
//        System.out.println(expiredKey);

        // 处理相应的业务逻辑
//————————————————————————————————————————————————————————
        String key = "todolock:"+expiredKey;
        String value = randomUUID();
        //redis尝试获取锁,加锁
        Boolean getLock = getLock(key,value);

        if(getLock){
            publisherMQ.sendMessage("lqb.direct","queueLQBKey",expiredKey);
            releaseLock(key,value);
        }
    }

    /**
     *  加锁
     **/
    public Boolean getLock(String key,String value){
        Boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(key,value);
        if (lockStatus) {
            System.out.println("Set key-value successfully!");
            redisTemplate.expire(key, LOCK_REDIS_TIMEOUT, TimeUnit.MILLISECONDS);//毫秒级
        } else {
            System.out.println("Key already exists!");
        }
        return lockStatus;
    }
    /**
     *  释放锁
     **/
    public Long releaseLock(String key,String value){
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);
        Long releaseStatus = (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);
        return releaseStatus;
    }
}

监听redis过期消息提醒,同一个数据(键)过期会有多次通知提醒。原因是:可能是由于 Redis 的主从复制或者分片集群等机制导致的。在主从复制或者分片集群中,可能会发生多个节点同时订阅了相同的键空间通知,从而导致同一个键空间事件被多次触发。
我的解决方法是:给键过期后提醒的回调函数加锁,收到多个通知提醒,回调函数加锁后最终只会有一个执行,其他没有获得锁的回调不会执行,这样就避免了重复执行任务代码。

这里的实现方法在另一个文章:
https://blog.csdn.net/m0_46652188/article/details/130394484

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

redis 过期消息订阅实现(java实现) 的相关文章

随机推荐

  • 华为OD机试真题-查找充电设备组合【2023Q1】【JAVA、Python、C++】

    题目描述 某个充电站 可提供n个充电设备 每个充电设备均有对应的输出功率 任意个充电设备组合的输出功率总和 均构成功率集合P的1个元素 功率集合P的最优元素 表示最接近充电站最大输出功率p max的元素 输入描述 输入为3行 第1行为充电设
  • 时序预测

    时序预测 MATLAB实现Bayes贝叶斯优化LSTM 长短期记忆神经网络 时间序列预测 预测效果一览
  • React - Websocket

    组件didMount调用 Store createWebSocket Math random Store url ws window backend server slice 7 apronMapWebsocket 这个要与后端提供的相同
  • C++函数重载、重写与重定义

    演示代码 include
  • 探索Java8——CompletableFuture: 组合式异步编程

    文章目录 Future接口 Future接口的局限性 使用 CompletableFuture 使用并行流对请求进行并行操作 使用 CompletableFuture 发起异步请求 如果你的意图是实现并发 而非并行 或者你的主要目标是在同一
  • https到底是如何防篡改的

    1 前言 https是一个老生常谈的话题了 也是面试过程种经常甚至必然会问到的一个问题 但当问到https为什么安全的时候 很多人的回答就是简单的回一句 因为他加密了 然后就没然后了 你也相当于啥都没回答出来 2 我为什么要写这篇文章呢 网
  • select底部增加固定按钮

  • 基于SSM的校园快递一站式服务系统设计与实现

    末尾获取源码 开发语言 Java Java开发工具 JDK1 8 后端框架 SSM 前端 采用JSP技术开发 数据库 MySQL5 7和Navicat管理工具结合 服务器 Tomcat8 5 开发软件 IDEA Eclipse 是否Mave
  • 统计学R语言 第五章课后练习 置信区间

    5 1 计算一个总体均值的置信区间 大样本 gt exercise5 1 lt read csv D 289250 统计学 基于R 第4版 例题和习题数据 统计学 基于R 第4版 例题和习题数据 公开资源 exercise chap05 e
  • 用python网络爬虫爬取英雄联盟英雄图片

    用python爬虫爬取lol皮肤 这也用python网络爬虫爬取lol英雄皮肤 忘了是看哪个大神的博客 由于当时学了下就一直放在这儿 现在又才拿出来 再加上马上要考二级挺忙的 代码基本上是没改 还望大神原谅 本人小白 没学过Python 只
  • Pay Cycle related record

    select delete from sysadm ps pycycl stat where pay cycle ACH and pay cycle seq num gt 30 select delete from sysadm ps PY
  • Golang初入编程-踩坑笔记(3)- 并发,优雅关闭

    主函数也是线程 在不使用sync WaitGroup的情况下 需要main等待goroutine完成 不然main完成就没了 sync是synchronizing 使 同步 的缩写 wg done 最好加defer chan需要make c
  • Sqli-labs之Less-26和Less-26a

    Less 26 GET 基于错误 您所有的空格和注释都属于我们 根据提示我们知道这一关过滤了空格和注释 实际上过滤的远远不止这些 我们来看下源码 可以确认一下 id 231 确认过滤了 id or1 确认过滤了or id 1 确认过滤了多行
  • C语言 fstat函数

    系列文章目录 文章目录 系列文章目录 前言 一 stat系统调用 二 fstat 1 功能 2 相关函数 3 头文件 4 函数声明 5 描述 6 返回值 7 例子 三 struct stat结构体 前言 一 stat系统调用 stat系统调
  • Linux学习之expect操作详解

    一 expect安装介绍 1 expect命令安装 安装语句 yum install expect 2 expect命令含义 expect是一种脚本语言 它能够代替人工实现与终端的交互 主要应用于执行命令和程序时 系统以交互形式要求输入指定
  • Python全栈开发【基础-03】编程语言的分类

    专栏介绍 本专栏为Python全栈开发系列文章 技术包括Python基础 函数 文件 面向对象 网络编程 并发编程 MySQL数据库 HTML JavaScript CSS JQuery bootstrap WSGI Django Flas
  • usb鼠标驱动(一)

    Linux USB 鼠标驱动程序详解 注册一个usb driver 这个drvier不是usb设备driver 而是接口driver use a define to avoid include chaining to get THIS MO
  • 微信小程序消息订阅Java

    前言 编写日期 2022 11 04 写这篇文章原因 公司给政府做一个订餐系统 需要在员工在小程序上发起订餐后经过部门领导和书记的审批后 再由食堂确认订餐结果 在订餐审批单在各个节点流转的过程中 需要给每一个节点的审批人发送微信订阅消息和手
  • 【目标检测】目标检测的评价指标(七个)

    目录 目标检测的评价指标 一 正样本与负样本 二 真正 TP 假正 FP 真负 TN 假负 FN 1 正确的正向预测 True Positive TP 正样本被正确检测的数量 2 错误的正向预测 False Positive FP 3 错误
  • redis 过期消息订阅实现(java实现)

    一 开启redis消息通知功能 方法1 修改conf文件 编辑 etc redis redis conf文件 添加或启用以下内容 key过期通知 notify keyspace events Ex 方法2 使用命令 登陆redis cli