文章目录
- 1. 环境配置
- 2. RabbitMq配置
- 2.1 消息发送确认机制
- 2.2 消息投递确认机制
- 2.3 ACK消息签收机制
- 3. 消息生产者
1. 环境配置
pom.xml
<!-- Inherit dependency management and plugin defaults from Spring Boot 2.0.1. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>
    <!-- Spring AMQP / RabbitMQ client support. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Test support, only on the test classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
application.yml
server:
  port: 9001

spring:
  rabbitmq:
    host: 127.0.0.1
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        # Deliver one unacknowledged message at a time to each consumer.
        prefetch: 1
        # The listener acknowledges messages itself via the Channel.
        acknowledge-mode: manual
        retry:
          enabled: true
    # Spring Boot 2.0.x key for publisher confirms
    # ("publisher-confirm-type" only exists from Spring Boot 2.2 onwards).
    publisher-confirms: true
    publisher-returns: true
2. RabbitMq配置
RabbitAdmin配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
2.1 消息发送确认机制
为确保消息发送的准确性,需开启发布确认(publisher confirm),用于确认消息是否到达 Broker 服务器。消息只要被 Broker 接收,就会触发下面 ConfirmCallbackConfig 中的 confirm 回调。
消息接收确认回调
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Registers itself on the {@link RabbitTemplate} as the publisher-confirm callback
 * and prints the outcome of each confirm.
 */
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Installs this component as the template's confirm callback after injection. */
    @PostConstruct
    public void init() {
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Prints a success line when {@code ack} is true, otherwise prints the failure cause.
     *
     * @param correlationData correlation data passed to the callback (not used here)
     * @param ack             whether the confirm was positive
     * @param cause           failure reason, printed when {@code ack} is false
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String line = ack
                ? "消息发送到Broker成功!"
                : "发送异常原因 = " + cause;
        System.out.println(line);
    }
}
2.2 消息投递确认机制
如果消息未能投递到目标 queue,将触发 returnCallback 回调。一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
消息投递机制回调接口
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Registers itself on the {@link RabbitTemplate} as the return callback and prints
 * the details of every returned message.
 */
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Installs this component as the template's return callback after injection. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Prints the returned message followed by the reply code, reply text, exchange and
     * routing key, one value per line.
     *
     * @param message    the returned message
     * @param replyCode  the reply code
     * @param replyText  the reply text
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key used when publishing
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        System.out.println("returnCallback ..............");
        System.out.println(message);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);
    }
}
2.3 ACK消息签收机制
为确保消息消费成功,需设置消费者消息确认机制;如果消费失败或出现异常,可做补偿处理。
以下是三种 channel 签收方式:
- basicAck:消息确认
- basicNack:消息回退
- basicReject:消息拒绝
消费者消息确认机制
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ACKConfirmListener {
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue", durable = "true"))
public void ackListener(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("msg = " + msg);
throw new RuntimeException("消费者故意抛出异常......");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息消费异常,重回队列......");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
System.out.println("ACK消息消费确认.....")
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
3. 消息生产者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend("simple.queue", "ACK消息确认机制生产者......");
}
https://blog.csdn.net/qq_48721706/article/details/125194646
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)