SpringBoot使用Rabbit多消费者削峰

2023-11-10

场景

前端系统推送大批量数据进入我方系统进行处理, 为了减轻我方系统的压力, 并且充分发挥服务器的性能, 提高处理效率, 于是使用 Rabbit 做了限流处理, 同时有多线程运行多个消费者处理任务, 来提高效率

配置

Rabbit配置类, 其余的基础配置配置都维护在配置文件或者配置中心


/***
 * Rabbit配置类
 * @author yanqiang.jiang
 * @version 1.0
 * @date 2019/08/26
 **/

@Configuration
@Slf4j
public class RabbitConfig {


    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 数据队列
     *
     * @return 队列
     */
    @Bean
    public Queue accoflowHs() {
        return new Queue("testQueue");
    }


    /**
     * 单一消费者
     *
     * @return SimpleRabbitListenerContainerFactory
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * 多个消费者
     *
     * @return SimpleRabbitListenerContainerFactory
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 多消费者进行手工确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 默认消费者数量
        factory.setConcurrentConsumers(10);
        // 最大消费者数量
        factory.setMaxConcurrentConsumers(15);
        // 最大投递数
        factory.setPrefetchCount(5);
        return factory;
    }

    /**
     * RabbitTemplate 配置
     *
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
        );
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
                    log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
                }
        );
        return rabbitTemplate;
    }


}

此处注意点:
factory.setMessageConverter(new Jackson2JsonMessageConverter()); 发送的消息将会使用它来序列化
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 必须开启手动确认模式
factory.setConcurrentConsumers(10);factory.setMaxConcurrentConsumers(15); 这个表示消费者的数量, 也就是消费多线程运行的线程数目.
factory.setPrefetchCount(5); 每次取的消息的数目, 数目大效率高, 但是顺序越得不到保证

生产者发送消息

/***
 * rabbit 生产者
 * @author yanqiang.jiang
 * @version 1.0
 * @date 2019/08/26
 **/
@Component
@Slf4j
public class AccoflowHsProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 发送信息
     *
     * @param batchNum 流水号
     */
    public void stringSend(String batchNum) {
        log.info("消息队列:{},发布消息:{}", "testQueue", batchNum);
        // 第一个参数为刚刚定义的队列名称
        this.rabbitTemplate.convertAndSend("testQueue",
                MessageBuilder.withBody(batchNum.getBytes(StandardCharsets.UTF_8)).build());
    }
}

消费者处理消息

/***
 * rabbit 消费者
 * @author yanqiang.jiang
 * @version 1.0
 * @date 2019/08/26
 **/
@Component
@Slf4j
public class AccoflowHsConsumer {

    @Autowired
    private AccoflowInfHsHandler accoflowInfHsHandler;

    /**
     * 监听消费用户日志
     *
     * @param msg 消息
     */
    @RabbitListener(queues = "testQueue" containerFactory = "multiListenerContainer")
    public void recievedString(Message msg, Channel channel) throws Exception {
        try {
            log.info("来源事务落地通用消费者{}收到消息", channel.getChannelNumber());
            PaTransactionTask task = JSON.parseObject(msg.getBody(), PaTransactionTask.class);
            log.info("来源事务落地通用消费者{}解析消息:{}", channel.getChannelNumber(), task.getPlanControlId());
            // 这里最好添加重复执行判断
            // 处理来源事务
            runner.excuteTransaction(task);
        } catch (Exception e) {
            log.info("来源事务落地通用消费者{}出错", channel.getChannelNumber());
            e.printStackTrace();
        }
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
        log.info("来源事务落地通用消费者{}确认消", channel.getChannelNumber());
    }
}

注意:
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true); 处理完毕后一定要确认消息, 不然不会继续处理下个消息. 同时考虑异常的情况,也要手工确认
采取手动确认后处理完成后才会确认,这里处理时间可能比较长, 这个时候消息超时server会向消费者再次发送消息, 所以这里建议添防重复处理。防止重复消费消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot使用Rabbit多消费者削峰 的相关文章

  • Java中String类的isEmpty方法、null以及""的区别

    一直以来对String的这三个空挺晕的 刚好同事问我 我也学习下 从别人博客上看到的是这样的 isEmpty 分配了内存空间 值为空 是绝对的空 是一种有值 值 空 分配了内存空间 值为空字符串 是相对的空 是一种有值 值 空字串 null
  • 交换两个变量的值+int*[]与int(*)[]的辨析

    交换两个变量的值 不使用第三个变量 即 a 3 b 5 交换之后 a 5 b 3 有两种解法 一种用算术算法 一种用 异或 a a b b a b a a b or a a b 只能对 int char b a b a a b or a b
  • 这个效果是怎么做的?

    http higkoo i sohu com blog view 81445953 htm 想要那个红狐狸效果
  • 【论文阅读】MPViT : Multi-Path Vision Transformer for Dense Prediction

    发表年份 2021 12 发表单位 Electronics and Telecommunications Research Institute ETRI South Korea 期刊 会议 CVPR 论文链接 https arxiv org
  • Servlet_01_介绍

    Servlet的作用是处理请求 服务器会把接收到的请求交给Servlet来处理 在Servlet中通常需要 1 接收请求数据 2 处理请求 3 完成响应 例如客户端发出登录请求 或者输出注册请求 这些请求都应该由Servlet来完成处理 S
  • ubuntu下学习c++(输出定义变量)

    2022 6 11 学习cout cin的使用方法以及变量的定义 include
  • 接口和类有啥区别:

    接口和类有啥区别 接口是一系列抽象方法的集合 接口中只有抽象方法 只有的意思就是 没有成员变量 除了静态常量 没有构造方法 因此不能被实例化 类只是一种抽象的数据类型 接口没有构造方法 一个类只能继承一个类 但是可以实现多个接口 接口中不能
  • 远处是风景,近处是人生

    常听到有人说 最近好累 想出去走走 换个地方透透气 如果去到大理 心情一定如花儿一般灿烂 如果去到内蒙 心情一定如草原一般辽阔 如果去到三亚 心情一定如大海一般坦荡 但实际上 改变你心情的从不是新的城市或者美丽的景色 是你愿意与生活和解的态
  • 网络常考题

    45 当数据接收者不能处理更多数据时 哪一层发出停止信息给发送者 A 网络层 B 传输层 C 会话层 D 表示层 B 49 在传输层采用了以下哪些方法来保证接收缓冲区不溢出 多选 A 数据分段 B 确认机制 C 流量控制 D 滑动窗口 E
  • Spring中@JsonFormat与@DateTimeFormat注解使用简介说明

    转自 Spring中 JsonFormat与 DateTimeFormat注解使用简介说明 下文笔者讲述Spring中JsonFormat和DateTimeFormat注解的简介说明 如下所示 JsonFormat和DateTimeForm
  • C语言编译执行的全过程

    编译 编译程序读取源程序 字符流 对之进行词法和语法的分析 将高级语言指令转换为功能等效的汇编代码 再由汇编程序转换为机器语言 并且按照操作系统对可执行文件格式的要求链接生成可执行程序 C源程序头文件 gt 预编译处理 cpp gt 编译程
  • 高光谱遥感数值建模技术及在植被、水体、土壤信息提取领域应用

    在高光谱影像中 结合纹理 表面粒度 风化程度 作物密度等辅助信息 能估计出多种地物及其上覆作物的状态参量 提高遥感高定量分析的精度和可靠性 如何通过构建遥感光谱反射信号与地表参数之间的关系模型来实现数值计算 是举办本次培训班的主要目的 针对
  • 校园网络系统服务器配置摘要,校园网网络应用服务器配置

    浅谈校园网网络应用服务器配置 摘要 以某校园网为例 在网络应用服务器设计这个环节中 我们分别用到了web服务器 ftp服务器 dns服务器 dhcp服务器 mail服务器 并且在一台已经安装了windows 2003 server的计算机上
  • 前端框架——React 学习总结,这篇7000字全解决

    React组件复用 React组件复用 把多个组件中部分功能相似或者相同的状态或者逻辑进行复用 复用 state和操作state的方法 复用的方式 render props模式 高阶组件 HOC render props模式 用childr
  • Mysql 时间转换 && 时间函数

    1 时间转换 涉及的函数 DATE FORMAT date format MySQL日期格式化函数 STR TO DATE str format MySQL字符串格式化为日期 UNIX TIMESTAMP MySQL其他数据转换为时间戳 F
  • Vue的antd多选下拉框增加全选操作

    因为antd的多选下拉框没有提供全选操作 我做了一个简易的全选操作 data return categoryList 存放获取到的分选数据 category 已选分类数据

随机推荐

  • QT信号槽的5种连接方式

    在面试中 这是一个经常被问到的问题点 也是刚刚上qt的工程师不会去注意的一个点 qt源代码定义的连接方式如下 1 Qt AutoConnection 一般信号槽不会写第五个参数 其实使用的默认值 使用这个值则连接类型会在信号发送时决定 如果
  • markdown编辑数学公式

    在输入数学公式的时候 需要在数学公式的前后加入 符号 将需要输入的公式加入到 中间 上下标 上标 下标 名称 数学表达式 markdown公式 上标 ab a b a b 下标 ab a b a b 分数 frac 第一个 写分子 第二个
  • React Native(RN)-组件生命周期

    生命周期简介 像 Android 开发一样 React Native RN 中的组件也有生命周期 Lifecycle 借用大神流程图 这张图很简洁直观地告诉我们 生命周期的整个流程以及阶段划分 第一阶段 getDefaultProps gt
  • 目标检测入门

    目录 R CNN 1 1提取候选区域 1 1 1合并规则 1 1 2多样化与后处理 1 2特征提取 1 2 1预处理 2 Fast RCNN 2 1RoI Pooling Layer Faster RCNN 结构 RPN anchor 目标
  • Junit中使用线程池不执行任务代码

    1 在test中使用线程池发送MQ 没有报错 没有执行线程池中的代码 2 查资料 junit框架只要主线程结束完成 单元测试就会关闭 导致线程池中的线程没有执行代码就被销毁关闭了 可以在主线程中sleep一段时间 或者用main方法
  • 稳定ORACLE的执行计划

    很多时候可能我们都希望CBO能够帮我们生成正确 高效的执行计划 但是很多时候事实并非如此 可能因为各种各样的原因 如 统计信息不正确或者CBO天生的缺陷等 都会导致生成的执行计划特别的低效 之前的一家公司有一台专门用于批量做数据校验清洗的数
  • 大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了

    大学四年 看课本是不可能一直看课本的了 对于学习 特别是自学 善于搜索网上的一些资源来辅助 还是非常有必要的 下面我就把这几年私藏的各种资源 网站贡献出来给你们 主要有 电子书搜索 实用工具 在线视频学习网站 非视频学习网站 软件下载 面试
  • 'vue-cli-service' 不是内部或外部命令,也不是可运行的程序 或批处理文件。

    vue时 报 vue cli service 不是内部或外部命令 也不是可运行的程序 或批处理文件 罪该万死 怎么能忘记 npm install 如果你下载的淘宝镜像 也可以cnpm install 转载于 https www cnblog
  • Java设计模式-状态模式

    1 概述 定义 对有状态的对象 把复杂的 判断逻辑 提取到不同的状态对象中 允许状态对象在其内部状态发生改变时改变其行为 例 通过按钮来控制一个电梯的状态 一个电梯有开门状态 关门状态 停止状态 运行状态 每一种状态改变 都有可能要根据其他
  • STM32F031串口(RS485)中断+DMA发送(预备知识)

    STM32F031串口 RS485 中断 DMA发送 前言 GPIO移植过程 与F1系列的一些区别 串口 DMA 前言 最近在搞STM32F031的项目 F0系列与常用的F1系列有一定区别 在开发过程中遇到一些问题 而且花了好长花间在搜寻解
  • js操作剪贴板讲解

    文章目录 复制 剪切 到剪贴板 Document execCommand Clipboard复制 Clipboard writeText Clipboard write copy cut事件 从剪贴板进行粘贴 document execCo
  • 【E2EL5】A Year in Computer Vision中关于图像增强系列部分

    http www themtank org a year in computer vision 部分中文翻译汇总 https blog csdn net chengyq116 article details 78660521 The M T
  • eclipse修改文字显示大小及html乱码修改编码格式

    1 修改字体大小 2 修改编码格式 html文件出现乱码时需要修改编码格式 备注 有时候修改后还会是乱码 重启eclipse即可
  • 2022年7月3日leetcode每日一题打卡——112.路径总和

    一 题目描述与要求 112 路径总和 力扣 LeetCode 题目描述 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 判断该树中是否存在 根节点到叶子节点 的路径 这条路径上所有节点值相加等于目标和 target
  • 基于YOLO-V5的结核杆菌目标检测系统【毕业设计,AI+医疗】

    项目背景 结核病 Tuberculosis TB 是由结核分枝杆菌 Mycobacterium tuberculosis 引起的一种慢性人畜共患病 它不受年龄 性别 种族 职业 地区的影响 人体许多器官 系统均可患结核病 其中以肺结核最为常
  • HBase Java 编程

    一 环境配置 1 引入Maven 库
  • JavaScript 中使用Ajax进行网络post请求和get请求

    博主前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住也分享一下给大家 点击跳转到网站 前言 使用Ajax进行网络请求 默认是异步请求 而且不需要刷新页面 就可以发送请求 获取服务端返回来的数据 一 Ajax的get请求 做
  • apache kafka配置中request.required.acks含义

    Kafka producer的ack有3中机制 初始化producer时的producerconfig可以通过配置request required acks不同的值来实现 0 这意味着生产者producer不等待来自broker同步完成的确
  • 算法--大数开方

    之前已找到比较好的大数乘法算法 现在我们来解决大数开方问题 如有大数n 求其开方x 则x与n必满足x x n 也就是说我们能遍历x找到n的开方 但是问题在于我们是不可能对大数遍历的 如果我们可以确定它的大致范围 仅仅测试几个不容易直接判断的
  • SpringBoot使用Rabbit多消费者削峰

    文章目录 场景 配置 生产者发送消息 消费者处理消息 场景 前端系统推送大批量数据进入我方系统进行处理 为了减轻我方系统的压力 并且充分发挥服务器的性能 提高处理效率 于是使用 Rabbit 做了限流处理 同时有多线程运行多个消费者处理任务