重试机制的实现(4m,10m,10m,1h,2h,6h,15h)

2023-11-20

项目场景:

由于我们现在所做的项目有有很多的外放接口供代理商调用,但是有些接口的响应并不是实时返回的,此时我们就需要使用回调接口的方式,将信息响应给代理商。在这期间可能会出网络不稳定等其他情况,导致回调接口调用失败。所以需要特定的回调重试机制来进行处理。这个机制参考了支付宝的通知模式。


思考过程:

回调本身很简单,只需代理商按要求实现我们的接口便可,但是重试机制比较麻烦,他的重试时间间隔不固定,而且到后面时间的间隔过长,使用定时任务达到的效果也并不理想,最后,通过使用rabbitmq,死信队列的特性,完美的实现了这个重试机制(我认为的,不知道有没有更好的办法),解决方案见下。

解决方案:

首先,rabbitmq的消息如果设置了ttl(ttl对应我们重试的间隔时间),在ttl时间内没有被消费,就自动进入死信队列,此时我们就消费死信队列里面的消息,这样正好达到消息延迟消费的效果,那么,如何实现不同的重试间隔:我们把七次重试的间隔,保存到数组里面,并用redis记录消息的重试次数取对应的数组下标,当我们重试失败的话,就通过redis拿到次数,在数组中取出时间,设置到消息里面,重新发送到队列中。到此,重试机制就完成了。下面是代码实现。

在这里插入图片描述


代码实现:

这里是,队列的配置

@Configuration
public class RabbitGatewayCallbackConfig {


    /**
     * 缓冲交换机
     */
    @Bean
    public DirectExchange gatewayCallbackDelayExchange() {
        return new DirectExchange(ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
    }

    /**
     * 实际消费队列
     * 我们会监听这个队列,并对队列里面的消息进行消费
     */
    @Bean
    public Queue gatewayCallbackQueue() {
        return new Queue(QueueConst.GATEWAY_CALLBACK_QUEUE,true,false,false);
    }

    /**
     * 绑定交换机并指定routing key
     */
    @Bean
    public Binding gatewayCallbackBinding() {
        return BindingBuilder.bind(gatewayCallbackQueue()).to(gatewayCallbackDelayExchange()).with(RoutingConstant.GATEWAY_CALLBACK_ROUTING);
    }

    /**
     * 回调缓冲队列,我们所用的消息都会放到这个队列里面,实际并不会消费这个
     * 队列里面的消息,而是等到消息过期后直接进到信息队列里面进行消费
     */
    @Bean
    public Queue gatewayCallBufferQueue() {
        Map<String,Object> args = new HashMap<>();
        //args.put("x-message-ttl", "10000");由于延迟回调时间不固定,所以禁用此配置
        args.put("x-dead-letter-exchange", ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
        args.put("x-dead-letter-routing-key", RoutingConstant.GATEWAY_CALLBACK_ROUTING);
        return new Queue(QueueConst.GATEWAY_CALLBACK_BUFFER_QUEUE, true, false, false, args);
    }

}

具体业务的代码,基本的思路如下:

@Component
public class GatewayCallbackListener {

    private static final Logger logger = LoggerFactory.getLogger(GatewayCallbackListener.class);
	
	//保存对应的重试时间间隔
    private static final String[] CALLBACK_INTERVAL = {"0", "240000", "600000", "600000", "3600000", "7200000", "21600000", "54000000"};

	//自己封装的redis工具类
    @Resource
    private RedisCommon redisCommon;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = QueueConst.GATEWAY_CALLBACK_QUEUE)
    public void callBack(Message message) {
    	//获取消息中回调的相关参数
        String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("QueueConst.GATEWAY_CALLBACK_QUEUE 队列成功接收到消息,time[{}];message:[{}],", new Date(), messageString);
        //因为保存的是json字符串,所以直接解析为json对象
        JSONObject messageJson = JSONObject.parseObject(messageString);
        //获取相应的参数,根据名称你们也大概知道是什么意思了
        String messageId = messageJson.getString("messageId");//作为redis的key值,redis的key保存着重试的次数
        String messageData = messageJson.getString("message");
        String businessGatewayUrl = messageJson.getString("businessGatewayUrl");
        if (StringUtils.isEmpty(businessGatewayUrl)) {
            logger.error("businessGatewayUrl 为空,不进行回调处理,此消息为垃圾消息,不进行重试");
            redisCommon.del(messageId);
            return;
        }
        JSONObject messageDataJson = JSONObject.parseObject(messageData);
		//自己封装的restTemplate方法,在这里进行回调
        String callbackResultJsonString = MarvinHttpsUtil.doPost(businessGatewayUrl, messageDataJsonToMap(messageDataJson), 3000);

        logger.info("回调代理商接口返回信息:[{}]", callbackResultJsonString);
        JSONObject callbackResultJson = JSONObject.parseObject(callbackResultJsonString);
        //判断是否回调成功,失败的话进入队列进行重试
        if (Objects.isNull(callbackResultJson) || !"0".equals(callbackResultJson.getString("resultCode"))) {
            //获取回调次数
            logger.info("接口回调失败,进入重试");
            String callbackTimesString;
            if (StringUtils.isEmpty(callbackTimesString = redisCommon.get(messageId))) {
                logger.error("messageId 为空,不继续进行回调处理,此消息为垃圾消息,不进行重试");
                redisCommon.del(messageId);
                return;
            }
            int callbackTimes;
            try {
                callbackTimes = Integer.parseInt(callbackTimesString);
            } catch (Exception e) {
                logger.error("callbackTimes不为数字,说明此消息在redis中的相关配置已被恶意篡改过,不进行重试");
                redisCommon.del(messageId);
                return;
            }

            if (++callbackTimes >= CALLBACK_INTERVAL.length) {
                logger.error("该消息已经超过最多重试次数,不会再继续进行重试");
                redisCommon.del(messageId);
                return;
            }
            redisCommon.setAndTtl(messageId, callbackTimes + "",GatewayCallbackListener.getCallbackInterval(callbackTimes) + 600000);
            rabbitTemplate.convertAndSend(QueueConst.GATEWAY_CALLBACK_BUFFER_QUEUE, GatewayCallbackListener.getMessage(messageString, callbackTimes));
        } else {
            logger.info("接口回调成功");
            redisCommon.del(messageId);
        }
    }

    private Map<String, String> messageDataJsonToMap(JSONObject messageDataJson) {
        Map<String, String> map = new HashMap<>();
		//业务相关参数,这里不展示
        return map;
    }


    public static Message getMessage(String JsonStringData, int index) {
        if (index > CALLBACK_INTERVAL.length - 1) {
            throw new RuntimeException("index 不可大于数组的长度");
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(CALLBACK_INTERVAL[index]);
        return new Message(JsonStringData.getBytes(StandardCharsets.UTF_8), messageProperties);
    }


    public static long getCallbackInterval(int index){
        try {
            return Long.parseLong(CALLBACK_INTERVAL[index]);
        } catch (Exception e) {
            throw new RuntimeException("请输入正确的index值:需大于0并且小于8");
        }
    }

}

总的来说就是 当我们回调失败的话,就会对回调的内容进行封装,并保存到消息里面,保存重试之间间隔,然后在redis上设置记录该消息重试次数的key值,每次重试都会取数组的中的值设置到消息里面,再加一直到第七次不再继续进行重试(记得为redis设置ttl,防止业务异常,而key一直存在没删掉)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

重试机制的实现(4m,10m,10m,1h,2h,6h,15h) 的相关文章

随机推荐

  • 用于包管理的基本命令APT-GET和APT-CACHE

    这篇文章解释你如何快速学习从命令行使用apt get和apt cache安装 移除 升级和搜索软件包 这篇文章提供一些有用命令 它们将帮助你在基于Debian Ubuntu的系统中处理包管理 apt get是什么 apt get工具是一个强
  • Flutter基础Dart单例的实现

    引言 在日常开发搭建基础框架时 常常用到全局使用的数据类或者工具类 比如日期工具类 地图工具类等 在项目一般使用几种单例类来分享心得 有类实例方式 工厂方式以及类静态方式 实例方式 使用DartPad cn网站来练习dart简单代码片段还是
  • [QT编程系列-40]:QML语言简介

    目录 第1章 简介 第2章 QT QML示例 第三章 QML的步骤 第1章 简介 QML Qt Meta Object Language 是Qt框架中用于构建用户界面的声明性语言 它是一个轻量级的语言 用于描述界面的结构和行为 使开发人员能
  • 樱花动漫中的视频下载分析

    昨天有个朋友问我樱花动漫中的视频怎么下载 那么今天我就写篇文章来专门分析下樱花动漫中的视频是怎么下载的 1 还是打开我们的马赛克视频助手 来分析樱花动漫的数据包 当然你们也没有用其他的抓包工具 不过我习惯了用这个 可以使用它的转到来源功能
  • EPI distortion correction形变矫正, eddy, fieldmap等五种不同方法

    EPI distortion correction形变矫正 1 topup eddy 2 fieldmap eddy 2 1 对mag做去脑壳 2 2 基于去过脑壳的mag 1volume bet nii gz数据 对fieldmap进行预
  • STM32 FLASH操作

    STM32 的闪存模块由 主存储器 信息块和闪存存储器接口寄存器等 3 部分组成 主存储器 该部分用来存放代码和数据常数 如 const 类型的数据 对于大容量产品 其被划分为 256 页 每页 2K 字节 注意 小容量和中容量产品则每页只
  • 解决iframe在ios中无法滚动的bug

    在解决iframe在ios无法滚动的bug中 需要在iframe外面包裹一层div 如下 div class scroll wrapper div 然后设置scroll wrapper的样式 给scroll wrapper添加 webkit
  • camunda 流程引擎如何开始并行任务,且有哪些实现方式?

    认识并行任务 在流程引擎开发中 稍微复杂一点的项目都会遇到并行任务 什么是并行任务呢 就是在完成一个流程节点后 下一步会同时开始多个任务 且任务与任务之间互不影响 这个说起来真的有点绕 刚开始我真不理解 试了好多次才想明白 下面就画2条不同
  • 错误的分页写法及修改

    一 错误的写法 api GetMapping groupList ApiImplicitParams ApiImplicitParam name current value 页码 required true dataType Long da
  • MMdetection学习笔记 第一步安装配置

    安装 之前的安装老是出问题 这里重新仔细进行第三遍 参考了哔哩哔哩的视频教程 mmdetection 教程 使用篇 https www bilibili com video BV1Jb4y1r7ir p 3 share source cop
  • Qt(windows下)捕获异常信息并自动重启

    参考 https blog csdn net x85371169 article details 79267592 目前在弄一个工业上用的软件 需要实现无人值守功能 软件经过两三个星期的debug 已经将绝大部分导致软件死机的bug修复 但
  • 使用burpsuite对web进行账号密码暴力破解

    一 打开环境 1 打开php 2 打开burp suite 3 打开小狐狸 进入127 0 0 1 pikachu 二 Repeater 重发器 1 访问pikachu 多次进行用户名和密码的登录 描述 2 burp suite 中抓包 3
  • set和multiset的用法详解

    一 set文档介绍 1 set是按照一定次序存储元素的容器 2 在set中 元素的value也标识它 value就是key 类型为T 并且每个value必须是唯一的 set中的元素 不能在容器中修改 元素总是const 但是可以从容器中插入
  • 【语义分割】2、Mapillary 数据集简介

    文章目录 一 简介 二 类别 三 标注示例 一 简介 Mapillary Vistas 数据集包含 66 类共 25 000 张高分辨率街景场景的数据 其中有 37 个类是以实例区分的标签 数据总量是 cityscapes 的5倍之多 包括
  • Open3D 基于点云高程制作热力图

    目录 一 概述 二 代码实现 三 结果展示 一 概述 如题 基于点云的高程来制作热力图渲染赋色点云 其结果如下图所示 二 代码实现 import numpy as np import open3d as o3d from matplotli
  • leetcode刷题方法

    leetcode刷题方法 一 范围内的200题 二 刷题步骤 三 算法思路 四 更新 1 12日留 此文章借鉴 陈同学在搬砖 微信公众号的一篇文章 https mp weixin qq com s xr2abGNv8wDZJ qyN4Kew
  • HTML、CSS、JavaScript分别实现什么功能?

    学习Web前端开发基础技术需要掌握 HTML CSS JavaScript 那么这三个都是分别实现什么功能的呢 下面和小编一起来看看吧 一 HTML是网页内容的载体 内容就是网页制作者放在页面上想要让用户浏览的信息 可以包含文字 图片 视频
  • Springboot整合dubb3+nacos作注册中心(基础篇)

    1 首先看下项目结构如下 2 新建父工程springboot dubbo模块 pom文件如下
  • 亲测可用,SpringBoot项目打印接口请求信息日志,CommonsRequestLoggingFilter实现方式

    文章目录 需求背景 效果图 实现思路 其他方案对比 优缺点分析 具体实现 需求背景 线上项目出现bug时 可以通过接口的请求参数来排查定位问题 和业务方battle时 能够证明他是自己操作的问题 效果图 实现思路 Spring提供了Comm
  • 重试机制的实现(4m,10m,10m,1h,2h,6h,15h)

    项目场景 由于我们现在所做的项目有有很多的外放接口供代理商调用 但是有些接口的响应并不是实时返回的 此时我们就需要使用回调接口的方式 将信息响应给代理商 在这期间可能会出网络不稳定等其他情况 导致回调接口调用失败 所以需要特定的回调重试机制