本文对应的代码地址:https://github.com/zhangshilin9527/rabbitmq-study
前置工作:
1.安装rabbitmq
2.登录
地址: http://localhost:15672/
账号密码: guest/guest
3.创建角色
4.1 创建virtual Hosts
4.2为virtual Hosts赋权
5.增加Exchanges(交换机)
6.增加Queue(队列)
7.Exchange绑定queue
Springboot集成RabbitMQ
Springboot集成RabbitMQ,首先创建一个Springboot项目,可以通过官网的脚手架生成一个springboot项目。Springboot集成RabbitMQ的官网地址为:https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-rabbitmq,下面是我参考文档集成的步骤:
生产者代码
1.加入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.加入yaml配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: xiaolinzi
username: xiaolinzi
password: xiaolinzi
#超时时间
connection-timeout: 10000s
#开启消息确认模式
publisher-confirms: true
#开启消息送达提示
publisher-returns: true
#开启不可达消息不会被broker给删除
template:
mandatory: true
3.配置mq
/**
* 交换机
* @return
*/
@Bean
public DirectExchange xiaolinziDirectExchange() {
//durable 表示小时是否持久化
//autoDelete 消息是否自动删除
DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE, true, false);
return directExchange;
}
/**
* 队列
* @return
*/
@Bean
public Queue xiaolinziQueue() {
//exclusive:是否排外的 一般等于true的话用于一个队列只能有一个消费者来消费的场景
Queue queue = new Queue(DIRECT_QUEUE, true, false, false);
return queue;
}
/**
* 绑定关系
* @return
*/
@Bean
public Binding xiaolinziBinder() {
return BindingBuilder.bind(xiaolinziQueue()).to(xiaolinziDirectExchange()).with(DIRECT_QUEUE_KEY);
}
4.发送消息
public void sendMsg() {
SendInfo sendInfo = bulidSendInfo();
//构建correlationData 唯一标识,可以使用id做特殊处理
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//开启确认模式
rabbitTemplate.setConfirmCallback(new XiaolinziConfirmCallBack());
//开启消息投递监听
rabbitTemplate.setReturnCallback(new XiaolinziRetrunCallBack());
rabbitTemplate.convertAndSend("xiaolinzi_direct", "xiaolinzi_direct_queue_key", JSONObject.toJSONString(sendInfo), correlationData);
}
消费者代码
1.加入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.加入yaml配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: xiaolinzi
username: xiaolinzi
password: xiaolinzi
#超时时间
connection-timeout: 10000s
#开启消息确认模式
publisher-confirms: true
#开启消息送达提示
publisher-returns: true
#开启不可达消息不会被broker给删除
template:
mandatory: true
3.接受消息
@RabbitListener(queues = {"xiaolinzi_direct_queue"})
@RabbitHandler
public void consumerMsg(Message message, Channel channel) throws IOException {
logger.info("消费消息:{}", message);
//手工签收
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
logger.info("接受deliveryTag:{}", deliveryTag);
channel.basicAck(deliveryTag, false);
}
测试
发送消息:
看一下交换机上,现在有一条消息
接收消息
再看一下交换机上,已经没有了消息