RabbitMq(一) RabbitMq工作模型

2023-05-16

RabbitMq工作模型

    • Mq基础
    • RbbitMq工作模型
    • RabbitMq基本使用
      • 原生api
      • Spring集成
      • Springboot集成
    • RabbitMq进阶知识
      • 订单延迟关闭
      • 队列满了
    • 总结

Mq基础

message queue 消息队列
特点:
1、独立部署,解耦
2、数据结构是队列,FIFO
3、具有发布订阅模型

为什么使用MQ:
1、异步
2、解耦
3、削峰
4、能广播通信

带来的问题:
1、增加运维成本
2、系统可用性降低
3、系统复杂性提高

AMQP

AMQP协议,所有的MQ都遵循这个协议

RbbitMq工作模型

在这里插入图片描述

Broker

	服务器

Connection

	生产者和消费者都和Broker建立连接,这是一个TCP长链接

Channel

	消息通道,为了减少TCP长链接的创建和释放

Queue

	消息存放的地方,队列其实有自己的数据库

Consumer

消费者消费有两种模式:
	pull 消费者主动拉取,消息存放咋服务端
	push 推送给消费者,消息存放在消费端
RabbitMq两种都实现了,kafka和RocketMq只实现了pull
消费者-队列 多对多,一般使用时一个消费者只取一个队列的消息

Vhost

虚拟主机,不同的系统可以使用不同的vhost,建自己的用户,交换机和队列
rabitmq安装时有默认的vhost,名字是 "/"

Exchange 交换机

负责分发消息,交换机跟队列有绑定关系
交换机有三种
1、direct 直联,类似于配置写死 basicPublish("交换机名","binding-key",message)
2、topic主题,模糊匹配,#和*,#是不限制,*是代表一个单词
3、fanout广播,不需要binding-key,给所有队列发

RabbitMq基本使用

原生api

交换机有三种类型,channel.exchangeDeclare 时可以创建三种交换机

消费者

	配置ip
	端口:5672
	虚拟机:VHost
	设置用户密码:
	建立链接:
	创建消息通道
	创建交换机:可以创建三种
	声明队列
	绑定交换机和队列
	创建消费者
	获取消息
ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("127.0.0.1");
        // 默认监听端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");

        // 设置访问的用户
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 声明交换机
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

        // 声明队列
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" Waiting for message....");

        // 绑定队列和交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"gupao.best");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };

        // 开始获取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);

生产者

	配置ip
	端口:5672
	虚拟机:VHost
	设置用户密码:
	建立链接:
	创建消费通道
	发送消息:basicPublish(交换机,bindling-key,消息)
ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("127.0.0.1");
        // 连接端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");
        // 用户
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 发送消息
        String msg = "Hello world, Rabbit MQ,111";

        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(EXCHANGE_NAME, "gupao.best", null, msg.getBytes());

        channel.close();
        conn.close();

Spring集成

依赖

<!--rabbitmq依赖 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

基于xml配置rabbitMq的依赖关系

<!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672" />

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

    <!--######分隔线######-->
    <!--定义queue -->
    <rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!--定义direct exchange,绑定MY_FIRST_QUEUE -->
    <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />

    <!--消息接收者 -->
    <bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>

    <!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
    </rabbit:listener-container>

消费者

public class ThirdConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(ThirdConsumer.class);

    public void onMessage(Message message) {
        logger.info("The third cosumer received message : " + message);
    }
}

发送消息

@Autowired
    @Qualifier("amqpTemplate")
    private AmqpTemplate amqpTemplate;

	.....

	// Exchange 为 direct 模式,直接指定routingKey
    amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
    amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);

Springboot集成

依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

@Configuration
public class RabbitConfig {

    // 两个交换机
    @Bean("topicExchange")
    public TopicExchange getTopicExchange(){
        return new TopicExchange("TOPIC_EXCHANGE");
    }

    @Bean("fanoutExchange")
    public FanoutExchange getFanoutExchange(){
        return  new FanoutExchange("FANOUT_EXCHANGE");
    }

    // 三个队列
    @Bean("firstQueue")
    public Queue getFirstQueue(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl",6000);
        Queue queue = new Queue("FIRST_QUEUE", false, false, true, args);
        return queue;
    }

    @Bean("secondQueue")
    public Queue getSecondQueue(){
        return new Queue("SECOND_QUEUE");
    }

    @Bean("thirdQueue")
    public Queue getThirdQueue(){
        return new Queue("THIRD_QUEUE");
    }

    // 两个绑定
    @Bean
    public Binding bindSecond(@Qualifier("secondQueue") Queue queue,@Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("#.gupao.#");
    }

    @Bean
    public Binding bindThird(@Qualifier("thirdQueue") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

}

消费者

@Component
@RabbitListener(queues = "FIRST_QUEUE")
public class FirstConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" first queue received msg : " + msg);
    }
}

生产消息

@Component
public class MyProvider {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(){
        // 发送4条消息

        amqpTemplate.convertAndSend("","FIRST_QUEUE","-------- a direct msg");

        amqpTemplate.convertAndSend("TOPIC_EXCHANGE","shanghai.gupao.teacher","-------- a topic msg : shanghai.gupao.teacher");
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE","changsha.gupao.student","-------- a topic msg : changsha.gupao.student");

        amqpTemplate.convertAndSend("FANOUT_EXCHANGE","","-------- a fanout msg");

    }

}

RabbitMq进阶知识

订单延迟关闭

1、入库,定时扫描,太low

2、利用rabbitMq的死信队列 TTL
	下单的时候,同时发一条消息到mq,30分钟后消费这条消息
	1、消息设置过期时间,死性交换机,消息过期了,把消息转到死性交换机-死性队列上,一个队列只有一个死性交换机
	2、队列设置过期时间,如果队列跟消息过期时间都有,取小的一个
	3、延迟投递交换机
	
	流程:生产者-原交换机-原队列(超时之后)-----死信交换机----死信队列----最终消费者

死信交换机缺点
	1、如果用队列设置死信时间,存在不同时间的消息,需要很多的交换机和队列
	2、如果设置消息的TTL,可能会造成阻塞,前一条是10秒,后一条是3秒,第二条无法投递
	3、存在时间偏差

使用插件时间延迟队列功能,声明x-delayed-message的类型,所以交换机可以有4种

队列满了

	队列有俩属性控制长度
		x-max-length:存储最大消息数,超过数量,消息被丢弃
		x-max-length-bytes:存储最大消息容量,超过容量,消息丢弃
	内存控制
		检测物理内存数值,超过设置比例后,报警
	磁盘超过设置容量后,触发流控措施
	消费端限流
		basciQos(2)超过2条消息没有发送ACK,不再接受消息

总结

支持多客户端,主流语言都支持客户端实现
灵活的路由:通过交换机实现消息的灵活路由
权限管理:用户和虚拟机
支持插件拓展
与spring集成
高可靠性
集群与拓展性
高可用队列
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMq(一) RabbitMq工作模型 的相关文章

  • 【HttpRunner】学习准备

    1 安装python 3 7及以上版本 xff1a 2 安装fastapi xff1a pip install fastapi all 3 把如下代码粘贴复制到main py文件中 xff1a span class token keywor
  • Android中锁定文件的方法

    androidSDK中并没有锁定文件相关的api 但是android是基于linux操作系统的 linux比较底层 灵活性也更大 为了实现锁定文件的效果 大概有以下几种办法 用chmod命令修改文件读写权限利用linux中的多线程独占锁 启
  • 远程控制Ubuntu

    远程控制Ubuntu 在Ubuntu上安装team viewer或者向日葵 xff0c 进行远程控制 xff0c 这里记录采用team viewer方式的配置过程 xff0c 向日葵等远程控制类似 安装Ubuntu 官方下载Ubuntu系统
  • 信号降噪方法

    傅里叶变换 只能获取一段信号总体上包含哪些频率的成分 xff0c 但是对各成分出现的时刻并无所知 对非平稳过程 xff0c 傅里叶变换有局限性 短时傅里叶变换 xff08 Short time Fourier Transform STFT
  • C++ 带通滤波

    Butterworth Filter Coefficients The following files are for a library of functions to calculate Butterworth filter coeff
  • python之collections

    collections是日常工作中的重点 高频模块 xff0c 包含了一些特殊的容器 xff0c 针对Python内置的容器 xff0c 例如list dict set和tuple xff0c 常用类型有 xff1a namedtuple
  • git 指定下载文件,目录

    1 创建路径 mkdir gitfile cd lt 路径 gt eg xff1a cd home gitfile 2 创建一个空的本地仓库 git init 3 连接远程仓库GitHub git remote add f origin l
  • Ubuntu v4l2 视屏流花屏问题

    之前用的好好解析YUV xff0c MJPEG 换了个核心板就不好使了 xff0c opencv3 4 6 gt gt gt opencv4 5 5 xff0c Mat xff0c cvMat xff0c IplImage 的类型转换也不好
  • qt qmake .qrc hasmodification time xxx in the future

    原因 xff1a 跨平台生成的 qrc 文件创建时间与目标平台时间不一致导致 xff0c 如win写的 copy 到 Linux xff0c 再编译可能会遇到该bug 导致无法qmake 与 build 解决 xff1a touch qrc
  • (转)python图像操作

    转自 xff1a zbxzc 侵删 使用PIL完成 python view plain copy span class hljs keyword import span Image span class hljs comment 打开图像
  • 关于input to reshape is a tensor 的问题

    span class hljs keyword for span index name span class hljs operator in span enumerate classes class path 61 cwd 43 name
  • mobileNet训练自己的样本

    span class hljs keyword import span matplotlib pyplot span class hljs keyword as span plt span class hljs keyword import
  • 关于屏幕适配之比例布局

    对于平板等需求场合 xff0c 它们的屏幕比例以16 xff1a 10和16 xff1a 9等为主 xff0c 但是屏幕尺寸各异 xff0c 分辨率各异 xff0c 即便是同一尺寸也有多种分辨率 xff0c 这种时候无论是使用dp还是px
  • 机器学习实战:ValueError: invalid literal for int() with base 10: '0.000000'

    在logistic回归一章中 xff0c 在将训练数据转换为int型时 xff0c 会出现日下报错 xff1a ValueError invalid literal for int with base 10 0 000000 只需将下面一句
  • cuda-8.0安装心得

    cuda 8 0安装 xff08 这两天不小心把原来的显卡驱动搞崩 xff0c 挣扎了好久 xff0c 只好重新走一遍 xff09 cuda 安装条件 gcc5 3 0 xff08 版本不能太高 xff09 sudo apt get ins
  • 在GPU刨过的坑

    这两天回学校这两天先是把自己的linux系统给强制删除了 xff0c 然后导致重启无法正常引导进入win xff0c 最后win也救不活了 xff0c 还不好重装系统 xff0c 各种文件损坏 xff0c 简单粗暴的翻车血泪史 捷径路上总是
  • [ArchLinux] 搜狗拼音输入法的安装

    配置源 在ArchlinuxCN源中有很多方便中国用户使用的包 xff0c 其中也包含了经常使用的搜狗拼音输入法 xff0c 于是我们需要先配置ArchlinuxCN源 xff0c 这样我们就可以使用自带的包管理器Pacman直接安装搜狗拼
  • [ArchLinux] 安装及KDE桌面环境安装配置

    ArchLinux 安装及KDE桌面环境安装配置 首先 xff0c 安装之前 xff0c 需要一个 启动介质 xff0c 我这里习惯使用USB设备作为启动介质 xff0c 这是由于ArchLinux滚动更新的特性 xff0c 而且占用空间很
  • 使用crontab执行定时任务时加flock独占锁防止进程堆积

    使用crontab执行定时任务 此处为每分钟执行一次 加flock独占锁防止进程堆积 注意给 var run 读写权限 xff0c 或者放到一个有读写权限的文件夹 span class token operator span span cl
  • macOs 安装liplpcap

    1 xff0c 下载liplpcap http www tcpdump org 1 在tcpdump网站下载libpcap的latest release 2 tar zxvf 3 configure make amp make instal

随机推荐

  • Android应用开发常用知识(4)

    Android string 中product的使用 Android的资源文件string xml会出现下面同名的字符串 xff1a lt string name 61 34 build type 34 product 61 34 tv 3
  • VR行业的发展现状和前景

    5G技术的应用推广 xff0c 加速推动虚拟现实不断发展和完善 xff0c VR产业迅速在各个领域和行业都得到广泛应用 xff0c 最好直观的感受就是知觉体验得到了良好的增强作用 本文的主要内容是简单概括VR技术的发展现状和发展前景 一 V
  • org.apache.ibatis.annotations不存在

    今天遇到了一个很有意思的bug 有人 xff08 还不止一个人 xff09 来问我 xff0c 为什么项目启动不了 xff0c 我说不可能啊 xff0c 我这不跑得好好的吗 xff0c 而且成功启动的也不止我一个啊 然后他就说 xff0c
  • 【学习笔记】Ubuntu双系统+搭建个人服务器

    Ubuntu双系统 43 搭建个人服务器 前言1 Ubuntu 43 Win双系统1 1 制作U盘启动盘1 2 系统分盘1 3 安装Ubuntu系统 2 搭建个人服务器2 1 设置root2 2 配置ssh2 3 向日葵连接 3 内网穿透3
  • (原创)开发微信公众平台遇到的乱码等问题的解决

    1 ngrok内网映射问题 首先这个工具是外国人写的 服务器也在国外 但是tunnel部属在国内 支持ngrok绝大多数功能 http www tunnel mobi 命令行中使用方法 在CMD命令中先切换到ngrok所在的位置再进行如下操
  • iOS给应用添加支持的文件类型/根据文件类型打开应用

    iOS给应用添加支持的文件类型 根据文件类型打开应用 之前写过类似的文章 IOS UTI 统一类型标识符 根据文件后缀打开APP 和 自定义UTI 注册你的APP所支持的文件类型 这里 再次总结说明 已经存在的UTL类型 苹果官方文档提供了
  • 编程之美 -- 中国象棋将帅问题

    下过中国象棋的朋友都知道 xff0c 双方的 将 和 帅 相隔遥远 xff0c 并且它们不能照面 在象棋残局中 xff0c 许多高手能利用这一规则走出精妙的杀招 假设棋盘上只有 将 和 帅 二子 xff08 为了下面叙述方便 xff0c 我
  • C++单元测试工具 -- CppUnit

    CppUnit 作为C 43 43 语言的一款测试工具 xff0c 其实也是一个开源项目 xff0c 与JUnit一样 xff0c 用来方便开发人员进行单元测试的工具 项目地址 xff1a http sourceforge net apps
  • 拒绝游戏!发愤图强!

    立帖为证 xff01 xff01 xff01
  • C++ STL — 第6章 STL容器(二)deque

    C 43 43 STL容器deque和vector很类似 xff0c 也是采用动态数组来管理元素 使用deque之前需包含头文件 xff1a include lt deque gt 它是定义在命名空间std内的一个class templat
  • C++ STL — 第6章 STL容器(三)list

    一 list基础 List使用一个双向链表来管理元素 图一显示了list的结构 图一 list的结构 任何型别只要具备赋值和可拷贝两种性质 xff0c 就可以作为list的元素 二 list的功能 list的内部结构和vector和dequ
  • STL list remove和sort函数

    include lt iostream gt include lt list gt include lt iterator gt using namespace std bool cmp int a int b return a gt b
  • 排序 -- 简单选择排序

    选择排序 思想 xff1a 每一趟 n i 43 1 xff08 i 61 1 2 3 n 1 xff09 个记录中选择关键字最小的记录作为有序序列的第i个记录 简单选择排序 xff1a 通过n i次关键字间的比较 xff0c 从n i 4
  • HDOJ 1106 排序

    题目地址 xff1a http acm hdu edu cn showproblem php pid 61 1106 Problem xff1a 输入一行数字 xff0c 如果我们把这行数字中的 5 都看成空格 xff0c 那么就得到一行用
  • ftp创建文件权限问题

    一 问题 有一个这样的需求 xff0c admin为一个Linux为其FTP应用创建的一个有权限限制的用户 xff0c 通过admin用户可以进行登录FTP服务 xff0c 登录FTP服务后 xff0c 创建文件夹 xff0c 该文件夹的用
  • lottie加载动画,第一次有延迟问题

    lottie是airbnb推出的一个直接将AE工程转化为动画的工具 ae project gt data json gt LottieComposition gt Lottie动画 之前做一个比较复杂的动画 xff0c 花了两天时间都在画各
  • CentOS 7防火墙快速开放端口配置方法

    CentOS 7防火墙快速开放端口配置方法 这篇文章主要为大家详细介绍了CentOS 7防火墙开放端口的快速方法 xff0c 感兴趣的小伙伴们可以参考一下 一 CentOS 7快速开放端口 xff1a CentOS升级到7之后 xff0c
  • C语言unsigned char、char与int之间的转换

    C语言unsigned char char与int之间的转换 2016年10月23日 18 40 50 bladeandmaster88 阅读数 xff1a 11347更多 个人分类 xff1a c语言基础 先来看一道题 xff1a cha
  • Android 内存分析(java/native heap内存、虚拟内存、处理器内存 )

    1 jvm 堆内存 dalvik 堆内存 不同手机中app进程的 jvm 堆内存是不同的 xff0c 因厂商在出厂设备时会自定义设置其峰值 比如 在Android Studio 创建模拟器时 xff0c 会设置jvm heap 默认384m
  • RabbitMq(一) RabbitMq工作模型

    RabbitMq工作模型 Mq基础RbbitMq工作模型RabbitMq基本使用原生apiSpring集成Springboot集成 RabbitMq进阶知识订单延迟关闭队列满了 总结 Mq基础 message queue 消息队列 特点 x