RabbitMQ高级特性
1.1 消息的可靠投递
1.1.1 消息发送端
在使用RabbitMQ的时候,作为消息发送方式希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
- confirm: 确认模式
- return: 退回模式
RabbitMQ整个消息投递的路径为:
producer ➡rabbimq broker ➡exchange➡queue➡consumer
- 消息从prodeucer到exchange会返回一个confirmCallback
- 消息从exchange到queue投递失败会返回一个returnCallback
我们将利用这两个callback控制消息的可靠性传递
- 设置ConnectiongFactory的publish-coinfirms=“true”开启确认模式
- 使用rabbbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法,在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理
- 设置ConnectionFactory的publisher-returns=“true”开启退回模式
- 使用rabbitTemplate.setRetrurnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer,并执行回调函数returnMessage
- 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解
1.1.2 消费端接收确认
Consumer Ack
表示消费端收到消息后的确认方式
有三种方式:
- 自动确认:acknowledge=“none"
- 手动确认: acknowledge=:manual"
- 根据异常情况确认:acknowledge="auto"该方式使用麻烦
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将响应message从RabbitMQ的消息缓存中移除。但在实际业务处理中,很有可能接收到,业务处理出现异常,那么消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
1.2 消费端限流
在rabbit:listener-container中配置prefetch属性设置消费端一次拉去多少笑
消费端的确认方式一定为手动确认。acknowledge=”manual“
只有被消费端拉过去的所有消息都确认完了才会进行下一次的消费
1.3 TTL(Time To Live)
即存活时间/过期时间 属性名:x-message-ttl
当消息达到存活时间后,还没有被消费,会被自动清除
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间
- 设置队列过期时间使用参数:x-message-ttl,单位毫秒,会对整个队列消息统一过期
- 设置消息过期时间使用参数:expiration,单位毫秒,当该消息在队列头部时(消费时)会单独判断这一消息是否过期
- 如果两者都进行了设置,以时间短的为准,此外,若顶部消息为3s,后面消息过期时间为2s,则需顶部消息删除后,后续消息才会进行删除
1.4 死信队列
英文名缩写,DLX, Dead letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机时DLX
消息成为死信的三种情况:
- 队列消息长度到达限制
- 消费者拒绝接受消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
- 原队列存在消息过期设置,消息到达超时时间未被消费
队列绑定死信i交换机
给队列设置参数: x-dead-letter-exchange和x-dead-letter-routing-key
当消息成为死信后,若该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
1.5 延迟队列
延迟队列,即消息进入队列后不会被立即消费,只有到达指定时间后,才会被消费。
如需求:
- 下单后,30分钟未支付,取消订单,回滚库存
- 新用户注册成功7天后,发送短信问候
实际上,rabbitMq中并未提供延迟队列功能,但是可以使用:TTL+死信队列组合实现延迟队列的效果
2. 应用问题
2.1 消息可靠性保障–消息补偿
需求: 100%确保消息发送成功
2.2 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
消息幂等性保障–乐观锁机制
通过数据库的乐观锁保证消息的幂等性