RabbitMQ:work结构

2023-11-08

> 只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange

>  消费者指定Qoa和手动ack

生产者

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    public static final String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {

        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        //3.声明了一个队列
        /**
         * queue – the name of the queue
         * durable – true代表创建的队列是持久化的(当mq重启后,该对立依然存在)
         * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
         * autoDelete – 该队列是否可以被mq服务器自动删除
         * arguments – 队列的其他参数,可以为null
         */
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello doubleasdasda!";

        //生产者如何发送消息,使用下面的方法即可
        /**
         * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
         * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
         * other properties - 消息的其他属性,可以为null
         * body – 消息的内容,注意,要是有 字节数组
         */
        for (int i = 0; i < 21; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println(" [x] Sent '" + message + "'");

        //关闭资源
        channel.close();
        conn.close();
    }
}

消费者一

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(400);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者01:"+msg);


                //手动ack
                //从message对象中取
                long deliveryTag = message.getEnvelope().getDeliveryTag();
                /**
                 * 第一个参数:消息编号
                 * 第二个参数: false,代表只确认这一个消息
                 */
                channel.basicAck(deliveryTag,false);
            }
        };

        //设置该消费者,每次只能从mq中获取一条消息
        channel.basicQos(1);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
          *把消费者的确认模式,设置为 手动 ack
         *
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

消费者二

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv02 {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者02:"+msg);


                long deliveryTag = message.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag,false);
            }
        };
        //注意:这个是可以存三个,而不是一次发三个
        channel.basicQos(3);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
         * queue – the name of the queue
         * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
         * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
         * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ:work结构 的相关文章

随机推荐

  • python : Numpy: numpy.c_

    numpy c numpy c
  • BigDecimal 除法运算提示:java.lang.ArithmeticException: Non-terminating decimal expansion; no exact repres

    业务场景 今天在计算重点工程总数占比工程总数 百分比 的时候 遇到一个错误 java lang ArithmeticException Non terminating decimal expansion no exact repres 异常
  • lambda函数

    文章目录 一 定义 二 格式 一 定义 lambda函数是一种匿名函数 是一种通过单个语句生成函数的方式 其结果是返回值 使用lambda关键字定义 该关键字仅表达 我们声明一个匿名函数 的意思 二 格式 冒号前是参数 可以有多个 用逗号隔
  • flex-wrap 后内容高度被撑开

    问题 布局时出现换行后 高度异常 box height calc 100vh 100px background fff overflow auto padding 76px 0 0 17px display flex flex wrap w
  • MySQL中的锁

    数据库中的锁 锁分类 按锁的粒度划分 表级锁 行级锁 页级锁 按锁级别划分 共享锁 排它锁 意向锁 按加锁方式划分 自动锁 显示锁 按使用方式划分 乐观锁 悲观锁 MySQL中的行级锁 表级锁和页级锁 行级锁 行级锁分为共享锁和排他锁 行级
  • 经典小题目2(进制转换,递归真是个好东西啊!C语言)

    递归真是个好东西 进制转换 十进制 gt R进制 除基取余 倒序排列 1 十进制转二进制 十进制转二进制 void convert1 int n if n 0 return else convert1 n 2 printf d n 2 2
  • 【AI面试】CrossEntropy Loss 、Balanced Cross Entropy、 Dice Loss 和 Focal Loss 分类损失横评

    在实际任务中 数据的不均衡一直是深度学习领域一个不可忽略的问题 常说的长尾效应 说的就是这个问题 少而多的误差 最终造成的结果 是不容忽视的 长尾效应 在正态分布中 曲线中间凸起的是 头 两边相对平缓的部分叫做 尾 对于绝大部分的需求来说
  • 测试工程师面试之设计测试用例

    以下的问题答案 仅供参考 如小伙伴们有更好的答案 欢迎大家评论区留言 谢谢大家 测试工程师面试之设计测试用例 1 请说一说简单用户界面登陆过程都需要做哪些分析 2 请对此系统设计测试用例 一个系统 多个摄像头 抓拍车牌 识别车牌 上传网上
  • 【基于Proteus 8 Professional和Keil uVision5简单共阴极数码管点亮】

    1 前面的一些Keil uVision5环境搭建具体的操作我已经省略 可以参照我前面写的博客 2 Main c代码 include stm32f10x h uint16 t table 0x3f 0x06 0x5b 0x4f 0x66 0x
  • 模拟电路设计(13)--- 振荡器电路原理简介

    概述 所谓振荡器电路就是一种在没有外界输入信号的情况下能自行产生周期性交变信号输出的电子电路 可以作为信号源 定时源 能量变换电路 频谱变换电路等等 普遍应用于通信电子系统 振荡器的种类很多 按原理分 反馈振荡器和负阻振荡器 按输出频率分
  • JavaBean转有序的Json字符串

    前言 随着国密算法普及 接口json加签传输对字段顺序有要求 处理代码 public static void main String args throws IllegalAccessException User user new User
  • chatgpt配合xmind制作思维导图

    原理 xmind支持将markdown文件转化成思维导图的形式 提示词 我将提供以下文章 请帮我使用Xmind工具创建一个 的思维导图 其中包含多个主题和子主题 以及叶子节点 请你提供一些Markdown格式的文本 以便与Xmind兼容 在
  • Oracle监控的关键指标(一)

    先收集一些Oracle的关键指标 最近有空的话再考虑将一些比较有代表性的监控点进行指标化 指标化的数据在通过python脚本对进行性能上监控 最终部署在目前维护的oracle数据库上 0 找使用CPU多的用户session select a
  • WSL安装教程

    wsl安装教程 引言 前期准备工作 安装wsl 第一步 第二步 检测系统版本 第三步 确定虚拟机特性 第四步 下载Linux内核的更新包 第五步 设置WSL 2作为默认版本 第六步 选择Linux发行版本并设置Linux账号 小TIPS 引
  • CocoaPods导入第三方库,提示找不到头文件的解决方法

    最近一直在了解MVVM架构模式 也知道了ReactiveCocoa框架对MVVM实现的便利与优雅 但是CocoaPods导入ReactiveCocoa框架后 却出现一个问题 就是引入头文件的时候说找不到头文件 如下图 解决方法如下 1 找到
  • Fragment详解

    Fragment有自己的生命周期 Fragment依赖于Activity Fragment通过getActivity 可以获取所在的Activity Activity通过FragmentManager的findFragmentById 或f
  • QT5.15以及QT VS TOOL安装教程

    QT5 15以及QT VS TOOL安装教程 1 QT5 15下载安装教程 点击这个链接 https download qt io 在official release online installers目录下选择exe文件下载windows
  • vue重新进页面重新加载mounted或者created中的内容

    vue的项目中 如果再次进入当前页面需要重新加载mounted方法可以使用 activated 这个方法内就可以执行需要进入页面重新加载的方法来替代mounted或者created方法 这样就可以满足不重新加载页面就可以直接将方法重新执行一
  • 打印机错误0x00000bc4的解决办法

    共享打印机在使用过程中难免会出现一些问题 比如连接共享打印机错误 提示代码0x00000bc4 这该如何解决 共享打印机出现问题是件非常麻烦的事 下面就来看看小编整理的解决办法吧 Win11连接共享打印机错误0x00000bc4解决方法 1
  • RabbitMQ:work结构

    gt 只需要在消费者端 添加Qos能力以及更改为手动ack即可让消费者 根据自己的能力去消费指定的消息 而不是默认情况下由RabbitMQ平均分配了 生产者不变 正常发布消息到默认的exchange gt 消费者指定Qoa和手动ack 生产