RabbitMQ中的限流、return机制、死信队列

2023-11-06

目录

优点

缺点

1、限流

2、return机制

3、死信队列


优点

  1. 高可用性: RabbitMQ支持集群和镜像队列等多种方式实现高可用性,保证系统稳定运行。

  2. 可靠性强: RabbitMQ使用AMQP协议作为消息传递的标准,能够确保消息传递的可靠性和有序性。

  3. 灵活性强: RabbitMQ支持多种消息模式,包括点对点、发布订阅、路由、RPC等,可以根据业务需求选择合适的模式。

  4. 性能优异: RabbitMQ采用基于Erlang语言开发,具有高并发、低延迟等特点,支持快速处理大量数据和消息。

  5. 易于部署和管理: RabbitMQ提供了丰富的管理工具和API接口,可以方便地进行配置、监控和管理。同时也提供了可视化界面使得操作更加简单易懂。

  6. 开源免费:RabbitMQ是一款开源软件,并且完全免费使用。可以在任何环境下轻松部署和使用。

缺点

  1. 学习成本高: RabbitMQ涉及到很多概念和技术,对于初学者而言,学习起来可能会比较困难。

  2. 配置复杂: RabbitMQ的配置比较复杂,需要根据实际需求进行相应的设置。如果配置不当,可能会影响系统的性能和稳定性。

  3. 资源占用高: RabbitMQ使用Erlang语言开发,需要占用大量内存和CPU资源,如果部署在低配服务器上可能会导致系统出现延迟等问题。

  4. 单点故障: 尽管RabbitMQ支持集群模式,但是在某些情况下仍然存在单点故障的问题。

  5. 消息堆积: 如果消费者处理消息速度过慢或者出现异常情况时,RabbitMQ可能会导致消息堆积、内存溢出等问题。

1、限流

所谓限流,就是限制该消费者一次最多消费多少条消息

// 每次只消费1000条消息,RabbitMQ放过来的这1000条消息没有处理完,就不会再放进来新的消息
$channel->basic_qos(null, 1000, null);

该设置在平时用不上,但是在消息积压比较严重时会用到,如果在消息积压严重时不设置该值,则很可能会导致这个消费者挂掉。

 

2、return机制

return Listener 用于处理一些不可路由的消息!

生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,就要使用 return Listener。

# 虽然定义了交换机,但是指定了一个没有绑定队列的路由键
<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');
    $channel = $connection->channel();
    // 定义一个交换机
    $channel->exchange_declare('test_return_exchange', AMQPExchangeType::DIRECT);
    $message = new AMQPMessage("Hello World!");

    // 这里我们指定了一个没有绑定队列的路由键,会导致rabbitmq找到不队列
    $channel->basic_publish($message, 'test_return_exchange', 'xxxxxx');

    while (true) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
} catch (Exception $e) {
    print_r($e->getMessage());
}

该代码虽然执行成功了,并且创建了我们声明的交换机,但是因为我们指定的路由键找不到与之绑定的队列,所以消息并不会推送进rabbitmq,但是因为rabbitmq并没有报错,所以我们会误以为推送成功了。

即使我们使用confirm机制来监听推送状态也没用,因为confirm监听不到因为没有找到交换机或者路由不到队列的情况,这里我们只能使用rabbitmq的另一个高级特性return机制,下面代码就引入了return机制:

<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');
    $channel = $connection->channel();
    // 定义一个交换机
    $channel->exchange_declare('test_return_exchange', AMQPExchangeType::DIRECT);
    $message = new AMQPMessage("Hello World!");

    // 指定第三个参数mandatory为true,表示当rabbitmq无法找到路由或队列时,将消息返回给生产者
    // 这里我们指定了一个没有绑定队列的路由键,会导致rabbitmq找到不队列,进而触发return
    $channel->basic_publish($message, 'test_return_exchange', 'xxxxxx', true);

    // 监听消息未找到交换机或者未找到路由键对应的路由
    $channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $message) {
        $msg  = 'oh hoo!发生错误了'.PHP_EOL;
        $msg .= '错误码:'.$replyCode.PHP_EOL;
        $msg .= '错误信息:'.$replyText.PHP_EOL;
        $msg .= '指定的交换机:'.$exchange.PHP_EOL;
        $msg .= '指定的路由键:'.$routingKey.PHP_EOL;
        $msg .= '投递的消息:'.$message->body.PHP_EOL;
        print_r($msg);
    });

    while (true) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
} catch (Exception $e) {
    print_r($e->getMessage());
}

这时再运行程序,就会进行报错,我们这里是输出了错误,实际生产中应该是将错误记录到指定的日志数据表中

# 程序返回
root@204d5054860c:/data/test-yii/web# php testReturn.php 
oh hoo!发生错误了
错误码:312
错误信息:NO_ROUTE
指定的交换机:test_return_exchange
指定的路由键:xxxxxx
投递的消息:Hello World!

 

3、死信队列

死信队列其实只是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中或记录到日志表中等待后续专门处理。

一个消息在下列场景下会被rabbitmq判定为死信,然后投入到死信队列中。

  • 消息被拒绝

  • 消息过期

  • 队列超载

下面模拟一条消息显示被投入普通队列,这条消息被设置过期时间是10秒,在这10秒内没有消费者来处理,因此这条消息就过期了,变成了死信,这时,RabbitMQ会将它放到死信队列里,也就是我们在代码中声明的死信队列。

注意:rabbitmq中不是只可以有一个死信队列,而是可以有多个死信队列,如果我们没有指定死信队列,过期的消息将被rabbitmq遗弃。

<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');

    // 设置交换机名称
    $deadLetterExchangeNameName = 'dead_letter_exchange';
    $normalExchangeName = 'exchange_6';
    // 设置队列名称
    $deadLetterQueueName = 'dead_letter_queue';
    $normalQueueName = 'queue_6';
    // 设置路由键
    $deadLetterRoutingKey = 'dead_letter_key';
    $normalRoutingKey = 'route_6';

    // 创建通道
    $channel = $connection->channel();
    // 声明交换器,作为死信交换器
    $channel->exchange_declare($deadLetterExchangeNameName, AMQPExchangeType::DIRECT, false, true);
    // 声明交换器,作为普通交换器
    $channel->exchange_declare($normalExchangeName, AMQPExchangeType::DIRECT, false, true);

    $args = new AMQPTable();
    // 设置消息过期时间为25s,25秒后消息会进入死信队列
    $args->set('x-message-ttl', 25000);
    // 设置死信交换器
    $args->set('x-dead-letter-exchange', $deadLetterExchangeNameName);
    // 设置死信路由键
    $args->set('x-dead-letter-routing-key', $deadLetterRoutingKey);

    // 声明队列,作为普通队列,同时将死信队列相关参数传入
    $channel->queue_declare($normalQueueName, false, true, false, false, false, $args);
    // 声明死信队列
    $channel->queue_declare($deadLetterQueueName, false, true, false, false);

    // 将普通队列与普通交换机绑定
    $channel->queue_bind($normalQueueName, $normalExchangeName, $normalRoutingKey);
    // 将死信队列与死信交换机绑定,同时指定死信路由键
    $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeNameName, $deadLetterRoutingKey);
    // 设置传递的消息
    $message = new AMQPMessage('Hello DLX Message queue_6');

    // confirm监听消息投递【与死信队列无关,仅用于排错】
    $channel->confirm_select();
    $channel->set_ack_handler(function ($message) {
        print_r('投递成功啦,消息是'.$message->body);
    }) ;
    $channel->set_nack_handler(function ($message) {
        print_r('投递失败啦,消息是'.$message->body);
    }) ;

    // return监听未找到交换机或路由问题【与死信队列无关,仅用于排错】
    $channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $message) {
        $msg  = 'oh hoo!发生错误了'.PHP_EOL;
        $msg .= '错误码:'.$replyCode.PHP_EOL;
        $msg .= '错误信息:'.$replyText.PHP_EOL;
        $msg .= '指定的交换机:'.$exchange.PHP_EOL;
        $msg .= '指定的路由键:'.$routingKey.PHP_EOL;
        $msg .= '投递的消息:'.$message->body.PHP_EOL;
        print_r($msg);
    });

    // 发送消息到RabbitMQ
    $channel->basic_publish($message, $normalExchangeName, $normalRoutingKey, true);

    while (true) {
        $channel->wait();
    }

    // 关闭通道和连接
    $channel->close();
    $connection->close();

} catch (Exception $e) {
    print_r($e->getMessage());
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ中的限流、return机制、死信队列 的相关文章

随机推荐

  • 如何在虚拟机中安装ikuai软路由系统

    首先访问ikuai官网下载固件固件下载 爱快 iKuai 商业场景网络解决方案提供商 ikuai8 com 根据需求下载 然后创建一个虚拟机 点击下一步 选择更下载的ISO映像文件 点击下一步 点击下一步 设置一下名称和储存位置 点击下一步
  • 学习记录:C语言源文件在编译时产生multiple definition of xxx; xxx: first defined here 相关报错的解决方法

    问题描述 在ubunt18 04 gcc 7 5 0 上可以正常编译的程序在树莓派 gcc 10 2 0 上编译报错 报错如图 问题原因 在头文件内定义全局变量 在多个源文件中引用且未声明 解决方法一 在树莓派上安装7 5 0版本的gcc
  • CPU性能测试及Coremark简介

    衡量处理器的一个重要指标是功耗 另外一个重要指标便是性能 在处理器领域的 Benchmarks 非常众多 有某些个人开发的程序 也有某些标准组织 或者商业公司开发的Benchmarks 本文在此不加以一一枚举 在嵌入式处理器领域最为知名和常
  • 198. House Robber

    You are a professional robber planning to rob houses along a street Each house has a certain amount of money stashed the
  • Docker与DevOps的无敌组合,引爆你的创新潜能

    荣誉认证 51CTO博客专家博主 TOP红人 明日之星 阿里云开发者社区专家博主 技术博主 星级博主 微信公众号 iOS开发上架 本文由iOS开发上架原创 欢迎关注 点赞 收藏 留言 首发时间 2023年8月7日 坚持和努力一定能换来诗与远
  • 代码检查工具选型

    源码分析工具选型 1 目前各种主流源码分析工具简单介绍 1 1 checkstyle checkstyle产生于2001年 是以antlr作为java语法分析器的静态源码分析工具 通过checkstyle的xml配置文件可指定源码分析规则
  • 面试篇:虚拟机栈5连问,一听心里就乐了

    面试路上 滴 滴滴 师傅我们到哪了 我还要赶着面试呢 师傅 快了快了 下个路口就到了 真是服了这帮人了 不会开车净往里凑 听着司机师傅的抱怨声 不禁想起首打油诗 满目尾灯红 耳盈刺笛声 心忧迟到久 颓首似雷轰 一下车赶紧小跑就进了富丽堂皇的
  • Android 控件 RecyclerView 看这篇就够了

    Android 控件 RecyclerView 概述 RecyclerView是什么 从Android 5 0开始 谷歌公司推出了一个用于大量数据展示的新控件RecylerView 可以用来代替传统的ListView 更加强大和灵活 Rec
  • 记录生活(一)

    我为什么要写这篇文章呢 主要是想记录自己的生活 我今天刚学css HTML以前学过一点 2022年1月17日 当日下午做的这两个模板 素材文件夹是两个模板共用的的 布局分明 是用百分比 布局的 灰色部分是导航栏 白色部分是用户登录的头像 绿
  • 数据库课后习题

    数据库练习 1 在Course表中添加 教师 列 20个长度的变长字符串 2 为每门课程添加教师信息 3 将教师列修改为非空列 4 查询选修了刘老师的课程的学生 5 检索选修了课程号为C01或C02课程 且成绩高于或等于70分的学生的姓名
  • 如何区分物联网的三大系统

    物联网是基于Internet的各种物理产品信息服务的综合 它主要由三个系统组成 一个是运营支撑系统 即相关应用服务软件 门户 管道 终端等的管理 其二是传感器网络系统 即通过现有的Internet 广电网络 通信网络等实现数据传输和计算 三
  • 最短路经算法简介(Dijkstra算法,A*算法,D*算法)

    转自 http www embhelp com drew algorithm shortpath htm 作者 Drew 据 Drew 所知最短路经算法现在重要的应用有计算机网络路由算法 机器人探路 交通路线导航 人工智能 游戏设计等等 美
  • C# 结构体的使用

    先说一下结构体和类的区别 1 结构体定义的是变量 保存在栈当中 类的对象 实例 保存在堆当中 引用保存在栈当中 结构体是值类型 类是引用类型 2 不能在结构体中定义默认的构造方法 无参 类中可以定义 3 结构体中自定义构造方法后 编译器会提
  • C++基础:for循环

    美好的知识点从出题开始 输出1 100所有的奇数 看到这道题 你可能有点懵 回顾标题 你找到办法了 但你不知道怎么写 来看看for循环的代码框架吧 for 控制变量初始化表达式 条件表达式 增量表达式 语句1 刚看到这 你肯定不太懂 我实际
  • 面试:Tomcat如何优化

    一 增大tomcat运行内存 例如 从默认的 256M增大到2G SET CATALINA OPTS Xms2048m Xmx4096m XX MaxNewSize 512m XX MaxPermSize 256m set JAVA OPT
  • “此帐户并未得到从这个工作站登录的授权”问题

    此帐户并未得到从这个工作站登录的授权 问题 转自 http blog 163 com yumin wang 126 blog static 36293550201210303413140 问题 访问网络共享文件夹时 出错提示为 此帐户并未得
  • [创业-33]:股权、期权、期股的区别

    目录 1 基本概念 1 1 股权 1 2 期权 1 3 期股 二 比较 2 1 享有的权益 2 2 在退出机制 2 3 兑现机制 2 4 分配方式 2 5 获利方式 附录 雷军关于创业公司的股权解读 1 基本概念 1 1 股权 股权 是有限
  • 数据包协议设计(通讯协议的设计)

    一 为什么要设计通讯协议 通常 多设备之间进行通讯多使用数据包的方式 如何从一堆的数据中确定哪些是有效数据 以及这些数据要表达什么意思 为解决这些问题 通常我们需要设计一个通讯协议 依照通讯协议对数据进行解析 就能够正确的找到并使用这些数据
  • 上下文相关音素-状态绑定

    在发音过程中 因为协同发音的影响 同一个音素在不同的位置 其发音变化很大 如下图所示 同样的元音 eh 在不同的单词中的发音在频域上区分非常明显 因为单音素monophone 是上下文独立的 context independent 为了能够
  • RabbitMQ中的限流、return机制、死信队列

    目录 优点 缺点 1 限流 2 return机制 3 死信队列 优点 高可用性 RabbitMQ支持集群和镜像队列等多种方式实现高可用性 保证系统稳定运行 可靠性强 RabbitMQ使用AMQP协议作为消息传递的标准 能够确保消息传递的可靠