RabbitMQ简介、概念、安装、启动、工作模式
1、RabbitMQ简介
- RabbitMQ是支持多种消息协议,易于部署和使用的开源消息代理服务器,用于在分布式系统中存储转发消息
由以高性能、健壮以及可伸缩性出名的Erlang语言编写;提供了成熟的高并发,高可用的解决方案
可以根据实际业务情况动态地扩展集群节点在集群中的机器上设置镜像,使得在部分节点出现问题的情况下仍然可用 - 支持多种客户端语言,如:Python、Ruby、.NET、Java等,支持AJAX
- RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
2、Message Queue : MQ
- 跨进程的通信机制,用来在系统之间进行传递
- MQ作为中间件,可以进行系统间异步请求和响应,从而减少响应数据及解耦
- MQ典型应用场景:
- 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
- 日志处理
- 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
3、MQ安装
- Window环境下安装与使用RabbitMQ
- 安装Erlang程序运行环境
- 下载地址http://www.erlang.org/downloads
- 安装RabbitMQ服务器
- 下载地址http://www.rabbitmq.com/
- 启动RabbitMQ服务
- 激活RabbitMQ管理控制台
- cd sbin
- rabbitmq-plugins.bat enable rabbitmq_management
- 通过浏览器进行访问
- 用户名:guest 密码:guest
- linux安装
- 下载Erlang运行环境RPM包
- https://www.erlang-solutions.com/resources/download.html
esl-erlang_21.2.6-1_centos_7_amd64.rpm - 下载RabbitMQ服务器安装包
http://www.rabbitmq.com/install-rpm.html#downloads
rabbitmq-server-3.7.13-1.el7.noarch.rpm - 安装rpm包
rpm -ivh --nodeps esl-erlang_21.2.6-1_centos_7_amd64.rpm
rpm -ivh --nodeps rabbitmq-server-3.7.13-1.el7.noarch.rpm - 启用服务:前台启动
rabbitmq-server - 启用控制台
- 通过后台管理插件我们可以动态监控mq的流量,创建用户,队列等。
- rabbitmq-plugins enable rabbitmq_management
4、启动、停止服务
- rabbitmq-server 前台启动
- rabbitmq-server -detached 后台启动服务
- rabbitmqctl stop 停止服务
5、登录网页管理界面
rabbitmq的网页管理的端口是15672,如果你是远程操作服务器,输入http://ip:15672,发现连接不上,因为服务器防火墙不允许这个端口远程访问;
放行防火墙端口
# 将mq的tcp监听端口和网页管理端口都设置成允许远程访问
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=5672/tcp
systemctl restart firewalld.service
管理界面介绍
# 输入用户名密码登录后进入主界面
Overview:用来显示流量,端口,节点等信息,以及修改配置文件;
Connections:显示所有的TCP连接;
channels:显示所有的信道连接;
exchanges:显示所有的交换机以及创建删除等;
queues:显示所有的队列以及创建删除等;
admins:显示所有的用户以及用户管理;
6、概念
- 生产者: 用来向队列(或交换机)发送消息
- 消费者: 处理(消费)队列中的消息
- 消息: 队列中进行存储的数据
- 队列: 存放消息的容器,安装先进先出的方式进行存储
- 虚拟主机:用来存储消息队列,类似MySQL中的数据库。
7、常用命令
-
创建虚拟主机
- rabbitmqctl add_vhost /名称
- rabbitmqctl delete_vhost [vhost_name]
-
rabbitmq有一个默认的用户名和密码,guest和guest,但为了安全考虑,该用户名和密码只允许本地访问,如果是远程操作的话,需要创建新的用户名和密码;
- 创建用户
- rabbitmqctl add_user 用户名 密码
- rabbitmqctl add_user {username} {password}
- 删除
- rabbitmqctl delete_user {username}
- 修改密码
- rabbitmqctl change_password {username} {newpassword}
-
设置标签(用户角色)
-
rabbitmqctl set_user_tags 用户名 角色
-
用户角色说明
-
management:用户可以访问管理插件
policymaker:用户可以访问管理插件,并管理他们有权访问的vhost的策略和参数。
monitoring:用户可以访问管理插件,查看所有连接和通道以及与节点相关的信息。
administrator:用户可以做任何监视可以做的事情,管理用户,vhost和权限,关闭其他用户的连接,并管理所有vhost的政策和参数。
-
设置权限:设置用户可以访问哪些vhost;配置权限、读权限、写权限
rabbitmqctl set_permissions -p /虚拟主机名称 用户名 '.*' '.*' '.*'
8、Java客户端访问RabbitMQ
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.8.113");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/myvhost");
Connection con = factory.newConnection();
Channel channel = con.createChannel();
channel.queueDeclare("simple_queue",true,false,false,null);
channel.basicPublish("","simple_queue",null,"这是一条message".getBytes());
channel.close();
con.close();
System.out.println("消息发送成功!");
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMQUtil.getConnection();
final Channel channel = con.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
channel.basicConsume("simple_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消费者处理的消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
9、消息的状态
- ready: 表示消息待消费
- unacked: 表示消息被消费者认领了,但是没有确认签收。此时一旦消费者断开连接,消息就回到ready状态
- finished:表示已经消费完成,消息会被从队列中移除。
10、RabbitMQ的工作模式
总共是6种工作模式:
- 简单模式
- work工作模式
- 一个生产者、多个消费者;每个消费者获取的消息是唯一的。并且在集群架构下可以最大发挥每台服务器的性能。
- 发布订阅模式:
- 一个生产者,多个消费者;并且每个消费者获取的数据是一致的;该模式下交换机的类型是fanout扇形交换机。发布订阅模式中,生产者不在和对列打交道,直接将消息发送给交换机,交换机负责将消息转发给对列。
public class Ticket {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
for (int i = 0; i < 10; i++) {
channel.basicPublish("pubsub_exchange","",null,("票务消息"+i).getBytes());
}
channel.close();
con.close();
System.out.println("发送成功");
}
}
public class Xc {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
final Channel channel = con.createChannel();
channel.queueDeclare("xc_queue",true,false,false,null);
channel.queueBind("xc_queue","pubsub_exchange","");
channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消息: "+ new String(body));
}
});
}
}
public class Qn {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
final Channel channel = con.createChannel();
channel.queueDeclare("qn_queue",true,false,false,null);
channel.queueBind("qn_queue","pubsub_exchange","");
channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消息: "+ new String(body));
}
});
}
}
- 路由模式:
- 在发布订阅模式基础之上,路由模式提供了条件筛选的功能,将符合路由key条件的数据,转发给对应的队列。direct直接交换机。
public class Ticket {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT);
Map<String,String> map = new HashMap();
map.put("schk20201001","成都-上海");
map.put("schk20201002","成都-北京");
map.put("xmhk20201001","厦门-上海");
map.put("xmhk20201002","厦门-北京");
for (String key : map.keySet()){
channel.basicPublish("route_exchange",key,null,map.get(key).getBytes());
}
channel.close();
con.close();
System.out.println("发送成功");
}
}
public class Xc {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.queueDeclare("xc_queue",true,false,false, null);
channel.queueBind("xc_queue","route_exchange","schk20201001");
channel.queueBind("xc_queue","route_exchange","schk20201002");
channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("票务信息:" + new String(body) );
}
});
}
public class Qn {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.queueDeclare("qn_queue",true,false,false, null);
channel.queueBind("qn_queue","route_exchange","xmhk20201001");
channel.queueBind("qn_queue","route_exchange","xmhk20201002");
channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("票务信息:" + new String(body) );
}
});
}
}
- 主题模式:
- Rabbitmq提供了相关的表达式来进行模糊匹配;* 表示一个关键字,#多个关键字
public class Ticket {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("接收到的消息id:" + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("拒接的消息id:" + deliveryTag);
}
});
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return rm) {
System.out.println("-----------------------------------------");
System.out.println("交换机:"+rm.getExchange());
System.out.println("消息的状态码:" + rm.getReplyCode());
System.out.println("消息描述:"+rm.getReplyText());
System.out.println("路由key:"+rm.getRoutingKey());
System.out.println("----------------------------------------");
}
});
Map<String,String> map = new HashMap();
map.put("schk.20201001","成都-上海");
map.put("schk.20201002","成都-北京");
map.put("xmhk.20201001","厦门-上海");
map.put("xmhk.20201002","厦门-北京");
map.put("gjhk.20201001","北京-东京");
map.put("gjhk.20201002","北京-美国");
for (String key : map.keySet()){
channel.basicPublish("topic_exchange",key,true,null,map.get(key).getBytes());
}
System.out.println("发送成功");
}
}
public class Qn {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.queueDeclare("qn_queue",true,false,false,null);
channel.queueBind("qn_queue","topic_exchange","xmhk.#");
channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("票务信息: "+ new String(body));
}
});
}
}
public class Xc {
public static void main(String[] args) throws IOException, TimeoutException {
Connection con = RabbitMqUtil.getConnectin();
Channel channel = con.createChannel();
channel.queueDeclare("xc_queue",true,false,false,null);
channel.queueBind("xc_queue","topic_exchange","schk.#");
channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("票务信息: "+ new String(body));
}
});
}
}
11、RabbitMQ的消息确认机制
- 消息确认涉及两种状态:
- Confirm代表生产者将消息送达了Broker时产生的状态,后续会出现两种情况:
- ack代表Broker已经将数据接收
- nack代表Broker未接收到消息
- Return代表消息被正常接收,但Broker没有对应的队列进行投递,消息被退回给生产者的状态
- 上述状态表示生产者与Broker之间的消息投递情况,与消费者是否接收/确认消息无关。
- 代码实现
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("MQ已经接收到消息,消息ID:"+l);
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("MQ拒收消息,消息ID:"+l);
}
});
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return r) {
System.out.println("-----------------------------------------------------");
System.out.println("返回的消息状态码:"+r.getReplyCode());
System.out.println("消息描述:"+r.getReplyText());
System.out.println("交换机"+r.getExchange());
System.out.println("路由key"+r.getRoutingKey());
System.out.println("-----------------------------------------------------");
}
});
channel.basicPublish("topic_exchange",key,true,null,res.get(key).getBytes());
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)