RabbitMQ使用详解

2023-11-18

文章目录

RabbitMQ

一. 简介

​ RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。

​ Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。

​ AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

二. rabbitmq基本原理

​ RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。

​ 就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来操作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。

​ 接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel

​ 再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程

至此,我们就把所有的名词贯通咯,接下来做个概要描述:

  • ConnectionFactory:与RabbitMQ服务器连接的管理器
  • Connection:与RabbitMQ服务器的TCP连接
  • Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。
  • Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
  • Message Queue:消息队列,用于存储还未被消费者消费的消息。
  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
  • RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受
  • BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
  • Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
  • Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。
  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
    如下图:

三. 安装

3.1安装erlang环境

rabbitmq是使用erlang语言开发的,所以必须要先安装erlang环境。erlang的软件环境下载地址为:https://www.erlang.org/downloads,安装完毕之后需要配置环境变量:

ERLANG_HOME=G:\erlang\erl-23.0

在这里插入图片描述

3.2 安装rabbitmq

本教程采用解压的方式进行安装。rabbitmq的下载地址为:https://www.rabbitmq.com/install-windows.html

在这里插入图片描述

A. 配置环境变量

1.RABBITMQ_SERVER=G:\rabbitmq\rabbitmq_server-3.8.3
2.在path中加入:%RABBITMQ_SERVER%\sbin

B.安装可视化工具和服务

下载完毕之后进行解压,进入到家目录下的sbin目录下,执行如下命令,安装rabbitmq的web可视化工具:

rabbitmq-plugins.bat enable rabbitmq_management

安装rabbitmq的服务:rabbitmq-service.bat install

启动rabbitmq的服务:rabbitmq-server.bat start

在浏览器输入:http://localhost:15672/,访问rabbitmq的控制台,用户名和密码均为 guest

在这里插入图片描述

3.3 使用docker安装
docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

四. RabbitMQ程序的编写

4.1 rabbitMQ支持的消息模型

在这里插入图片描述

在这里插入图片描述

4.2 引入依赖
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
</dependency>
4.3 简单模型(直连)

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
4.3.1 生成者
public class Producer {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 5672;
        String username = "guest";
        String password = "guest";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.queueDeclare("test", true, false, false, null);
        
        channel.basicPublish("", "test", null, (i + "hello").getBytes());

        RabbitMQUtils.close(conn, channel);
    }
}
4.3.2 消费者
public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}
4.4 工作模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

在这里插入图片描述

4.4.1 消息生产者
for (int i = 0; i < 20; i++) {
	channel.basicPublish("", "test", null, (i + "hello").getBytes());
}
4.4.2 多个消费者

消费者一

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}

消费者二

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECOND.sleep(3);
                }catch(Exception ex){}
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}
4.4.3 消息自动确认

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

channel.basicQos(1); // 每次只消费一条消息
channel.queueDeclare("firstQueue", false, false, false, null);

// 消息改为手动确认
channel.basicConsume("firstQueue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(new String(body, Charset.defaultCharset()));
        // 手动确认消息已经被消费了, 第一个参数是当前消费的消息的标签(递增的整数)
        // 第二个参数是否确认多条消息,包括之前消费的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
	}
});
4.5 发布订阅模型(fanout)

在这里插入图片描述

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
4.5.1 消息生成者
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("multiple", BuiltinExchangeType.FANOUT);

channel.basicPublish("multiple", "", null, "fanout-message".getBytes());

RabbitMQUtils.close(channel, connection);
4.5.2 消息消费者

使用如下代码创建多个消息的消费者

Connection connection = RabbitMQUtils.getConnection();

Channel channel = connection.createChannel();

//String qName = channel.queueDeclare("consumer-2", false, false, false, null).getQueue();
String qName = channel.queueDeclare().getQueue(); //创建临时队列

channel.queueBind(qName, "multiple", "");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});
4.6 直连模型(direct)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在这里插入图片描述

4.6.1 消息生产者
// 声明一个交换机, 交换机的类型为 direct
channel.exchangeDeclare("direct-module", BuiltinExchangeType.DIRECT);

channel.basicPublish("direct-module", "success", null, "成功信息".getBytes());
channel.basicPublish("direct-module", "error", null, "错误信息".getBytes());
4.6.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "success");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "success");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

4.7 主题模式(topic)

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者audit.irs 等
        audit.*   只能匹配 audit.irs

4.7.1 消息生产者

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("topic-module", BuiltinExchangeType.TOPIC);

channel.basicPublish("topic-module", "company.java", null, "通知信息".getBytes());
//channel.basicPublish("topic-module", "error", null, "错误信息".getBytes());

RabbitMQUtils.close(channel, connection);

4.7.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.java.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者三

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.html.*");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

结论

company: 可以被消费者一接收到
company.java: 可以被消费者一、消费者二接收到
company.java.manager: 可以被消费者一、消费者二接收到
company.html.teacher: 可以被消费者一、消费者三接收到

五. RabbitMQ与springboot整合

5.1 依赖
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.4</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
		<exclusions>
			<exclusion>
				<groupId>org.junit.vintage</groupId>
				<artifactId>junit-vintage-engine</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
	</dependency>
</dependencies>
5.2 使用json的序列化
// 消息的消费方json数据的反序列化
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
    			ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

// 定义使用json的方式转换数据
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate amqpTemplate = new RabbitTemplate();
    amqpTemplate.setConnectionFactory(connectionFactory);
    amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return amqpTemplate;
}
5.3 简单模型

消息消费方

@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})
public void simpleModel(User user) {
	log.info("message: {}", user);
}

消息发送

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleMessageSend() {
        rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
    }
}
5.4 工作模型

工作模式只需要在简单模式的基础上,添加一个消息的消费方。

5.5 发布订阅模型

消息消费方

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

消息发送方

// fanout模型
@Test
public void fanoutMessageSend() {
	for (int i = 0; i < 5; i++) {
		rabbitTemplate.convertAndSend("fanout-ex", "", new User(i, "张三"));
	}
	try {
		TimeUnit.SECONDS.sleep(5);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.6 直连模式(direct)

消息的消费方

@RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"error", "success"},
                    exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
        System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
      @QueueBinding(value = @Queue,
            key = {"error"},
            exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
    System.out.println(String.format("消费者 【two】: %s", user));
}

消息生产者

@Test
public void directMessageSend() {
	//rabbitTemplate.convertAndSend("direct-ex", "success", new User(2, "张三"));
	rabbitTemplate.convertAndSend("direct-ex", "error", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.7 topic模型

消息的消费方

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.java.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.html.*"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format("消费者 【three】: %s", user));
}

消息生产方

@Test
public void topicMessageSend() {
	//rabbitTemplate.convertAndSend("topic-ex", "company", new User(2, "张三"));
	rabbitTemplate.convertAndSend("topic-ex", "company.java", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(2);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.8 消息的手动确认

配置

spring:
  rabbitmq:
    listener:
      simple:
        # 提交方式为手动
        acknowledge-mode: MANUAL

提交代码

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

说明:当配置了json反序列化(见5.2节),代码中实例化了SimpleRabbitListenerContainerFactory,会默认覆盖application.yml文件中的配置,需要在代码层面手动的设置提交的方式:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

六. 事务与confirm机制

6.1RabbitMQ事务

RabbitMQ的事务是对AMQP协议的实现,通过设置Channel 的模式来完成,语句为:

channel.txSelect();  //开启事务
// ....本地事务操作
channel.txCommit();  //提交事务
channel.txRollback(); //回滚事务

特别说明:RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能。

6.2 Confirm机制

由于RabbitMQ的事务性能的问题,于是就又推出了发送方确认模式。

channel.confirmSelect(); //开启发送方确认模式
6.2.1 单条消息确认
channel.waitForConfirms(); //对于单条消息的确认,返回值为true或者false
6.2.2 批量消息确认
try {
	channel.waitForConfirmsOrDie();  //批量消息确认,如果有一条消息没有发送成功,会抛出异常
}catch (Exception ex) {
	ex.printStackTrace();
}
6.2.3 回调方式确认
channel.confirmSelect();
// mandatory 需要开启
channel.basicPublish("", "tx-queue1", true, "text".getBytes());

channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      System.out.println("成功达到交换机");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    	System.out.println("没有到达交换机");
    }
});

// 没有到达队列的时候触发
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, 
                            String routingKey, AMQP.BasicProperties properties, 
                             byte[] body) throws IOException {
    		System.out.println("没有到达队列");
    }
});
6.3 springboot对confirm实现

springboot对confirm第三种机制的实现。

6.3.1 配置
spring:
  rabbitmq:
    # 开启信息是否 回调 到交换机的确认方法,即 setConfirmCallback 方法
    publisher-confirm-type: CORRELATED
6.3.2 编码实现
@Bean("availableTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate();
		rabbitTemplate.setConnectionFactory(connectionFactory);

		// 开启 setReturnCallback 
		rabbitTemplate.setMandatory(true);

		// exchange告诉程序是否以及到达交换机,该方法无论成功都会回调。如果到达ack为true; 否则为false;
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
      	System.out.println(correlationData.getId());
				if(ack) {
             System.out.println("成功到达交换机");
        }else {
             System.out.println("没有到达交换机");
        }
		});

		rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, rk) -> {
				System.out.println("消息没有到达队列");
		});

		return rabbitTemplate;
}
6.3.3 消息的发送
@Test
public void sendMsg2() {
		// 消息属性
		MessageProperties messageProperties = MessagePropertiesBuilder
						.newInstance().setMessageId(UUID.randomUUID().toString())
						.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
						.build();
		// 消息
		Message message = new Message(JSONObject.toJSONBytes(new User(23, "张三")),
                                  messageProperties);

		CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend("","tx-queue", message, data);

		try {
				TimeUnit.SECONDS.sleep(120);
		} catch (InterruptedException e) {
				e.printStackTrace();
		}
}

七. 死信队列

我们先假设一个场景,当消息被消费方消费过很多次后,依然无法消费,那么就没有尝试的必要了,我们需要将这类信息放到一个特定的队列中,等待人工的接入。死信队列并不是一个特殊的队列,只是一个普通的队列,只是我们把他们取名叫做死信队列。

死信队列的设计是在某个队列的头信息中设定x-dead-letter-exchange (死信交换机)和x-dead-letter-routing-key(死信路由键)即可。关联到一个绑定到某个死信交换机的队列上。然后给该队列指定过期时间或者指定的消息的过期时间,那么该消息到期后会自动到达死信队列中。

7.1 RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
     RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
}
7.2 创建死信队列与交换机
@Slf4j
@Component
public class DeadQueue {

    // 死信队列名
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    // 死信交换机
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    // 死信路由键
    private static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";

    private RabbitAdmin rabbitAdmin;

    public DeadQueue(RabbitAdmin rabbitAdmin) {
        this.rabbitAdmin = rabbitAdmin;
    }

    @PostConstruct
    public void initDeadQueue() {
        Queue deadQueue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        rabbitAdmin.declareQueue(deadQueue);  //创建死信队列

        Exchange deadExchange = ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true).build();
        rabbitAdmin.declareExchange(deadExchange);  //创建死信交换机

        Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange)
                .with(DEAD_LETTER_ROUTING_KEY).noargs();
        rabbitAdmin.declareBinding(binding);  // 将队列绑定到交换机上
        log.info("死信队列:{}, 死信交换机: {}, 已经成功绑定.", DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE);
    }
}
7.3 死信队列的绑定
/**
 * x-dead-letter-exchange: 死信队列交换机
 * x-dead-letter-routing-key: 死信队列路由键
 * x-message-ttl: 消息在队列中最大的存活时间, 如果没有被消费,就会进入到死信队列
 */
@RabbitListener(bindings = @QueueBinding(
  value = @org.springframework.amqp.rabbit.annotation.Queue(
    value = "msg-queue",
    durable = "true",
    arguments = {@Argument(name = "x-dead-letter-exchange", value = DEAD_LETTER_EXCHANGE),
                 @Argument(name = "x-dead-letter-routing-key", value = DEAD_LETTER_ROUTING_KEY),
                 @Argument(name = "x-message-ttl", value = "20000", type = "java.lang.Long")
                }
  ),
  exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "msg-exchange"),
  key = {"msg"}
))
public void receiveMsg(Message message, Channel channel) throws Exception{
  log.info("消息体:{}", new String(message.getBody()));
  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
7.4 消息发送
@Test
public void sendMsg2() {
  // 消息属性
  MessageProperties messageProperties = MessagePropertiesBuilder
    .newInstance().setMessageId(UUID.randomUUID().toString())
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    .setExpiration("5000")  //消息的存活时间,与队列的TTL,取最小的时间,进入死信队列
    .build();
  // 消息
  Message message = new Message(JSONObject.toJSONBytes(new User(23, "message")), messageProperties);

  CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  rabbitTemplate.convertAndSend("msg-exchange","msg", message, data);

  try {
    TimeUnit.SECONDS.sleep(120);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
7.5 应用场景

场景一:未支付订单在规定的时间取消。实现的方式为,将订单消息放入到一个队列中,并指定其过期时间。当过期时间到了之后,就进入到了死信队列,那么可以直接在死信队列的消费端取出对应的消息即可。

场景二:某条消息在消费端曾多次尝试消费,但是均未消费成功,那么就进入死信队列,让人工干预。

八. 幂等性

所有的消息中间件都会存在这样一个问题,那就是消息的重复消费问题,例如说记录用户的积分信息,消息每次消费都会生成一条记录,这会队我们的业务带来致命的问题,所以我们必须做幂等性设计,所谓幂等设计就是,一条消息无论消费多少次所产生的结果都是相同的。对应的数学公式为:

f(n) = f(f(n))

8.1 方案一

为每条消息生成全局唯一ID,每次消费消息之后都将ID在表中插入一条数据,每次消费之前先查询ID是否存在,如果不存在就执行对应的逻辑;如果存在则直接确认。

8.2 方案二

利用redis+数据库的方案来实现幂等性的设计,实现的思路与redis的缓存击穿方案类似;当插入数据的时候,将唯一ID同时插入数据库,然后放入到redis中。

九. 消息的重试机制

消息的重试是发生在消息的消费端。

十.消息的可靠性投递

在这里插入图片描述

在这里插入图片描述

文档参考声明:

CSDN(sessinsong): https://blog.csdn.net/sessionsong/article/details/86317991

简书 (jiangmo):https://www.jianshu.com/p/64357bf35808

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ使用详解 的相关文章

  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 在 RabbitMQ 主题交换中路由与模式不匹配的消息

    两个队列绑定到具有以下路由键的主题交换 队列 A 与路由键模式匹配绑定 foo队列 B 与路由键模式匹配绑定 bar 我想向此交换添加第三个队列 该队列接收的消息都不是foo消息也不bar消息 如果我用一个绑定这个队列 路由密钥 我自然会得
  • 如何根据条件限制并发消息消耗

    场景 我已经简化了事情 许多最终用户可以从前端 Web 应用程序 生产者 开始工作 繁重的工作 例如渲染大型 PDF 这些作业被发送到单个持久的 RabbitMQ 队列 许多工作应用程序 消费者 处理这些作业并将结果写回到数据存储中 这个相
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 如何在 Node js 中保持分叉的子进程处于活动状态

    我想创建一个像带有node的foreverjs一样运行的rabbitmq cli 它可以生成 child process 并使其在后台运行 并且可以随时与 child process 进行通信 我面临的问题是 当主 cli 程序退出时 ch
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • rabbitmq-erlang-client,使用 rebar 友好的 pkg,在开发环境上工作在 rebar 版本上失败

    我成功地将rabbitmq erlang client的rebar友好包用于一个简单的Hello World rebarized和OTP 兼容 应用程序 并且在开发环境中工作正常 我能够启动 erl 控制台并执行我的操作applicatio
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不
  • RabbitMQ 中的 celeryev 队列变得非常大

    我在rabbitmq上使用celery 我已经向队列发送了数千条消息 它们正在成功处理 一切正常 然而 几个rabbitmq队列中的消息数量增长得相当大 队列中有数十万个项目 队列被命名为celeryev 见下面的截图 这是适当的行为吗 这
  • Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

    是否可以设置与 Django Celery 一起使用的不同数据库 我有一个配置了多个数据库的项目 并且不希望 Django Celery 使用默认数据库 如果我仍然可以使用 django celery 管理页面并读取存储在这个不同数据库中的

随机推荐

  • 【铨顺宏项目推荐】RFID无线射频识别技术的设计思路

    一 项目背景 在传统的珠宝物流管理中 条形码技术通常被使用 虽然该技术可以在一定程度上提高物流管理效率 但仍不能满足现代珠宝行业的需求 条形码存储信息量低 信息不可追加 易损坏 读取位置要求高等问题逐渐显现 RFID与传统的条码识别方法相比
  • _thiscall调用约定的简单概念

    thiscall 我们知道在c中由三种调用约定 cdecl stdcall和 fastcall 其中 stdcall调用约定是windows平台的 在c 中还有一种约定 thiscall调用约定 它是一种类成员方法调用约定 当我们说起 th
  • python3神经网络学习NN学习初步(一)

    1 神经网络的概念 我们用一张图来了解一下吧 多层向前神经网络由以下部分组成 输入层 input layer 隐藏层 hidden layers 输入层 output layers 补充 一般第一层是输入层 最后一层是输出层 其他的的都是中
  • Impala presto hbase hive sparksql

    Impala 技术点梳理 http www cnblogs com TiestoRay p 10243365 html Impala 优点 实时性查询 计算的中间结果不写入磁盘 缺点 对于内存的依赖过于严重 内存溢出直接导致技术任务的失败
  • MainWindow.h

    ifndef MAINWINDOW H define MAINWINDOW H include
  • 中小企业如何低成本实施MES管理系统

    中小企业在市场竞争中需要有高效的管理体系来支持其运营和发展 中小企业MES管理系统是一种先进的管理系统 可以提升工厂智能化水平 提高生产效率 是中小企业必须采取的有效管理工具 然而 由于资金和技术的限制 中小企业往往难以承担高额的软件购置和
  • yocto编译常见问题及解决方法

    1 opt yocto rel share downloads exists but you do not appear to have write access to it 这个是没有权限往指定的目录里写 解决方法 将build目录下对应
  • oralce 超出表空间限额问题

    oralce 超出表空间限额问题 Pl sql建表时报 超出表空间限额 的错误 检查表空间 只占了3 的空间 最后经提醒原来是数据库用户默认只有15M的空间 最后通过修改用户的权限 不限制存储空间 解决问题 来自 ITPUB博客 链接 ht
  • 【VSCode】推荐一款Microsoft Visual Studio Code能在编辑器内智能补全代码的插件 - Tabnine AI

    Tabnine AI Autocomplete for Javascript Python Typescript PHP Go Java Ruby more Tabnine是一个AI代码补全插件 支持JavaScript Python Ja
  • openGL之API学习(一九四)glGenTextures glActiveTexture

    glGenTextures产生的是一个比较小的整数id 纹理单元名 glActiveTexture激活的是纹理单元号 GL TEXTUREi 它们二者的关系为GL TEXTUREi GL TEXTURE0 id glBindTexture使
  • MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))[MySQL插入内容超过4M]

    MySQL server has gone away BrokenPipeError 32 Broken pipe MySQL插入内容超过4M Bug描述 用Python的pymysql向MySQL数据库insert插入数据时 遇到报错信息
  • 估算服务器处理数据性能,服务器性能计算方法-20210720074826.docx-原创力文档

    一 数据库服务器性能计算需求分析 考虑到广州市公安局超级情报系统 SIS 设备升级项目的数据库 服务器的性能 我们建议采用主流的TPC C值进行性能估算 TPC C 是一种旨在衡量联机事务处理 OLTP 系统性能与可 伸缩 性的行业标准基准
  • Multi-mode pattern generator

    Multi mode pattern generator verilog编写 支持如下模式 Full screen White Full screen Black Full screen Red Full screen Green Full
  • JAVA操作Excel时文字自适应单元格的宽度设置方法

    使用JAVA操作Excel通常都使用JXL 方法很简单网上也有很多的教程 然后往往一些细节性的问题却导致我们这些Programmer苦恼不已 这两天帮一个朋友做一个Excel表格自动生成的小软件 就遇到的类似的问题 问题描述 通过Java向
  • 不同数据库的validationQuery检查语句

    数据库 validationQuery Oracle select 1 from dual MySQL select 1 Microsoft SQL Server select 1 DB2 select 1 from sysibm sysd
  • LeetCode83: 删除排序链表中的重复元素

    给定一个已排序的链表的头 head 删除所有重复的元素 使每个元素只出现一次 返回 已排序的链表 示例 1 输入 head 1 1 2 输出 1 2 示例 2 输入 head 1 1 2 3 3 输出 1 2 3 提示 链表中节点数目在范围
  • ASP高级计划与排产简介

    ASP Advanced Planning and Scheduling 高级计划与排产是一种计算机化的制造计划和排程系统 旨在帮助制造商更有效地管理其生产过程 它可以通过对订单需求 库存水平 生产能力和物料供应等关键信息的实时跟踪和分析
  • Git Bash ssh远程连接kali linux

    操作步骤 查看本机IP地址等网络相关信息 ifconfig 开启ssh服务 该命令正常情况下没有回显 service ssh start 查看ssh服务是否正常开启 如果正常开启 则此时有 active running 的提示 servic
  • dynamic_cast报错 异常

    转载请标明是引用于 http blog csdn net chenyujing1234 代码 http www rayfile com zh cn files 89459c23 7a0b 11e1 908f 0015c55db73d UnH
  • RabbitMQ使用详解

    文章目录 RabbitMQ 一 简介 二 rabbitmq基本原理 三 安装 3 1安装erlang环境 3 2 安装rabbitmq 3 3 使用docker安装 四 RabbitMQ程序的编写 4 1 rabbitMQ支持的消息模型 4