RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
一、基本消息模型
1.导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<!--和springboot2.0.5对应-->
<version>5.4.1</version>
</dependency>
2.准备连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
//Virtual代表虚拟消息服务器,每个服务器相对独立
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
3.创建消息发送者(生产者)
import com.ty.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SenTest {
//队列的名字
private static final String QUEUE_HELLO = "queue_hello";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try{
//1.创建链接对象
connection = ConnectionUtil.getConnection();
//2.创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
/**
*3. 声明队列,如果Rabbit中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
*/
channel.queueDeclare(QUEUE_HELLO, true, false, false, null);
/**
* 4.消息发布方法
* param1:Exchange的名称,如果没有指定,则使用Default Exchange
* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
* param3:消息包含的属性
* param4:消息体
* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
示绑定或解除绑定
* 默认的交换机,routingKey等于队列名称
*/
channel.basicPublish("", QUEUE_HELLO ,null ,"我是一个hello消息".getBytes());
System.out.println("已经发送消息:我是一个hello消息");
} catch (Exception e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
启动后管理工具查看 http://localhost:15672/
点击队列名称,进入详情页,可以查看消息
4.创建消息接受者(消费者)
import com.rabbitmq.client.*;
import com.ty.util.ConnectionUtil;
import java.io.IOException;
public class RevTest {
//队列的名字
private static final String QUEUE_HELLO = "queue_hello";
public static void main(String[] args) {
try {
//1.创建链接
Connection connection = ConnectionUtil.getConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_HELLO, true, false, false, null);
//4.定义消费方法
Consumer consumer = new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)throws IOException {
//交换机
String exchange = envelope.getExchange();
System.out.println("exchange:"+exchange);
//路由key
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey:"+routingKey);
//消息id
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag:"+deliveryTag);
//消息内容
String msg = new String(body, "utf8");
System.out.println("收到消息:" + msg);
}
};
/**
* 5.接受消息
* 监听队列String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_HELLO, true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个时候队列里面的消息没有了
消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印
5.消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收,不过这种回执ACK分两种情况:
- 自动ACK:消息一旦被接收,消费者自动发送ACK。
- 手动ACK:消息接收后,不会发送ACK,需要手动调用。
这需根据消息的重要性来选:
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失,那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
自动确认存在问题
修改消费者,添加异常,如下:
生产者不做任何修改,直接运行,消息发送成功
运行消费者,程序抛出异常,但是消息依然被消费
手动确认实现
二、Work queues
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
**应用场景:**对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
- 使用入门程序,启动多个消费者。
- 生产者发送多个消息。
结果:
- 一条消息只会被一个消费者接收。
- rabbit采用轮询的方式将消息是平均发送给消费者的,消费者在处理完某条消息后,才会收到下一条消息。
工作队列,又称任务队列,主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
- P:生产者:任务的发布者
- C1:消费者,领取任务并且完成任务,假设完成速度较快
- C2:消费者2:领取任务并完成任务,假设完成速度慢
1.创建消息发送者
同helloword一样,不过多发几条消息。
2.创建多个消息接受者
同 helloword一样,不过消费者一个能力强一个差(RevTest1跟之前一样,RevTest2模拟完成耗时多加一个让线程休眠)
3.测试
两个消费者同时启动,然后在启动生产者发送50条消息:
可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。
4.设置能者多劳
刚刚的模拟中,消费者1比消费者2的效率要高,消费者2一次任务的耗时较长,然而两人最终消费的消息数量是一样的 消费者1大量时间处于空闲状态,消费者2一直忙碌。现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
三、订阅模型分类
在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。
1.订阅模型示意图
- 1个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
2.Exchange分类:
X(Exchanges)交换机一方面接收生产者发送的消息,另一方面知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Fanout:广播
将消息交给所有绑定到交换机的队列。
- Direct:定向
把消息交给符合指定routing key 的队列 一堆或一个。
- Topic:通配符
把消息交给符合routing pattern(路由模式)的队列 一堆或者一个。
四、订阅模型-广播模式FANOUT
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
发送者需要声明交换机 ,不需要声明队列,发送消息的时候需要指定交换机,不需要指定routingkey接受者需要声明队列 ,需要给队列绑定交换机 ,接受者的交换机和消息发送者的交换机要一致。多个消息接受者,声明的队列的名字需要不一样,而交换机的名字需要一样。
在广播模式下,消息发送流程:
- 可以有多个消费者。
- 每个消费者有自己的queue(队列)。
- 每个队列都要绑定到Exchange(交换机)。
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列。
- 队列的消费者都能拿到消息,实现一条消息被多个消费者消费。
1.创建消息发送者
两个变化:
- 声明Exchange,不再声明Queue
- 发送消息到Exchange,不再发送到Queue
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.ty.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SenTest {
//交换机的名字
private static final String EXCHANGE = "FANOUT_EXCHANGE";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try{
//1.创建链接对象
connection = ConnectionUtil.getConnection();
//2.创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
//3.声明交换机,指定类型为fanout
channel.exchangeDeclare(EXCHANGE, "fanout",true);
//4.消息发布
String message = "广播模式FANOUT" ;
channel.basicPublish(EXCHANGE ,"" , MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("已经发送消息:" + message);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2.创建消息接受者
消费1与消费者2的区别在于声明的队列的名字需要不一样,但是交换机的名字需要一样。
import com.rabbitmq.client.*;
import com.ty.util.ConnectionUtil;
import java.io.IOException;
public class RevTest1 {
//队列的名字
private static final String QUEUE = "FANOUT_QUEUE_1";
//交换机的名字
private static final String EXCHANGE = "FANOUT_EXCHANGE";
public static void main(String[] args) {
try {
//1.创建链接
Connection connection = ConnectionUtil.getConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE, true, false, false, null);
//4.绑定队列到交换机
channel.queueBind(QUEUE, EXCHANGE, "");
//5.定义消费方法
Consumer consumer = new DefaultConsumer(channel){
//获取消息并且处理,这个方法类似事件监听,如果有消息的时候会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)throws IOException {
//消息内容
String msg = new String(body, "utf8");
System.out.println("消费者1:" + msg);
}
};
//6.监听队列,自动返回完成
channel.basicConsume(QUEUE, true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.测试
同时运行两个消费者,然后发送1条消息:
五、订阅模型-定向模式Direct
direct定向模式是交换机根据product指定的routingkey,收到消息后去匹配指定routingkey的队列,将消息发送给队列,消费者进行消费,是有选择性的接收消息。
-
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
-
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列。
-
C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
-
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。
1.创建消息发送者
同fanout模式一样,不过要改一下模型、指定routingkey。
2.创建消息接受者
消费者1和消费者2同fanout模式一样,不过队列名要不一样交换机名字相同,绑定队列到交换机时要指定routingkey。
消费者1:
消费者2:
3.测试
我们分别发送update、delete的RoutingKey,发现结果:
六、订阅模型-通配符模式Topics
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。
Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: ty.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
1.创建消息发送者
同fanout模式一样,不过要改一下模型、指定routingkey。
2.创建消息接受者
消费者1和消费者2同fanout模式一样,不过队列名要不一样交换机名字相同,绑定队列到交换机时要指定routingkey。
消费者1:
消费者2:
3.测试
我们分别发送ty.update、ty.insert的RoutingKey,发现结果:
七、Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配 队列。
举例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,
设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
1.创建消息发送者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ty.util.ConnectionUtil;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class SenTest {
//交换机的名字
private static final String EXCHANGE = "HEADERS_EXCHANGE";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try{
//1.创建链接对象
connection = ConnectionUtil.getConnection();
//2.创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
//3.声明交换机,指定类型为headers
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
//4.发送消息
String message = "发送了一个email消息" ;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type_email", "email");//匹配email通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//5.消息发布
channel.basicPublish(EXCHANGE ,"" , properties.build(), message.getBytes());
System.out.println("已经发送消息:" + message);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
2.创建消息接受者
消费者1:
import com.rabbitmq.client.*;
import com.ty.util.ConnectionUtil;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
public class RevTest1 {
//队列的名字
private static final String QUEUE = "HEADERS_QUEUE_1";
//交换机的名字
private static final String EXCHANGE = "HEADERS_EXCHANGE";
public static void main(String[] args) {
try {
//1.创建链接
Connection connection = ConnectionUtil.getConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE, true, false, false, null);
Map<String, Object> headersEmail = new Hashtable<String, Object>();
headersEmail.put("inform_type_email", "email");
//4.绑定队列到交换机
channel.queueBind(QUEUE, EXCHANGE, "",headersEmail);
//5.定义消费方法
Consumer consumer = new DefaultConsumer(channel){
//获取消息并且处理,这个方法类似事件监听,如果有消息的时候会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)throws IOException {
//消息内容
String msg = new String(body, "utf8");
System.out.println("消费者1:" + msg);
}
};
//6.监听队列,自动返回完成
channel.basicConsume(QUEUE, true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者2:
改一下队列名称和键值对。
3.测试
八、订阅模型-广播模式FANOUT
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
- 客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
- 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
- 服务端将RPC方法 的结果发送到RPC响应队列。
- 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
九、如何避免消息丢失
1.手动签收
消费者的ACK机制,可以防止消费者丢失消息。但是如果在消费者消费之前,MQ就宕机了消息就没了。
2.持久化
持久化可以防止消费者丢失消息。
1)交换机持久化
2)队列持久化
3)消息持久化