【RabbitMQ教程】Work queues 工作队列模式

2023-11-09

目录

前言

Work queues工作模式介绍

消息模型

适用场景

消费策略(重要)

消费策略选择

消费策略代码示例

轮询分发

(1)定义生产者和消费者

(2)运行程序

(3)结果分析

公平分发

(1)定义生产者和消费者

(2)运行程序

(3)结果分析

总结


前言

将‘work queues工作队列模式’单独抽出来细讲,目的是借助这个模式好好讲一下rabbitmq的‘轮询分发’和‘公平分配’。

Work queues工作模式介绍

rabbitmq六大工作模式架构图:

消息模型

1、竞争式消费消息。与‘广播模式’区分开,同一个队列中的消息只能被一个消费者进行消费。该消息模型有一个生产者和 多个消费者,多个消费者可以 同时消费 同一个队列消息

  • 比如生产者可将5000条数据放到「队列 」中,然后可以启动5个消费者,在默认策略下(默认是轮询分发消息),平均每个消费者消费1000条来分担压力;

2、如何让程序有 多个消费者同时消费同一个队列消息呢?

  1. 在程序中,自己手动创建多个消费者;(我个人认为除了写测试case,在实际生产中应该没有人这么干);
  2. 实际生产中,集群部署程序;那么一台机器就有一个消费者,多台机器就有多个消费者;
  3. rabbitmq中,提供了一个参数,可以配置 一台机器 消费者实例个数;

适用场景

Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
例如:短信服务部署多个,只需要有一个节点成功发送即可。

消费策略(重要)

这个消息模型对应着前面说过的轮寻分发 和公平分发, 默认是轮训分发。

1、轮询分发(自动ack)-不推荐

  • 轮询分发采用 自动ack机制
  • 默认是轮询分发(即平均分发消息给消费者不考虑消费者的性能差异 和处理消息的能力);
  • 不推荐使用轮询分发,因为轮询分发不考虑消费者性能差异,追求 平均分配;

2、公平分发(手动ack)-推荐

  • 公平分发采用手动ack机制

消费策略选择

轮询模式下,Work Queue是将生产者生产的消息一次性平均分配给消费者,当分配完消息后,它的自动确认机制会一次性全部确认,在官方文档中有这么一段解释:

    Message acknowledgment
    Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
    But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

当生产者生产了10个消息,2个消费者平均分到了5个消息,当消费者一消费完3个消息时不明原因宕机了,剩余的2个消息则会丢失,而我们希望由其他的消费者来对这些剩余的消息消费,要是在业务中出现消息丢失可能会造成很严重的后果,所以官方不推荐使用自动消息确认。下面我们通过代码的形式,分别来测试轮询分发(自动签收消息)和 公平分配(手动消息确认)。
 

消费策略代码示例

 基于maven采用java原生写法。不需要写properties或者yml配置文件

pom依赖:

     <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

rabbitutils工具类:

package com.baiqi.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;



public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
    static {
        connectionFactory.setHost("127.0.0.1");
        //5672是RabbitMQ的默认端口号
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("cms");
        connectionFactory.setPassword("cms");
        //相当于表
        connectionFactory.setVirtualHost("/cms_vm");
    }
    public static Connection getConnection(){
        Connection conn = null;
        try {
            // TCP的长连接
            conn = connectionFactory.newConnection();
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

轮询分发

(1)定义生产者和消费者

生产者:

public class Provider {
    @Test
    public void test() throws IOException, InterruptedException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i=0;i<10;i++){
            channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+1+": hello workqueues").getBytes());
        }
        RabbitMqUtil.close(channel,connection);
    }
}

消费者1:

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer1消费消息:"+new String(body));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

消费者2:

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer2消费消息:"+new String(body));
            }
        });
    }
}

(2)运行程序

先对两个消费者进行开启,进入异步监听模式,然后让生产者生产10条消息,将消费者一线程休眠2秒,模拟该业务慢的情况。

消费者1:

消费者2:

(3)结果分析

无论是否当某个消费者处理缓慢时,还是一样地平均消费。

刚才的实现有以下问题:

  • 消费者1比消费者2的效率要低,一次任务的耗时较长

  • 然而两人最终消费的消息数量是一样的

  • 消费者2大量时间处于空闲状态,消费者1一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

怎么实现呢?

通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效
 

公平分发

(1)定义生产者和消费者

生产者:

public class Provider {
    @Test
    public void test() throws IOException, InterruptedException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i=0;i<10;i++){
            channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+1+": hello workqueues").getBytes());
        }
        RabbitMqUtil.close(channel,connection);
    }
}

消费者1:

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        //每次只确认一条消息
        channel.basicQos(1);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer1消费消息:"+new String(body));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者一通过线程进行了2秒的休眠,模拟处理业务慢的情况。

消费者2:

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicQos(1);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer2消费消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

这里我们通过basicQos()设置了每次拉取一条消息;

消息被消费完后通过basicAck()手动确认,第一个参数为消息的标识,用来标识信道中投递的消息,RabbitMQ 推送消息给消费者时,会附带一个 Delivery Tag,以便 消费者可以在消息确认时告诉RabbitMQ到底是哪条消息被确认了;第二个参数为是否多消息确认;

当某个消费者宕机了,也不会丢失消息,剩余的则分担到其他的消费者身上,这样的设置可以防止消息的丢失,保证了数据的完整性。

(2)运行程序

消费者1:

消费者2:

(3)结果分析

体现了能者多劳,处理效率快的消费者可以处理较多的消息;

并且,如果当消费者1宕机了(其实宕机也可以认为是处理效率慢的一种,只不过有点极端),其余的消息也可以被消费者2消费;

总结

work queues是竞争式消费;

消费策略:轮询分发、公平分发;前者是自动ack,后者是手动ack。

prefetchCount在手动ack的情况下才生效,自动ack不生效

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【RabbitMQ教程】Work queues 工作队列模式 的相关文章

  • RabbitMQ 用户在预先创建的队列上发布/订阅的权限

    我有一个用例 我需要创建一个用户并授予他仅在现有队列上发布 订阅的权限 这是一个示例 虚拟主机 mainvhost 对于所有用户都相同 在虚拟主机内 我有 A foo 和 Q bar 队列 用户 foo 只能发布 订阅到 Q foo 用户
  • Camel 中的无限循环 - Rabbitmq

    我有一个小型服务器路由 它从queue in 获取消息并放入queue out 当我输入一条消息时queue in 服务器无限循环运行 我想知道我在配置方面缺少什么 这看起来是一条非常简单的路线 小服务器路由
  • 发送消息到任意虚拟主机/与 RabbitMQ / Spring AMQP 交换

    I use RabbitMQ and Spring AMQP发送消息 我有这个片段 rabbitTemplate convertAndSend exchange key object 当我对单个操作进行操作时 这有效VHOST 我必须从 1
  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 在 django 中使用 pika 的 Rabbitmq 监听器

    我有一个 django 应用程序 我想使用来自rabbit mq 的消息 我希望监听器在启动 django 服务器时开始使用 我正在使用 pika 库连接到rabbitmq 提供一些代码示例确实会有帮助 首先 您需要在 django 项目开
  • 在点网核心应用程序中使用 RabbitMQ 跳过 MassTransit 中的队列

    我有三个项目 一个是Dot net core MVC 两个是API项目 MVC 正在调用一个 API 来获取用户详细信息 当询问用户详细信息时 我通过 MassTransit 向队列发送消息 我看到跳过队列 第三个项目中有消费者 即API项
  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • RabbitMQ 管理插件窗口呈现为空白页面

    I have installed Erlang RabbitMQ and configured the management plugin as per the instructions on the website https www r
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • 从 Java/Spring 检索 RabbitMQ 队列中未确认消息的数量

    有没有办法返回未确认的消息数 我正在使用此代码来获取队列中的消息数 DeclareOk declareOk amqpAdmin getRabbitTemplate execute new ChannelCallback

随机推荐