RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥)

2023-05-16

一、中间件

1. 什么是中间件

什么是中间件

我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务的市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。

中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来

为什么需要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担,中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

中间件特点

为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件时位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同请添加图片描述
的操作系统和硬件平台,它们可以有符合接口的协议规范的多种实现。

也很难给中间件一个严格的定义,但中间件应具有如下的一些特点:

(1)满足大量应用的需要

(2)运行于多种硬件和 OS平台

(3)支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互

(4)支持标准的协议

(5)支持标准的接口

由于标准接口对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。对于应用软件开发,中间件远比操作系统和网络服务更为重要,中间件提供的程序接口定义了一个相对稳定的高层应用环境,不管底层的计算机硬件和系统软件怎样更新换代,只要将中间件升级更新,并保持中间件对外的接口定义不变,应用软件几乎不需任何修改,从而保护了企业在应用软件开发和维护中的重大投资。

简单说:中间件有个很大的特点,是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。这使得中间件一定是可替换的。如果一个系统设计中,中间件时不可替代的,不是架构、框架设计有问题,那么就是这个中间件,在别处可能是个中间件,在这个系统内是引擎。

在项目中什么时候使用中间件技术

在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。

但是作为一个开放人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和经历去探讨和研究。

2. 中间件技术及架构的概述

请添加图片描述

学习中间件的方式和技巧

  1. 理解中间件在项目架构中的作用,以及各中间件的底层实现
  2. 可以使用一些类比的生活概念去理解中间件
  3. 使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用
  4. 尝试用 java技术去实现中间件的原理
  5. 静下来去思考中间件在项目中设计的和使用的原因
  6. 如果找到对应的代替总结方案
  7. 尝试编写博文总结类同中间件技术的对比和使用场景
  8. 学会查看中间件的源码以及开源项目和博文

什么是消息中间件

在实际的项目中,大部分的企业项目开发中,在早起都采用的是单体的架构模式

请添加图片描述

单体架构

在企业开发当中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个工程中,如果其中的一个模块升级或者迭代发生一个很小的变动都会重新编译和重新部署项目。这种这狗存在的问题是:

  1. 耦合度太高
  2. 不易维护
  3. 服务器的成本高
  4. 以及升级架构的复杂度也会增大

这样就有后续的分布式架构系统。如下

分布式架构

请添加图片描述

何谓分布式系统:

通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成

和单体架构不同的是,单体架构是一个请求发起 jvm调度线程(确切的是 tomcat线程池)分配线程 Thread来处理请求直到释放,而分布式系统是:一个请求时由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就像建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展到后期的时候要去部署和思考的问题。我们也不难看出来:分布式架构系统存在的特点和问题如下:

存在问题:

  1. 学习成本高,技术栈过多
  2. 运维成本和服务器成本增高
  3. 人员的成本也会增高
  4. 项目的负载度也会上升
  5. 面临的错误和容错性也会成倍增加
  6. 占用的服务器端口和通讯的选择的成本高
  7. 安全性的考虑和因素逼迫可能选择 RMI/MQ相关的服务器端通讯

好处:

  1. 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资
  2. 源,不造成服务器资源的浪费
  3. 系统的独立维护和部署,耦合度降低,可插拔性
  4. 系统的架构和技术栈的选择可以变的灵活(而不是单纯地选择 java)
  5. 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态

3. 基于消息中间件的分布式系统的架构

请添加图片描述

从上图中可以看出来,消息中间件的是

  1. 利用可靠的消息传递机制进行系统和系统直接的通讯
  2. 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯

消息中间件应用的场景

  1. 跨系统数据传递
  2. 高并发的流量削峰
  3. 数据的并发和异步处理
  4. 大数据分析与传递
  5. 分布式事务比如你有一个数据要进行迁移或者请求并发过多的时候,

比如你有10 W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行

请添加图片描述

常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ等

消息中间件的本质及设计

它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务

MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术
请添加图片描述

谁来生产消息,存储消息和消费消息呢?请添加图片描述

消息中间件的核心组成部分

消息的协议
消息的持久化机制
消息的分发策略
消息的高可用,高可靠
消息的容错机制

小结

其实不论选择单体架构还是分布式架构都是项目开发的一个阶段,在什么阶段选择合适的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但作为一个开发人员学习和探讨新的技术使我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我

4. 消息队列协议

什么是协议

请添加图片描述

所谓协议是指:

  1. 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流
  2. 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高
  3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范

网络协议的三要素

  1. 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
  2. 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
  3. 时序:时序是对事件发生顺序的详细说明

比如我 MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的相应结构和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的 http请求协议:

  1. 语法:http规定了请求报文和响应报文的格式
  2. 语义:客户端主动发起请求称之为请求(这是一种定义,同时你发起的是 post/get请求)
  3. 时序:一个请求对应一个响应(一定先有请求在有响应,这个是时序)

而消息中间件采用的并不是 http协议,而常见的消息中间件协议有有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议

面试题:为什么消息中间件不直接使用 http协议

  1. 因为 http请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,窗台吗,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速
  2. 大部分情况下 http大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行

AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等

特性:

  1. 分布式事务支持
  2. 消息的持久化支持
  3. 高性能和高可靠的消息处理优势

请添加图片描述

MQTT协议

MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分

特点:

  1. 轻量
  2. 结构简单
  3. 传输快,不支持事务
  4. 没有持久化设计

应用场景:

  1. 适用于计算能力有限
  2. 低带宽
  3. 网络不稳定的场景

支持者:
请添加图片描述

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准

特点:

  1. 结构简单
  2. 解析速度快
  3. 支持事务和持久化设计

Kafka协议

Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成

特点:

  1. 结构简单
  2. 解析速度快
  3. 无事务支持
  4. 有持久化设计

小结

协议:实在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能

5. 消息队列持久化

持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存

请添加图片描述

常见的持久化方式

请添加图片描述

6. 消息的分发策略

消息的分发策略

MQ消息 队列有如下几个角色

  1. 生产者
  2. 存储消息
  3. 消费者

那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git就有推拉机制,我们发送的 http请求就是一种典型的拉取数据库数据返回的过程。而消息队列 MQ是一种推送的过程,而这些推机制会使用到很多的业务场景也有很多对应推机制策略

场景分析一

请添加图片描述

比如我在 APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被哪个系统或者哪些服务器或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论

场景分析二

请添加图片描述

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发
消息分发策略的机制和对比

请添加图片描述

7. 消息队列高可用和高可靠

什么是高可用机制

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力

当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的

集群模式1 - Master-slave主从共享数据的部署方式

请添加图片描述

集群模式2 - Master-slave主从同步部署方式

请添加图片描述

解释:这种模式写入消息同样在 Master主节点上,但是主节点会同步数据到 slave节点形成副本,和 zookeeper或者 redis主从机制很雷同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点进行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后去的 rabbitmq中会有使用

集群模式3 - 多主集群同步部署模式

请添加图片描述

解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入

集群模式4 - 多主集群转发部署模式

请添加图片描述

解释:如果你插入的数据是 broker-1中国,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息进行同步,如果消费者在 broker-2中进行消费,发现自己节点没有对应的信息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他回去联系其他的黄牛询问,如果有就返回

集群模式5 Master-slave与 Broker-cluster组合的方案

请添加图片描述

解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高

什么是高可靠机制

所谓高可靠是指:系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠

在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的

如何保证中间件消息的可靠性呢,可以从两个方面考虑:

  1. 消息的传输:通过协议来保证系统间数据解析的正确性
  2. 消息的存储区可靠:通过持久化来保证消息的可靠性

二、入门及安装

1. RabbitMQ入门及安装

https://www.bilibili.com/video/BV1dX4y1V73G?p=27

01 概述

简单概述:

RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征


我作为读者这里补充下,个人觉得B站飞哥关于安装过程讲的比较模糊,略过很多内容。关于’RabbitMQ’和’RabbitMQ+docker’的安装过程,我自己整理了一下,其他同为读者的你们不妨参考:

Rabbit安装教程
RabbitMQ+docker安装教程


02下载RabbitMQ

请添加图片描述

  1. 下载地址:https://www.rabbitmq.com/download.html
  2. 环境准备:CentOS7.x + /Erlang

RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,第一步就是安装 Erlang

请添加图片描述

03 安装Erlang

查看系统版本号

请添加图片描述

安装下载

mkdir -p /usr/rabbitmq
ca /usr//rabbitmq
# 将安装包上传到linux系统中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm

rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v

04 安装socat

安装下载

yum install -y socat

05 安装rabbitmq

请添加图片描述

安装下载

rpm -Uvh rabbitmq-server-3.7.18-1.el7.noarch.rpm
yum install rabbitmq-server -y

启动服务

# 启动服务
systemctl start rabbitmq-server
# 查看服务状态,如图
systemctl status rabbitmq-server.service
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server

请添加图片描述

2. RabbitMQWeb管理界面及授权操作

01 RabbitMQ管理界面

默认情况下,是没有安装web端的客户端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

说明:rabbitmq有一个默认账号和密码是:guest默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户

安装完毕以后,重启服务即可

systemctl restart rabbitmq-server

一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放15672端口

在浏览器访问

请添加图片描述

# 10.关闭防火墙服务
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld   
# 11.访问web管理界面
http://10.15.0.8:15672/

02 授权账号和密码

新增用户

rabbitmqctl add_user admin admin

设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator

用户级别:

  1. administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
  2. monitoring:监控者 登录控制台,查看所有信息
  3. policymaker:策略制定者 登录控制台,指定策略
  4. managment 普通管理员 登录控制台

为用户添加资源权限

rabbitmqctl set_permissions -p / admin ".*"".*"".*"

网页登录成功

请添加图片描述

03小结:

请添加图片描述

3. RabbitMQ之Docker安装

01 Dokcer安装RabbitMQ

虚拟化容器技术 - Docker的安装

请添加图片描述

docker的相关命令

请添加图片描述

安装rabbitmq

请添加图片描述

可以直接走图中代码,不用走下面两项!

获取rabbit镜像

docker pull rabbitmq:management

创建并运行容器

docker run -id --name=myrabbit -p 15672:15672 rabbitmq:management
--hostname:指定容器主机名称
--name:指定容器名称
-p:将mq端口号映射到本地
或者运行时设置用户和密码

请添加图片描述

启动

请添加图片描述

访问网页,访问成功!

4. RabbitMQ的角色分类

请添加图片描述

请添加图片描述

三、入门案例

1. RabbitMQ入门案例 - Simple 简单模式

https://www.bilibili.com/video/BV1dX4y1V73G?p=44 实现步骤

  1. jdk1.8
  2. 构建一个 maven工程
  3. 导入 rabbitmq的 maven依赖
  4. 启动 rabbitmq-server服务
  5. 定义生产者
  6. 定义消费者
  7. 观察消息的在 rabbitmq-server服务中的进程

01 构建一个maven工程

请添加图片描述

02 导入依赖

java原生依赖


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

03 第一种模型

请添加图片描述

在上图的模型中,有以下概念:

  1. 生产者,也就是要发送消息的程序
  2. 消费者:消息的接受者,会一直等待消息到来。
  3. 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

生产者

package com.chen.rabbitmq.simple;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
/**
 * @description: 简单模式Simple
 */
public class Producer {
 
 
    public static void main(String[] args) {
 
        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port
 
        // 1: 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("128.197.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");//rabbitmq登录的账号
        connectionFactory.setPassword("admin");//rabbitmq登录的密码
        connectionFactory.setVirtualHost("/");
 
        //springboot ---rabbitmq
 
        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channel
            connection = connectionFactory.newConnection("生产者");
            // 3: 通过连接获取通道Channel
            channel = connection.createChannel();
            // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
            String queueName = "queue1";
 
            /*
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所谓持久化消息是否存盘,如果false 非持久化 true是持久化? 非持久化会存盘吗? 会存盘,但是会随从重启服务会丢失。
             * @params3 排他性,是否是独占独立
             * @params4 是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
             * @params5 携带附属参数
             */
            channel.queueDeclare(queueName, true, false, false, null);
            // 5: 准备消息内容
            String message = "Hello chenjinxian!!!";
            // 6: 发送消息给队列queue
            // @params1: 交换机  @params2 队列、路由key @params 消息的状态控制  @params4 消息主题
            // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
            channel.basicPublish("", queueName, null, message.getBytes());
 
            System.out.println("消息发送成功!!!");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // 7: 关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            // 8: 关闭连接
 
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
 
 
    }
}

消费者

package com.chen.rabbitmq.simple;
 
import com.rabbitmq.client.*;
import java.io.IOException;
 
public class Consumer {
 
 
    public static void main(String[] args) {
 
        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port
 
        // 1: 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("128.197.157.151");//服务器IP
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
 
        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接Connection
            connection = connectionFactory.newConnection("消费者");
            // 3: 通过连接获取通道Channel
            channel = connection.createChannel();
            // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
 
 
            // true = ack 正常的逻辑是没问题 死循环 rabbit 重发策略
            // false = nack 消息这在消费消息的时候可能会异常和故障
            final  Channel channel2 = channel;
            channel2.basicConsume("queue1", false, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
                        channel2.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    }catch (Exception ex){
                        ex.printStackTrace();
                        // 三次确认 -- reject + sixin
                    }
 
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接受失败了...");
                }
            });
 
            System.out.println("开始接受消息");
            System.in.read();
 
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // 7: 关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            // 8: 关闭连接
 
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
 
 
    }
}

2. 什么是AMQP

01 什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计

02 AMQP生产者流转过程

请添加图片描述

03 AMQP消费者流转过程

请添加图片描述

3. RabbitMQ的核心组成部分

01 RabbitMQ的核心组成部分

请添加图片描述

请添加图片描述

02 RabbitMQ整体架构是什么样子的?

请添加图片描述

03 RabbitMQ的运行流程

请添加图片描述

04 RabbitMQ支持的消息模型

请添加图片描述

请添加图片描述

  1. 简单模式 Simple
  2. 工作模式 Work
  3. 发布订阅模式
  4. 路由模式
  5. 主题 Topic模式
  6. 参数模式

4. RabbitMQ入门案例 - fanout 模式

01 RabbitMQ的模式之发布订阅模式

图解

请添加图片描述

发布订阅模式的具体实现

  1. web操作查看视频
  2. 类型:fanout
  3. 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式

生产者

package com.chen.rabbitmq.fanout;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 发布订阅模式的具体实现
 类型:fanout
 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("128.156.157.161");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
 
            // 5: 准备发送消息的内容
            String message = "hello xuexi!!!";
 
            // 6:准备交换机
            String exchangeName = "fanout_change";
 
            // 8: 指定交换机的类型
            String type = "fanout";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName,"", null, message.getBytes());
 
 
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者

package com.chen.rabbitmq.fanout;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
 
/**
 发布订阅模式的具体实现
 类型:fanout
 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
 */
public class Consumer {
 
    private static Runnable runnable = new Runnable() {
        public void run() {
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("128.156.157.151");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
            } finally {
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };
 
 
 
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
        new Thread(runnable, "queue4").start();
        //new Thread(runnable, "queue5").start();
    }
}

请添加图片描述

此处没有通过代码去绑定交换机和队列,而是通过可视化界面去绑定的!

5. RabbitMQ入门案例 - Direct 模式

生产者

package com.chen.rabbitmq.routing;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 Direct 模式
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("128.176.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
 
            // 5: 准备发送消息的内容
            String message = "hello direct_exchange!!!";
 
            // 6:准备交换机
            String exchangeName = "direct_exchange";
            // 7: 定义路由key
            String routeKey = "email";
            // 8: 指定交换机的类型
            String type = "direct";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者

package com.chen.rabbitmq.routing;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
 
/**
 Direct 模式
 */
public class Consumer {
 
    private static Runnable runnable = new Runnable() {
        public void run() {
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("123.156.147.151");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
            } finally {
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };
 
 
 
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
        new Thread(runnable, "queue4").start();
      //  new Thread(runnable, "queue5").start();
    }
}

6. RabbitMQ入门案例 - Topic 模式

请添加图片描述

生产者

package com.chen.rabbitmq.topics;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 Topic模式
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("125.156.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
 
            // 5: 准备发送消息的内容
            String message = "hello topic_exchange!!!";
 
            // 6:准备交换机
            String exchangeName = "topic_exchange";
            // 7: 定义路由key
            String routeKey = "com.order.user";
            // 8: 指定交换机的类型
            String type = "topic";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者

package com.chen.rabbitmq.all;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
完整案例
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("151.156.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = " 你好,小白";
            // 交换机
            String  exchangeName = "direct_message_exchange";
            // 交换机的类型 direct/topic/fanout/headers
            String exchangeType = "direct";
 
            // 如果你用界面把queueu 和 exchange的关系先绑定话,你代码就不需要在编写这些声明代码可以让代码变得更加简洁,但是不容读懂
            // 如果用代码的方式去声明,我们要学习一下
            // 7: 声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失,如果是true代表不丢失,false重启就会丢失
            channel.exchangeDeclare(exchangeName,exchangeType,true);
 
            // 8: 声明队列
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);
 
            // 9:绑定队列和交换机的关系
            channel.queueBind("queue5",exchangeName,"order");
            channel.queueBind("queue6",exchangeName,"order");
            channel.queueBind("queue7",exchangeName,"course");
 
            channel.basicPublish(exchangeName, "course", null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

7. RabbitMQ入门案例 - Work模式

01 Work模式轮询模式(Round-Robin)

图解

请添加图片描述

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

  1. 轮询模式的分发:一个消费者一条,按均分配
  2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

生产者

跟简单模式一样!

消费者

创建两个一样的!

02 Work模式公平分发模式

生产者

package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 轮询模式
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("123.156.147.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) {
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

跟简单模式一样!

消费者

package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 轮询模式
 */
public class Work1 {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("123.156.147.155");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            //finalChannel.basicQos(1);
 
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work1-开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

创建两个一样的!

8. RabbitMQ使用场景

01 解耦、削峰、异步

同步异步的问题(串行)

串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

请添加图片描述

public void makeOrder(){
    //1.发送订单
    //2.发送短信服务
    //3.发送email服务
    //4.发送app服务
}

并行方式 异步线程池

并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

请添加图片描述

public void test(){
    //异步
    theadpool.submit(new Callable<Object>{
        //1.发送短信服务
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //2.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //3.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //4.
    })
}

存在问题

  1. 耦合度高
  2. 需要自己写线程池自己维护成本太高
  3. 出现了消息可能会丢失,需要你自己做消息补偿
  4. 如何保证消息的可靠性你自己写
  5. 如果服务器承载不了,你需要自己去写高可用

异步消息队列的方式

请添加图片描述

好处:

  1. 完全解耦,用 MQ建立桥接
  2. 有独立的线程池和运行模型
  3. 出现了消息可能会丢失,MQ有持久化功能
  4. 如何保证消息的可靠性,死信队列和消息转移等
  5. 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍

02 高内聚,低耦合

请添加图片描述

好处:

  1. 完全解耦,用 MQ建立桥接
  2. 有独立的线程池和运行模型
  3. 出现了消息可能会丢失,MQ有持久化功能
  4. 如何保证消息的可靠性,死信队列和消息转移等
  5. 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍

四、Springboot案例

1. Fanout 模式

https://www.bilibili.com/video/BV1dX4y1V73G?p=44

生产者

application.yml

# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
	rabbitmq:
		username: admin
		password: admin
		virtual-host: /
		host: 127.0.0.1
		port: 5672

OrderService.java



public class OrderService{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        sout("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}

消费者

application.yml

# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
	rabbitmq:
		username: admin
		password: admin
		virtual-host: /
		host: 127.0.0.1
		port: 5672

RabbitMqConfiguration.java

@Configuration
public class RabbitMqConfiguration{
    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }
    //2.声明队列
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue",true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue",true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue",true);
    }
    //3.完成绑定关系
    @Bean
    public Binding smsBingding(){
        return BindingBuilder.bin(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding duanxinBingding(){
        return BindingBuilder.bin(duanxinQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding emailBingding(){
        return BindingBuilder.bin(emailQueue()).to(fanoutExchange());
    }
}

FanoutSmsConsumer.java

@Component
@RabbitListener(queue = {"sms.direct.queue"})
public class FanoutSmsConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("sms接收到了的订单信息是:"+message);
    }
}

FanoutDuanxinConsumer.java

@Component
@RabbitListener(queue = {"duanxin.direct.queue"})
public class FanoutDuanxinConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("duanxin接收到了的订单信息是:"+message);
    }
}

FanoutEmailConsumer.java

@Component
@RabbitListener(queue = {"duanxin.direct.queue"})
public class FanoutEmailConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("email接收到了的订单信息是:"+message);
    }
}

2. Direct 模式

生产者

OrderService.java

public class OrderService{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        sout("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "direct_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,"email",orderId);
        rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderId);
    }
}

消费者

RabbitMqConfiguration.java

@Configuration
public class RabbitMqConfiguration{
    //1.声明注册fanout模式的交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_order_exchange",true,false);
    }
    //2.声明队列
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.direct.queue",true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.direct.queue",true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.direct.queue",true);
    }
    //3.完成绑定关系
    @Bean
    public Binding smsBingding(){
        return BindingBuilder.bin(smsQueue()).to(fanoutExchange()).with("sms");
    }
    @Bean
    public Binding duanxinBingding(){
        return BindingBuilder.bin(duanxinQueue()).to(fanoutExchange()).with("duanxin");
    }
    @Bean
    public Binding emailBingding(){
        return BindingBuilder.bin(emailQueue()).to(fanoutExchange()).with("email");
    }
}

3. Topic 模式

生产者

public class OrderService{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        sout("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "direct_order_exchange";
        String routingKey = "com.duanxin";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}

消费者(采用注解)

FanoutSmsConsumer.java

@Component
@RabbitListener(bindings = @QueueBinding(
	value = @Queue(value = "sms.topic.queue",durable = "true",antoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
    key = "#.sms.#"
))
public class TopicSmsConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("sms接收到了的订单信息是:"+message);
    }
}

FanoutDuanxinConsumer.java

@Component
@RabbitListener(bindings = @QueueBinding(
	value = @Queue(value = "duanxin.topic.queue",durable = "true",antoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
    key = "#.duanxin.#"
))
public classTopicDuanxinConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("duanxin接收到了的订单信息是:"+message);
    }
}

FanoutEmailConsumer.java

@Component
@RabbitListener(bindings = @QueueBinding(
	value = @Queue(value = "email.topic.queue",durable = "true",antoDelete = "false"),
    exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
    key = "#.email.#"
))
public class TopicEmailConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        sout("email接收到了的订单信息是:"+message);
    }
}

五、RabbitMQ高级

1. 过期时间TTL

https://www.bilibili.com/video/BV1dX4y1V73G?p=44

概述

过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置

  1. 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
  2. 第二种方法是对消息进行单独设置,每条消息 TTL可以不同

如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息

设置队列TTL

RabbitMqConfiguration.java

@Configuration
public class TTLRabbitMQConfiguration{
    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    //2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        //设置过期时间
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//这里一定是int类型
        return new Queue("ttl.direct.queue",true,false,false,args);}
    
    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttl");
    }
}

设置消息TTL

public class OrderService{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        sout("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttlmessage";
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                //这里就是字符串
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        }
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
    }
}

RabbitMqConfiguration.java

@Configuration
public class TTLRabbitMQConfiguration{
    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    //2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        //设置过期时间
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//这里一定是int类型
        return new Queue("ttl.direct.queue",true,false,false,args);}
    @Bean
    public Queue directttlMessageQueue(){
        return new Queue("ttlMessage.direct.queue",true,false,false,args);}
    
    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
    }
}

2. 死信队列

概述

DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息再一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:

  1. 消息被拒绝
  2. 消息过期
  3. 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可

代码

DeadRabbitMqConfiguration.java

@Configuration
public class DeadRabbitMqConfiguration{
    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("dead_direct_exchange",true,false);}
    //2.队列的过期时间
    @Bean
    public Queue deadQueue(){
        return new Queue("dead.direct.queue",true);}
    @Bean
    public Binding deadbinds(){
        return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
    }
}

RabbitMqConfiguration.java

@Configuration
public class TTLRabbitMQConfiguration{
    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    //2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        //设置过期时间
        Map<String,Object> args = new HashMap<>();
        //args.put("x-max-length",5);
        args.put("x-message-ttl",5000);//这里一定是int类型
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        args.put("x-dead-letter-routing-key","dead");//fanout不需要配置
        return new Queue("ttl.direct.queue",true,false,false,args);}
    @Bean
    public Queue directttlMessageQueue(){
        return new Queue("ttlMessage.direct.queue",true,false,false,args);}
    
    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
    }
}

请添加图片描述

3. 内存磁盘的监控

01 RabbitMQ内存警告

请添加图片描述

02 RabbitMQ的内存控制

参考帮助文档:http://www.rabbbitmq.com/configure.html

当出现警告的时候,可以通过配置去修改和调整

命令的方式

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效

请添加图片描述

配置文件方式 rabbitmq.conf

请添加图片描述

03 RabbitMQ的内存换页

请添加图片描述

04 RabbitMQ的磁盘预警

请添加图片描述

4. 集群

请添加图片描述

01 集群搭建

配置的前提是你的 rabbitmq可以运行起来,比如ps aix|grep rebbitmq你能看到相关进程,又比如运行rabbitmqct status你可以看到类似如下信息而不报错:

请添加图片描述

02 单机多实例搭建

请添加图片描述

启动第二个节点

请添加图片描述

验证启动

ps aux|grep rabbitmq

rabbit-1操作作为主节点

请添加图片描述

rabbit-2操作作为从节点

请添加图片描述

验证集群状态

请添加图片描述

Web监控

rabbitmq-plugins enable rabbitmq_management

请添加图片描述

小结

请添加图片描述

5. 分布式事务

01 简述

分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性。

例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务

02 分布式事务方式

在分布式系统中,要实现分布式事务,无外乎哪几种解决方案。

①两阶段提交(2PC)需要数据库严商

两阶段提交(Two-phase Commit,2PC),通过引协调者(coordinator)来协调参与者的行为,并最终决定这些参与者是否真正要执行事务。

准备阶段

协调者询问参与事务是否执行成功,参与者发回事务执行结果

请添加图片描述

提交阶段

如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。

请添加图片描述

存在的问题
  1. 同步阻塞所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
  2. 单点问题协调者在2PC中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会—直等待状态,无法完成其它操作。
  3. 数据不一致在阶段二,如果协调者只发送了部分Commit 消息,此时网络发生异常,那么只有部分参与者接收到Commit消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
  4. 太过保守任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

②补偿事务(TCC)严选,阿里、蚂蚁金服

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撒销)操作。它分为三个阶段:

  • Try阶段主要是对业务系统做检测及资源预留
  • Confirm阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认—Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入Bob要向Smith转账,思路大概是:我们有一个本地方法,里面依次调用

  1. 首先在Try阶段,要先调用远程接口把Smith 和 Bob 的钱给冻结起来。
  2. 在 Confirm阶段,执行远程调用的转账的操作,转账成功进行解冻。
  3. 如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法(Cancel)。

优点:跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些
缺点:缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

③本地消息(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式

本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终—致性。

  • 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
  • 之后将本地消息表中的消息转发到Kafka等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
  • 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

请添加图片描述

优点:一种非常经典的实现,避免了分布式事务,实现了最终—致性。
缺点:消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

④MQ事务消息,异步场景,通用性较强,拓展性较高。

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如Kafka不支持。
以阿里的RabbitMQ中间件为例,其思路大致为:

  • 第一阶段Prepared消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
    也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了,RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的第略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

请添加图片描述

优点:实现了最终一致性,不需要依赖本地数据库事务。
缺点:实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。

⑤总结

通过本文我们总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择吧。阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等,大家可以多去尝试。

具体实现

分布式事务的完整架构图

请添加图片描述

请添加图片描述

①系统与系统之间的分布式事问题

请添加图片描述

②系统间调用过程中事务回滚问题

package com.xuexiangban .rabbitmq.service;2.
import com.xuexiangban.rabbitmq.dao.orderDataBaseService;
import com.xuexiangban.rabbitmq.pojo.Order;
import org.springframework.beans.factory .annotation.Autowired;
import org.springframework.http.client.SimpleclientHttpRequestFactory;
import org.springframework.stereotype. Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

public class OrderService {
	@Autowired
	private OrderDataBaseService orderDataBaseservice;
	//创建订单
	@Transactional(rollbackFor = Exception.class)//订单创建整个方法添加事务
	public void createOrder(Order orderInfo) throws Exception {
		// 1:订单信息--插入丁订单系统,订单数据库事务orderDataBaseService.saveOrder(orderInfo);
		//2∶通通Http接口发途订单信息到运单系统
		String result = dispatchHttpApi(orderInfo.getorderId());
        	if( !"success".equals(result)) {
			throw new Exception("订单创建失败,原因是运单接口调用失败!");
		}
	}
	/**
	* 模拟http请求接口发途,运单系统,将订单号传过去 springcloud
	*/
	private String dispatchHttpApi(String orderId){
		SimpleclientHttpRehyuestFactory factory - new SimpleClientHttpRequestFactory();
                //链接超时>3秒
		factory .setConnectTimeout ( 300e) ;
		//处理超时>2秒
		 factory .setReadTimeout ( 2000) ;
                //发送http请求
		String url = "http: / /localhost:9000/dispatch/order?orderId="+orderId;
                RestTemplate restTemplate = new RestTemplate(factory);//异常
		String result = restTemplate.getForobject(url,string.class);
                return result;
    }
}

③基于MQ的分布式事务整体设计思路

请添加图片描述

④基于MQ的分布式事务消息的可靠生产问题-定时重发

请添加图片描述

如果这个时候MQ服务器出现了异常和故障,那么消息是无法获取到回执信息。怎么解决呢?

请添加图片描述

⑤基于MQ的分布式事务消息的可靠消费

请添加图片描述

⑥基于MQ的分布式事务消息的消息重发

请添加图片描述

解决消息重试的集中方案

  1. 控制重发的次数
  2. try+catch+手动ack
  3. try+catch+手动ack +死信队列处理

⑦基于MQ的分布式事务消息的死信队列消息转移+人工处理

请添加图片描述

如果死信队列报错就进行人工处理

请添加图片描述

⑧基于MQ的分布式事务消息的死信队列消息重试注意事项

⑨基于MQ的分布式事务消息的定式重发

总结

①基于MQ的分布式事务解决方案优点:

  1. 通用性强
  2. 拓展方便
  3. 耦合度低,方案也比较成熟

②基于MQ的分布式事务解决方案缺点:

  1. 基于消息中间件,只适合异步场景
  2. 消息会延迟处理,需要业务上能够容忍

③建议

  1. 尽量去避免分布式事务
  2. 尽量将非核心业务做成2

笔记来源

本文来自B站狂神说的视频中的markdown文件,由于格式问题,作者对其进行整理并写在博客中。其中下载来源如下:
RabbitMQ笔记

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥) 的相关文章

  • Linux find命令详解

    find命令 xff1a 快速查找数据信息 find 查找范围 条件01 a 条件02 a 条件03 根据多个多个条件查找数据 xff08 多个条件是并且关系 xff09 find 查找范围 条件01 o 条件02 o 条件03 根据多个多
  • TIME_WAIT过多故障,如何解决?

    1 time wait的作用 1 xff09 可靠地实现TCP全双工连接的终止 在进行关闭连接四次挥手协议时 xff0c 最后的ACK是由主动关闭端发出的 xff0c 如果这个最终的ACK丢失 xff0c 服务器将重发最终的FIN xff0
  • 作业管理-----操作系统

    浅谈作业管理 摘要 作业的概念及作业的提交方式 xff1a 作业是用户在一次解题或事务处理过程中要求计算机所作工作的集合 一个作业进入系统到运行结束 xff0c 一般要经历 后备 执行 完成 三种状态 为了管理和调度作业 xff0c 系统为
  • python network

    call the sync function after 2 seconds t 61 threading Timer 2 self sync t start 为什么TCP传输需要编码 send can not send a string
  • 多生产者---多消费者问题(PV操作的简单例题)

    多生产者 多消费者问题 xff08 PV操作的简单例题 xff09 在引入生产者消费者问题前 xff0c 先要介绍几个知识点 1 临界区的相关问题 临界区是指每个进程访问临界资源的那段程序 临界资源是指每次只允许一个进程的访问的资源 2 P
  • 视频号视频文案怎么写,视频号第一条文案怎么写(新手必看)国仁网络资讯

    视频号文案怎么写 xff0c 为什么有些人在只做短视频的时候 xff0c 他的视频就会被很多人浏览 xff0c 观看次数都是上万 xff0c 甚至更多 xff0c 而有些人的短视频视频的播放量就不会有人问津 xff0c 我们在制作短视频或者
  • 操作系统的概念、四个特征以及os的发展和分类

    计算机系统概述 1 操作系统的概念2 操作系统的四个特征3 操作系统的发展与分类 1 操作系统的概念 操作系统 xff08 Operating System xff0c OS xff09 是指控制和管理整个计算机系统的硬件和软件资源 xff
  • 【编程算法】跳跃游戏ⅠⅡⅢ(Python解法)

    写这篇文章源于之前4 10做的字节跳动的笔试 xff0c 第二道编程题就是跳跃游戏类 xff0c 可以说和牛客或者力扣上边的解题做法是完全一样的 xff0c 可惜当时我才刚开始学习算法 深入了解该类型后发现真的很有意思 xff0c 这篇文章
  • 【Java开发】 Spring 10 :Spring Boot 自动配置原理及实现

    用了这么久的 SpringBoot xff0c 我们再来回顾一下它 xff0c 本文介绍 Spring Boot 的自动配置 xff0c 这是它区别于 Spring 的最大的点 xff0c 本文的自动配置项目包含三个项目 xff0c 建议拉
  • MyBatis与MyBatisPlus的区别

    一 MyBatis Plus简介 1 1 什么是mybatis plus MyBatis Plus xff08 简称 MP xff09 是一个 MyBatis 的增强工具 xff0c 在 MyBatis 的基础上只做增强不做改变 xff0c
  • Spring 笔记

    Spring 笔记 1 Spring xff08 2021 1 27 xff09 1 1 简介 Spring xff1a 春天 gt 给软件行业带来了春天 xff01 2002 xff0c 首次推出了Spring框架的雏形 xff1a in
  • 妙用shell脚本画图形

    妙用shell脚本画图形 目录 妙用shell脚本画图形一 99乘法表二 输出1条直线三 画矩形四 左边直角三角形五 右侧直角三角形六 等腰三角形七 平行四边形八 梯形九 菱形 一 99乘法表 展示一 xff1a 展示二 xff1a 二 输
  • 搭建LNMP基础框架

    目录 一 编译安装Nginx服务二 编译安装MySQL服务三 编译安装PHP服务四 部署Discuz xff0c 社区论坛Web应用 一 编译安装Nginx服务 1 关闭防火墙 xff0c 将安装Nginx所需软件包传到 opt目录下 sy
  • 银河麒麟4.0.2二进制安装mysql5.7

    先查看银河麒麟的版本 root 64 idiom kylin1 cat etc kylin build Kylin 4 0 2 Build 20191024 一 下载二进制包 xff0c 并安装所需软件 root 64 idiom kyli
  • 使用shell脚本一键部署LNMP架构

    span class token comment bin bash span span class token comment 将需要的安装包传到 opt目录下 xff0c 并关闭防火墙 span systemctl stop firewa
  • Nginx优化与防盗链

    目录 一 隐藏版本号二 修改用户与组三 缓存时间四 日志分割五 连接超时六 更改进程数七 配置网页压缩八 配置防盗链九 fpm参数优化 一 隐藏版本号 可以使用Fiddler工具抓取数据包 xff0c 查看Nginx版本 也可以在Cento
  • MySQL索引、事务与存储引擎

    目录 一 MySQL索引1 索引的概念2 索引的作用3 创建索引的原则依据4 索引的分类和创建4 1 61 61 普通索引 61 61 4 2 61 61 唯一索引 61 61 4 3 61 61 主键索引 61 61 4 4 61 61
  • openstack基础知识

    目录 一 云计算1 什么是云计算2 云计算的特色3 云计算的三种使用方式1 xff09 公有云2 xff09 私有云3 xff09 混合云 4 云计算服务模型1 xff09 IaaS 基础架构即服务 2 xff09 PaaS xff08 平
  • openstack-keystone

    目录 一 keystone身份服务二 keystone的主要功能三 keystone相关概念四 keystone认证流程五 OpenStack Keystone组件部署步骤部署步骤 一 keystone身份服务 keystone xff08
  • k8s-----------YAML&harbor

    目录 概述使用YAML文件创建资源1 查看资源版本的标签2 创建yaml文件测试 Pod1 特点2 pod容器分类3 镜像拉取策略 部署harbor1 登录harbor私有仓库2 下载Tomcat镜像进行推送3 推送 概述 Kubernet

随机推荐

  • k8s-----------高级pod&调度

    目录 pod进阶pod重启策略 健康检查 探针调度约束调度方式 故障排除 pod进阶 limits cup cpu上限limits memory 内存上限requests cpu 创建时分配的基本CPU资源requests memory 创
  • k8s-----------控制器

    目录 Deployment 部署无状态应用 Pod与控制器之间的关系 SatefulSet xff08 部署有状态应用 xff09 无状态和有状态无状态有状态 常规service和无头服务区别DaemonSetjobCronJob 控制器
  • 安装electron时安装失败解决

    错误描述 xff1a 在安装 electron 的时候 xff0c 使用官方推荐的如下命令 xff1a npm install save dev electron 结果报错如下 npm ERR code 1 npm ERR path D A
  • 10:天干地支

    10 天干地支 时间限制 1 S 内存限制 8192 KB Accept 15 Submit 41 提交 讨论版 描述 天干地支 xff0c 源自中国远古时代对天象的观测 甲 乙 丙 丁 戊 己 庚 辛 壬 癸 称为十天干 xff0c 子
  • txt格式vscode转码

    txt打开异常 xff0c 或乱码 右下角有格式类型 xff1a utf 8 xff0c 点击它会有一个 select action 弹框 可选择特定格式重新打开 xff0c 或保存 选择好对应的格式 乱码解决 或者点击 save with
  • 送给 Java 程序员的 Spring 学习指南

    https www infoq cn article Ad 8ghcGGCNU572U6oEX 学习 Spring 的基础要求 Spring 官网首页是这么介绍自己的 Spring the source for modern Java xf
  • Centos下如果是二进制文件,编辑是文本,后缀是sh也无法执行

    这次部署redis遇到个问题 xff0c 执行sh文件来启动redis xff0c 结果报配置文件无法打开 用vi打开sh文件反复检查过路径是对的 然后手敲路径执行 xff0c 运行正常 xff1b 直接执行sh文件不行 xff1a 反复修
  • Python Pandas 查看数据信息 DataFrame.info()

    在进行数据分析之前 xff0c 需要先查看数据的信息 xff0c 这样才方便后续的数据处理 比如 xff0c 在excel表中20220520是一个常规类型的数据 xff0c 那它导入到DataFrame中是int类型还是str类型呢 xf
  • 03-1.MariaDB安装配置详细步骤

    Author xff1a Sickey Date xff1a 2021 11 25 安装前准备 配置静态IP防火墙等等 1 安装mariadb数据库 先查看RDO中MariaDB的版本及配置 etc my cnf d server cnf
  • 24行代码简单实现qq空间自动点赞

    什么是Auto js xff1f Auto js是基于JavaScript语言运行在Android平台上的工具 它依赖于无障碍服务 它可以做什么 xff1f 解放双手 xff0c 让手机自动打游戏 自动签到 自动领红包等等等等 它有什么优点
  • 操作系统(C++)——生产者消费者模型

    一 C 43 43 实现代码 span class token macro property span class token directive hash span span class token directive keyword i
  • 2023.03.14java内置的线程池 -》 ScheduledExecutorService 具备延迟运行的能力!的使用

    区别 ScheduledExecutorService xff0c 执行任务方法为schedule xff0c 而不是submit es schedule new MyRunnable i 2 TimeUnit SECONDS 指定线程了线
  • sudo rm -rf /* 命令运行演示(管理员身份删除根目录所有文件)

    一 前言 闲来无事 xff0c 好奇传说中的 sudo rm rf 命令究竟有什么样的魅力让无数人趋之若鹜 xff0c 本着奉献精神 xff0c 作者将在自己的服务器上测试一番 xff0c 各位读者切勿轻易尝试 不 xff0c 切勿尝试 x
  • 服务器如何安装宝塔面板?

    一 前言 作者是按照宝塔面板官方指引进行下载的 xff0c 中间进行更为清晰明了的图示说明 宝塔官方说明 xff08 或者直接按下面步骤安装 xff09 xff1a 宝塔Linux面板安装教程 2021年8月18日更新 7 7 0正式版 L
  • 计算机网络体系结构(详图)

    本文旨在作学习记录 xff0c 其中 xff0c 图源的信息已标明 图源 xff1a 谢希仁的 计算机网络 图源 xff1a 网络
  • wordpress+宝塔在阿里云服务器上搭建个人博客(如何在服务器上搭建个人博客)

    目录 一 宝塔安装 1 开放端口 2 命令安装 二 环境安装 xff08 LNMP xff09 三 wordpress安装包下载 四 部署站点 四 博客访问 五 结语与参考 一 宝塔安装 宝塔官方安装教程 xff08 或者直接按下面步骤安装
  • docker-compose安装教程(包含docker安装教程)

    本文旨在学习记录 xff0c 该过程是作者经历过大大小小的十来次失败总结而来 xff0c 内容都是各方博客荟萃的结果 xff0c 下述内容都已实践成功 xff0c 若失败 xff0c 只需初始化云盘 xff0c 重新安装即可 xff0c 无
  • Idea中xml文件局部黄色背景颜色去除

    Idea中xml文件局部黄色背景颜色去除 参考 xff1a Idea中更改主题后xml配置文件局部黄色背景颜色去除 版本2020 3 初始打开 xff0c 或更换主题后 xff0c 黑色背景屎黄色 xff0c 白色背景浅黄色或青绿色 Int
  • RabbitMQ安装教程(最新RabbitMQ安装,通用教程)

    目录 一 前言 二 Erlang下载安装 三 RabbitMQ下载安装 三 RabbitMQ Web界面管理 四 参考与结语 一 前言 RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写 xff0c 即需要先安装
  • RabbitMQ狂神说笔记(RabbitMQ B站狂神说笔记、KuangStudy、学相伴飞哥)

    一 中间件 1 什么是中间件 什么是中间件 我国企业从20世纪80年代开始就逐渐进行信息化建设 xff0c 由于方法和体系的不成熟 xff0c 以及企业业务的市场需求的不断变化 xff0c 一个企业可能同时运行着多个不同的业务系统 xff0