SpringBoot+redis实现消息队列(发布/订阅)

2023-10-28

1.引入依赖

<!-- 整合redis -->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>3.0.0</version>
</dependency>

2.添加redis配置

spring:
  #缓存服务
  redis:
    client-name: redis
    host: ip
    port: 6379
    password: you password
    timeout: 3000
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

3.redistemplate配置

package com.hhmt.delivery.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 辉煌明天
 * FileName: RedisConfiguration
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2021/11/5 15:57
 * Description: Redis配置类
 */
@Configuration
public class RedisConfiguration {

    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();

        // 配置连接工厂
        template.setConnectionFactory(factory);

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();


        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

}

4.消息发布

使用 StringRedisTemplate或者RedisTemplate的convertAndSend(channel, message)方法即可,

其中channel代表消息信道也可以理解为主题,message表示发布的内容。

消息发布类 ,模拟service层的逻辑处理

package com.hhmt.delivery.aop;

import com.alibaba.fastjson.JSON;
import com.hhmt.delivery.model.ServiceMessageInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

/**
 * @author huachun
 * @version 1.0
 * @description: 拦截持久化操作推送消息切面
 * @email huachun_w@163.com
 * @date 2023-02-03 14:18
 */
@Aspect
@Component
@Slf4j
public class ServiceMessageAop {

    @Autowired
    private RedisTemplate redisTemplate;

    public void doAfter(String channel, String message) {
        redisTemplate.convertAndSend("mq_01", message);
        redisTemplate.convertAndSend("mq_02", "hello");
        redisTemplate.convertAndSend("mq_03", "mq");
    }

}

5.消息接受,处理业务

消息监听注册配置,把消息监听注册到容器里面

package com.hhmt.delivery.config;

import com.hhmt.delivery.mq.MessageReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * @author huachun
 * @version 1.0
 * @description: TODO
 * @email huachun_w@163.com
 * @date 2023-02-03 17:53
 */
@Configuration
public class RedisConsumeConfig {

    /**
     * 注入消息监听容器
     *
     * @param connectionFactory 连接工厂
     * @param listenerAdapter   监听处理器1
     * @param listenerAdapter   监听处理器2 (参数名称需和监听处理器的方法名称一致,因为@Bean注解默认注入的id就是方法名称)
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter,
                                            MessageListenerAdapter listenerAdapter2) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅一个叫mq_01 的信道
        container.addMessageListener(listenerAdapter, new PatternTopic("mq_01"));
        //订阅一个叫mq_02 的信道
        container.addMessageListener(listenerAdapter2, new PatternTopic("mq_02"));
        //这个container 可以添加多个 messageListener
        return container;
    }

    /**
     * 消息监听处理器1
     *
     * @param messageReceiver 处理器类
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver messageReceiver) {
        //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage”
        return new MessageListenerAdapter(messageReceiver, "receiveMessage"); //receiveMessage:接收消息的方法名称
    }

    /**
     * 消息监听处理器2
     *
     * @param receiver 处理器类
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
        //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage2”
        return new MessageListenerAdapter(receiver, "receiveMessage2"); //receiveMessage:接收消息的方法名称
    }
}

6.自定义消息处理器

package com.hhmt.delivery.mq;

import org.springframework.stereotype.Component;

/**
 * @author huachun
 * @version 1.0
 * @description: TODO
 * @email huachun_w@163.com
 * @date 2023-02-03 18:08
 */
@Component
public class MessageReceiver {

    /**
     * 接收消息的方法1
     **/
    public void receiveMessage(String message) {
        System.out.println("receiveMessage接收到的消息:" + message);
    }

    /**
     * 接收消息的方法2
     **/
    public void receiveMessage2(String message) {
        System.out.println("receiveMessage2接收到的消息:" + message);
    }
}

第二种方式:

通过上面方式虽然可以发布和订阅消息,但是消息的内容局限与String类型,有时候我们需要发布自定义类型的数据,需要用到下面这种方式

1.更改容器配置

package com.hhmt.delivery.config;

import com.hhmt.delivery.mq.RedisConsumerSubscribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * 辉煌明天
 * FileName: RedisConfig
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2021/11/15 17:19
 * Description: redis配置类
 */
@Configuration
public class RedisConfig {

    @Value("${spring.profiles.active}")
    private String env;

    @Autowired
    private RedisConsumerSubscribe redisConsumerSubscribe;

    /**
     * @Description: redis消息监听配置
     * @Author: huachun
     * @Date: 2021/11/15 17:20
     * @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
     **/
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //订阅频道,通配符*表示任意多个占位符
        container.addMessageListener(redisConsumerSubscribe, new PatternTopic("ocpx"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(redisConsumerSubscribe);
    }
}

2.更改redis配置

@Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//        ObjectMapper om = new ObjectMapper();
//        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//        jackson2JsonRedisSerializer.setObjectMapper(om);
        jackson2JsonRedisSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

3.消息接受与数据转换

package com.hhmt.delivery.mq;

import com.hhmt.delivery.config.ObjectMapperConfig;
import com.hhmt.delivery.model.ServiceMessage;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

/**
 * @author huachun
 * @version 1.0
 * @description: TODO
 * @email huachun_w@163.com
 * @date 2023-04-21 14:32
 */
public class RedisConsumerSubscribe implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {

        Jackson2JsonRedisSerializer<ServiceMessage> jacksonSerializer = new Jackson2JsonRedisSerializer<>(ServiceMessage.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        ServiceMessage message1 = jacksonSerializer.deserialize(message.getBody());
        System.out.println("ServiceMessage:" + message1);

        System.out.println("订阅频道:" + new String(message.getChannel()));
        System.out.println("接收数据:" + new String(message.getBody()));
    }
}

原文参考:springboot:整合redis之消息队列_springboot redis 队列_yololee_的博客-CSDN博客

原文参考:Redis 消息订阅(MessageListener接口) - Spring Data Redis 教程

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot+redis实现消息队列(发布/订阅) 的相关文章

随机推荐

  • 跳过交货单ATP CHECK的方法

    物料启用ATP CHECK会影响到交货 对于那些启用了ATP CHECK而又没有做合理规划的系统 想要搞清楚ATP是如何计算的 我只能说呵呵 你会发现批次拆分 过账的BAPI会有莫名其妙的 报错 搞不清楚库存里明明有东西为什么还不让交货 这
  • 【深度学习】数据集打标签:生成train.txt和val.txt

    当我们在Github上下载一篇论文的代码后 我们如何在自己的数据集上进行复现呢 准备自己的数据集 这是在百度爬的十分类的服装数据集 其中train文件夹下每类大概300张 val文件夹下每类大概100张 总共在4000张左右 设置目录 我们
  • mysql二进制文件下载教程_mysql 二进制文件增量备份

    1 首先在my cnf下添加二进制文件路径 windows下文件名称为my ini 在 mysqld 下添加 log bin mysql bin 2 centos下默认安装mysql 5 6 数据默认文件夹下为 var lib mysql
  • (新手向)在matlab中运用SMOTE和前馈神经网络对wilt(枯萎)数据集进行机器学习

    目录 一 概述 二 数据集描述 三 方法 数据预处理 SMOTE算法 Feed forward网络 四 结果 后记 2021年5月 一 概述 近日 有位同学因为搞不懂matlab中的神经网络来问我怎么做 我说你把数据集发来给我看看 我稍微一
  • apt安装包报错解决办法:下列软件包有未满足的依赖关系,依赖。。。但是。。。正要被安装

    apt安装包报错解决办法 下列软件包有未满足的依赖关系 依赖 但是 正要被安装 文章目录 apt安装包报错解决办法 下列软件包有未满足的依赖关系 依赖 但是 正要被安装 几种可能的情况 1 镜像源版本代码的问题 1 查看版本代码 2 修改镜
  • Redis设计与实现---Sentinel

    Sentinel Redis的高可用性解决方案 由一个或多个Sentinel实例组成的系统可以监视任意多个主服务器 以及这些主服务器属下的所有从服务器 并在被监视的主服务器进入下线状态时 自动将下线主服务器属下的某个从服务器升级为新的主服务
  • 考研复试数据库原理课后习题(十)——数据库恢复技术

    数据库恢复技术 1 事务是用户定义的一个数据库操作序列 这些操作要么全做 要么不做 是一个不可分隔的工作单位 事务具有四个特性 ACID 原子性 一致性 隔离性 持续性 原子性 事务是数据库的逻辑工作单位 一个事务中包括的操作要么全做 要么
  • Swagger的常用配置

    一 可在项目中创建SwaggerConfig配置类 对文档详细信息进行配置 swagger配置类 用于配置swagger的详细信息 比如标题 网站 邮箱 Configuration public class SwaggerConfig 返回
  • linux 大量的TIME_WAIT解决办法

    原文地址 http www cnblogs com softidea p 6062147 html 统计在一台前端机上高峰时间TCP连接的情况 统计命令 netstat n awk tcp S NF END for a in S print
  • 计算机科学想象作文500,六年级想象作文600字

    第一篇 描写月亮的作文 你们听说过 超级月亮 吗 什么 没听说过 那么 我就带你们去看看前几天的超级月亮吧 今天 我像往常一样去广场滑滑板 忽然 我看一栋楼房的左 作文500字 未来的城市一场暴雨过后 阳光洒在充满绿意的城市 街道上没有积水
  • 09_Pandas从多个条件(AND,OR,NOT)中提取行

    09 Pandas从多个条件 AND OR NOT 中提取行 使用Pandas从多个条件 AND OR NOT 中提取行的方法 有以下2点需要注意 的使用 and or not的错误 使用比较运算符时 请将每个条件括在括号中 以下数据为例
  • 驱动名、设备名和设备文件名的关系

    编写一个驱动文件的时候生成一个name1 ko文件 这个name1就是驱动名 使用insmod name1 ko指令之后 用lsmod能看见一个名为name1的驱动 在调用了alloc chrdev region函数或register ch
  • 满分回答教你如何应对面试中项目经验这一难关

    给前端瓶子君加星标 提升前端技能 作者 亦逊 https juejin im post 5e7aed9c6fb9a07cac1d872d 前言 本篇文章的作者是来自阿里淘系用户增长前端团队的 亦逊 18年作为双非本科生通过层层面试 校招进入
  • CSS下划线与文字间距,下划线粗细以及下划线颜色的设置

    最开始的时候了解下划线的属性是 text decoration underline 1 但是 很遗憾的是 对于设计做的下划线用浏览器默认属性样式很难调整 使用这个属性并不能调整下划线与文字的间距 而且对于下划线的颜色也不好调整 而使用 u
  • 2014年仍然是DX11设备仿真和软引擎年

    先说点题外话 2014年1月比较浮躁 2月过年 回家后 与亲戚家的同龄人一比较 发现自己很废 不知道他们是不是在吹牛 想急功近利挣点钱 所以一度想进行OSG或者COCOS2DX 诚然 以后会进行这样的一种或两种 但是 今年不是时候 因为水平
  • MFC之创建插入符,写字,换行与退格11

    概述 我们按照前面文章根据向导创建项目 1 创建插入符 由于插入符是在创建窗口后并且做我们用户操作前需要使用 所以我们将插入符的创建放在OnCreate函数中即WM CRATE信号 int CInsertFuView OnCreate LP
  • reacthook的ref循环多个子组件

    父组件 ref值挂在这里 父子和兄弟都可以使用 const bodyRefs useRef
  • sklearn 随机森林(Random Forest)多分类问题

    模型 随机森林是集成学习算法的一种 sklearn更多的集成学习算法 RandomForestClassifier 参数详解 重要的参数有基分类器的个数 n estimators 特征选择算法 critirion 单个决策树的最大深度 ma
  • 使用wps2019快速翻译视频文字

    问题 视频中的英文如何翻译 如图 方法如下 1 使用wps 2019 截屏工具截取屏幕 2 使用 翻译文字 3 结果
  • SpringBoot+redis实现消息队列(发布/订阅)

    1 引入依赖