3.rabbitmq轮询和不公平分发

2023-05-16

rabbitmq轮询和不公平分发

rabbitmq轮询分发

rabbitmq默认是使用轮询来分发消息的。测试代码如下所示

生产者代码

/**
 * 生产者 task  rabbitmq 轮询演示
 */
public class Task {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.queueDeclare(QueueNames.HELLO,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QueueNames.HELLO,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("task 发送消息完成,消息内容:"+message);
        }
    }

}

消费者1代码

/**
 * 消费者01
 */
public class Worker01 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String received = new String(message.getBody());
            System.out.println("接收到消息:"+received);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("worker01 消费者正在等待消费");
        channel.basicConsume(QueueNames.HELLO,true,deliverCallback,cancelCallback);
    }
}

消费者2代码

/**
 * 消费者02
 */
public class Worker02 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String received = new String(message.getBody());
            System.out.println("接收到消息:"+received);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费消息的回调接口");
        };
        System.out.println("worker02 等待接收消息");
        channel.basicConsume(QueueNames.HELLO,true,deliverCallback,cancelCallback);
    }
}

  • 启动生产者代码,多次输入消息。
  • 启动多个消费者代码,发现多个消费者依次接收消息,且每条消息只执行一次

不公平分发

如果消息是轮询分发,如果每个消费者执行消费代码的时长不同,处理速度不同,那么很容易造成执行速度快的消费者会闲置,而执行比较慢的消费者会一直处在高消耗的状态。这样显然是不合理的,为了解决这个问题,我们可以采用不公平分发

在消费者中指定prefetch的值

Integer prefetch = 10
channel.basicQos(prefetch);

该值定义通道上允许的未确认消息的最大数量,即我这条通道允许未确认的消息数量是10个,一旦你分配给我的消息数量达到上限,那么rabbitmq就停止在这条通道上传递更多消息,转而去给别的通道分配消息。

perfetch的取值是根据业务来具体测试的,如果取值过高,那么内存中未确认的消息数量会很多,徒增内存消耗,如果取值太低,那么系统吞吐量下降,用户体验会很差。

生产者代码

/**
 * 不公平分发测试
 */
public class UnfairProducer {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.queueDeclare(QueueNames.UNFAIR,true,false,false,null);
        for (int i = 0; i < 100; i++) {
            String message = String.valueOf(i);
            channel.basicPublish("",QueueNames.UNFAIR, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者1代码

/**
 * 不公平消费者1
 */
public class UnfairConsumer1 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        System.out.println("c1 等待接收消息处理时间较短");
        //指定prefetch的值
        channel.basicQos(10);
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String received = new String(message.getBody());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("c1 收到消息:"+received);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费者取消消费");
        };
        boolean autoAck = false;
        channel.basicConsume(QueueNames.UNFAIR,autoAck,deliverCallback,cancelCallback);
    }
}

消费者2代码

/**
 * 不公平消费者2
 */
public class UnfairConsumer2 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.basicQos(2);
        System.out.println("c2 等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String received = new String(message.getBody());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("c2 收到消息:"+received);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费者取消消费");
        };
        Boolean autoAck = false;
        channel.basicConsume(QueueNames.UNFAIR,autoAck,deliverCallback,cancelCallback);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

3.rabbitmq轮询和不公平分发 的相关文章

  • 使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

    我在 RHEL 5 3 上使用 Java 客户端使用 RabbitMQ 我有 2 个节点 机器 Node1 正在使用 Java 帮助器类 QueueingConsumer 消费 Node2 上队列中的消息 QueueingConsumer
  • RabbitMQ 中的并发

    经过一周的编码和搜索论坛后 似乎是时候问 我有一个 C 应用程序 它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息 我想同时处理多个消息 因此我在同一连接上实例化了几个通道 本例中为 8 个 每个通道都
  • amqp 与 amqplib - 哪个 Node.js amqp 客户端库更好?

    这些 amqp 客户端库之间有什么区别 哪一款最值得推荐 主要区别是什么 我会推荐amqp node https github com squaremo amqp node and bramqp https github com bakke
  • 如何覆盖 MassTransit 默认交换和队列拓扑约定?

    正如 在我关于SO的一个问题中 所指出的 为什么 MassTransit 中的简单配置会创建 2 个队列和 3 个交换机 https stackoverflow com questions 56064182 why a simple con
  • 我应该在 Django 项目中使用 Celery 还是 Carrot?

    我有点困惑我应该使用哪一个 我认为两者都可以 但其中一个比另一个更好或更合适吗 http github com ask carrot tree master http github com ask carrot tree master ht
  • 如何触发 IModel.BasicAcks?

    我第一次使用 RabbitMQ 的 NET API 我想出了一个对我来说似乎合理的用例 我想创建发布消息并在消息被确认后执行某些操作的发布者 IModel BasicAcks 事件似乎是了解这一点的好方法 所以 我给出版商写了一封信 pri
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 如何在 Node js 中保持分叉的子进程处于活动状态

    我想创建一个像带有node的foreverjs一样运行的rabbitmq cli 它可以生成 child process 并使其在后台运行 并且可以随时与 child process 进行通信 我面临的问题是 当主 cli 程序退出时 ch
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • Django、RabbitMQ 和 Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧版本的任务?

    所以我有一个 Django 应用程序 它偶尔会向 Celery 发送任务以进行异步执行 我发现 当我在开发中处理代码时 Django 开发服务器知道如何自动检测代码何时发生更改 然后重新启动服务器 以便我可以看到我的更改 然而 我的应用程序
  • Spring AMQP RabbitMQ 如何直接发送到Queue而不需要Exchange

    我正在使用 Spring AMQP 和 Rabbitmq 模板 如何直接将消息发送到队列而不使用Exchange 我该怎么做 我该怎么做 你不能 发布者不知道队列 只是交换和路由密钥 但是 所有队列都绑定到默认交换器 以队列名称作为其路由键
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 如何在多租户系统中的 RabbitMQ 中使队列私有/安全?

    我已阅读开始使用 http www rabbitmq com getstarted htmlRabbitMQ 提供的指南 甚至还贡献了第六个示例暴风雨 amqp https github com paolo losi stormed amq
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa

随机推荐

  • git diff如何退出

    git diff 对比两次文件修改了什么 但如何退出呢 xff1f 按q即可
  • 数据结构,计算机网络,数据库,计算机组成原理,操作系统有哪些好的网课值得推荐?

    大家好 xff0c 我是小林哥 作为自学CS过来的老学长 xff0c 看过中国mooc b站 网易云课堂很多视频 xff0c 期间踩了不少坑 xff0c 这次掏心掏肺前来跟分享下 xff0c 网上的资源是免费的 xff0c 但是找到质量好的
  • MATLAB中im2bw函数-将图像转换为二值图像

    matlab中DIP工具箱函数im2bw使用阈值 xff08 threshold xff09 变换法把灰度图像 xff08 grayscale image xff09 转换成二值图像 所谓二值图像 xff0c 一般意义上是指只有纯黑 xff
  • Ubuntu18.04使用RPLIDAR A2M12雷达出错的解决办法

    最近领导要我用A2M12雷达搞SLAM xff0c 但是用电脑连上这个雷达捣鼓了两三天才能够拿到数据 就把踩的坑记录一下 软硬件平台 Nvidia Jetson Nano xff08 4GB版本的 xff09 Ubuntu 18 04 报错
  • workerman 连接失败可能的原因

    刚开始使用workerman时很常见的一个问题是客户端连接服务端失败 原因一般如下 xff1a 1 服务器防火墙 包括云服务器安全组 阻止了连接 xff08 50 几率是这个 xff09 2 客户端和服务端使用的协议不一致 xff08 30
  • 排序算法:冒泡排序和选择排序的思路,区别与优缺点。

    一 xff0c 冒泡排序 xff1a 冒泡排序的定义就不提了 xff0c 总结起来就一句话 xff08 划重点 xff09 xff1a xff0c 从左到右 xff0c 数组中相邻的两个元素进行比较 xff0c 将较大的放到后面 算法思路
  • ROS创建功能包并自定义消息

    ROS有时需要自定义消息 xff0c 本文叙述如何通过创建功能包并自定义消息 创建ROS工作空间具体实现 xff1a https blog csdn net qq 34911636 article details 100103448 创建一
  • 卡尔曼滤波详细推导

    卡尔曼滤波 xff08 Kalman filtering xff09 是一种利用线性系统状态方程 xff0c 通过系统输入输出观测数据 xff0c 对系统状态进行最优估计的算法 xff0c 由于观测数据中包括系统中的噪声和干扰的影响 xff
  • ROS tf工具与消息查看命令

    TF工具坐标系统是一个基础理论 xff0c 但是涉及到多个空间的变换 xff0c 不容易进行想象所以TF工具给开发者调试提供很多方便 1 tf monitor xff1a 将当前的坐标系转换关系打印到终端控制台 rosrun tf tf m
  • melodic 打开gazebo出现[Err] [REST.cc:205] Error in REST request错误解决方法

    ROS melodic版本下打开gazebo出现 Err REST cc 205 Error in REST request错误解决方法 输入以下命令打开文件 sudo gedit ignition fuel config yaml 然后将
  • 技术资源汇总(一)

    1 Ubuntu技术论坛 xff1a https askubuntu com 2 树莓派资源 https www yahboom com study raspberry3B 密码 xff1a cf0p 汇总资料提取码 xff1a hdy7
  • docker常用命令

    1 配置docker阿里云镜像 1 打开daemon json文件 xff08 若没有此文件 xff0c 则创建 etc docker daemon json xff09 xff1a vi etc docker daemon json 2
  • 网络调试助手UDP广播问题

    用直接广播地址 xff08 192 168 xxx 255 端口 xff09 可以进行广播 xff1b 用受限广播地址 xff08 255 255 255 255 端口 xff09 显示没有指定有效的远程主机端口 xff0c 搞了好久发现是
  • “平衡小车之家”家的STM32F103最小系统源代码分享

    在网上寻找了好久 xff0c 因为他家的开发板自带有mpu6050模块 故想测试其精准度以及z轴漂移程度 发现也有很大的漂移 代码如下 main c部分 xff1a span class token macro property span
  • 使用PMW3901和VL53L1X 实现室内定点悬停

    使用PMW3901和VL53L1X 实现室内定点悬停 使用PMW3901 光流传感器进行水平方向定位Pixhawk连接PMW3901传感器PX4源代码加入PMW3901驱动后重新编译QGroundControl中的配置 使用气压计和VL53
  • 使用 QGroundControl 地面站更新 PixHawk飞控的Bootloader

    安装最新版本的PX4固件 启动QGroundControl并且使用USB连接到Pixhawk飞控 选择 Q icon gt Vehicle Setup gt Firmware sidebar 打开固件设置 安装最新版本的PX4固件 更新Bo
  • 自制DIY 机器狗 完全教程 - MIT猎豹Cheetah

    自制DIY 机器狗 完全教程 MIT猎豹Cheetah 背景结构设计模块化关节电机性能考虑关节结构 四足平台设计腿部设计身体设计脚部设计 硬件设计关节驱动器通信总线板供电系统 控制系统人工智能 背景 3年前 xff0c MIT开源了世界上跑
  • centos安装wxWidgets,erlang,RabbitMq

    centos安装wxWidgets erlang RabbitMq 默认已经安装了java环境 而安装RabbitMq需要安装erlang xff0c 安装erlang又需要安装wxWidgets 安装wxWidgets 更新系统 yum
  • 2.rabbitmq概述和helloworld

    rabbitmq概述 rabbitmq中的几个概念 BROKER 接收和分发消息的应用 xff0c RabbitMQ Server 就是 Message Broker Virtual Host 出于多租户和安全因素设计的 xff0c 把 A
  • 3.rabbitmq轮询和不公平分发

    rabbitmq轮询和不公平分发 rabbitmq轮询分发 rabbitmq默认是使用轮询来分发消息的 测试代码如下所示 生产者代码 span class token comment 生产者 task rabbitmq 轮询演示 span