项目场景:
由于我们现在所做的项目有有很多的外放接口供代理商调用,但是有些接口的响应并不是实时返回的,此时我们就需要使用回调接口的方式,将信息响应给代理商。在这期间可能会出网络不稳定等其他情况,导致回调接口调用失败。所以需要特定的回调重试机制来进行处理。这个机制参考了支付宝的通知模式。
思考过程:
回调本身很简单,只需代理商按要求实现我们的接口便可,但是重试机制比较麻烦,他的重试时间间隔不固定,而且到后面时间的间隔过长,使用定时任务达到的效果也并不理想,最后,通过使用rabbitmq,死信队列的特性,完美的实现了这个重试机制(我认为的,不知道有没有更好的办法),解决方案见下。
解决方案:
首先,rabbitmq的消息如果设置了ttl(ttl对应我们重试的间隔时间),在ttl时间内没有被消费,就自动进入死信队列,此时我们就消费死信队列里面的消息,这样正好达到消息延迟消费的效果,那么,如何实现不同的重试间隔:我们把七次重试的间隔,保存到数组里面,并用redis记录消息的重试次数取对应的数组下标,当我们重试失败的话,就通过redis拿到次数,在数组中取出时间,设置到消息里面,重新发送到队列中。到此,重试机制就完成了。下面是代码实现。
代码实现:
这里是,队列的配置
@Configuration
public class RabbitGatewayCallbackConfig {
/**
* 缓冲交换机
*/
@Bean
public DirectExchange gatewayCallbackDelayExchange() {
return new DirectExchange(ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
}
/**
* 实际消费队列
* 我们会监听这个队列,并对队列里面的消息进行消费
*/
@Bean
public Queue gatewayCallbackQueue() {
return new Queue(QueueConst.GATEWAY_CALLBACK_QUEUE,true,false,false);
}
/**
* 绑定交换机并指定routing key
*/
@Bean
public Binding gatewayCallbackBinding() {
return BindingBuilder.bind(gatewayCallbackQueue()).to(gatewayCallbackDelayExchange()).with(RoutingConstant.GATEWAY_CALLBACK_ROUTING);
}
/**
* 回调缓冲队列,我们所用的消息都会放到这个队列里面,实际并不会消费这个
* 队列里面的消息,而是等到消息过期后直接进到信息队列里面进行消费
*/
@Bean
public Queue gatewayCallBufferQueue() {
Map<String,Object> args = new HashMap<>();
//args.put("x-message-ttl", "10000");由于延迟回调时间不固定,所以禁用此配置
args.put("x-dead-letter-exchange", ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
args.put("x-dead-letter-routing-key", RoutingConstant.GATEWAY_CALLBACK_ROUTING);
return new Queue(QueueConst.GATEWAY_CALLBACK_BUFFER_QUEUE, true, false, false, args);
}
}
具体业务的代码,基本的思路如下:
@Component
public class GatewayCallbackListener {
private static final Logger logger = LoggerFactory.getLogger(GatewayCallbackListener.class);
//保存对应的重试时间间隔
private static final String[] CALLBACK_INTERVAL = {"0", "240000", "600000", "600000", "3600000", "7200000", "21600000", "54000000"};
//自己封装的redis工具类
@Resource
private RedisCommon redisCommon;
@Resource
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = QueueConst.GATEWAY_CALLBACK_QUEUE)
public void callBack(Message message) {
//获取消息中回调的相关参数
String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
logger.info("QueueConst.GATEWAY_CALLBACK_QUEUE 队列成功接收到消息,time[{}];message:[{}],", new Date(), messageString);
//因为保存的是json字符串,所以直接解析为json对象
JSONObject messageJson = JSONObject.parseObject(messageString);
//获取相应的参数,根据名称你们也大概知道是什么意思了
String messageId = messageJson.getString("messageId");//作为redis的key值,redis的key保存着重试的次数
String messageData = messageJson.getString("message");
String businessGatewayUrl = messageJson.getString("businessGatewayUrl");
if (StringUtils.isEmpty(businessGatewayUrl)) {
logger.error("businessGatewayUrl 为空,不进行回调处理,此消息为垃圾消息,不进行重试");
redisCommon.del(messageId);
return;
}
JSONObject messageDataJson = JSONObject.parseObject(messageData);
//自己封装的restTemplate方法,在这里进行回调
String callbackResultJsonString = MarvinHttpsUtil.doPost(businessGatewayUrl, messageDataJsonToMap(messageDataJson), 3000);
logger.info("回调代理商接口返回信息:[{}]", callbackResultJsonString);
JSONObject callbackResultJson = JSONObject.parseObject(callbackResultJsonString);
//判断是否回调成功,失败的话进入队列进行重试
if (Objects.isNull(callbackResultJson) || !"0".equals(callbackResultJson.getString("resultCode"))) {
//获取回调次数
logger.info("接口回调失败,进入重试");
String callbackTimesString;
if (StringUtils.isEmpty(callbackTimesString = redisCommon.get(messageId))) {
logger.error("messageId 为空,不继续进行回调处理,此消息为垃圾消息,不进行重试");
redisCommon.del(messageId);
return;
}
int callbackTimes;
try {
callbackTimes = Integer.parseInt(callbackTimesString);
} catch (Exception e) {
logger.error("callbackTimes不为数字,说明此消息在redis中的相关配置已被恶意篡改过,不进行重试");
redisCommon.del(messageId);
return;
}
if (++callbackTimes >= CALLBACK_INTERVAL.length) {
logger.error("该消息已经超过最多重试次数,不会再继续进行重试");
redisCommon.del(messageId);
return;
}
redisCommon.setAndTtl(messageId, callbackTimes + "",GatewayCallbackListener.getCallbackInterval(callbackTimes) + 600000);
rabbitTemplate.convertAndSend(QueueConst.GATEWAY_CALLBACK_BUFFER_QUEUE, GatewayCallbackListener.getMessage(messageString, callbackTimes));
} else {
logger.info("接口回调成功");
redisCommon.del(messageId);
}
}
private Map<String, String> messageDataJsonToMap(JSONObject messageDataJson) {
Map<String, String> map = new HashMap<>();
//业务相关参数,这里不展示
return map;
}
public static Message getMessage(String JsonStringData, int index) {
if (index > CALLBACK_INTERVAL.length - 1) {
throw new RuntimeException("index 不可大于数组的长度");
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(CALLBACK_INTERVAL[index]);
return new Message(JsonStringData.getBytes(StandardCharsets.UTF_8), messageProperties);
}
public static long getCallbackInterval(int index){
try {
return Long.parseLong(CALLBACK_INTERVAL[index]);
} catch (Exception e) {
throw new RuntimeException("请输入正确的index值:需大于0并且小于8");
}
}
}
总的来说就是 当我们回调失败的话,就会对回调的内容进行封装,并保存到消息里面,保存重试之间间隔,然后在redis上设置记录该消息重试次数的key值,每次重试都会取数组的中的值设置到消息里面,再加一直到第七次不再继续进行重试(记得为redis设置ttl,防止业务异常,而key一直存在没删掉)。