如何对第三方相同请求进行筛选过滤

2023-05-16

文章目录

    • 问题背景
    • 处理思路
    • 注意事项
    • 代码实现

问题背景

公司内部多个系统共用一套用户体系库,对外(钉钉)我们是两个客户身份(这里是根据系统来的),例如当第三方服务向我们发起用户同步请求:是一个更新用户操作,它会同时发送一个 delete 和 insert 请求,这两个请求几乎是并发进来的,实际上应该是先发起的delete 再 insert, 实际情况可能和网络延迟也有关系,此时在我们系统中就无法保证这两个请求的顺序执行,即先 delete 处理完之后 再进行 insert 的数据处理(正常流程),又或者直接把一定时间内同一个用户的 delete 和 insert 操作合并为一个update操作(本质就是更新操作)。

还有一种情况是:第三方系统中添加或者 删除一个用户时,会以两个客户的身份去发送两个相同的用户同步请求,但同一个用户在我们系统内用户数据只有一份,对应的接口肯定也都是同一个,即相同的添加接口会在一瞬间被调用两次,删除即使执行两次的话也没什么问题,问题是添加 即使在添加前判断了用户账号是否存在 并发过来的情况下还是避免不了一些脏数据的产生,加锁的话对整体影响又特别大。

处理思路

根据userId(账号)为每个请求分配一个房间(单独的线程),如果是第一次进来那么就new一个房间(也就是类,里边会有一个单独的线程处理这个用户的行为),后边一定时间内相同的 userId 进来会找到对应已存在的房间,当设置的时间窗口到了之后,判断当前userId的同步行为有哪些,如果有 insert 和 delete,那么直接转为 update 操作。如果是两个insert行为,那么最后就只调用一次insert服务,如果是两个delete行为,那么就只调用一个delete服务。

注意事项

时间窗口的设定,如果时间设置过短,属于同一个操作的请求因为网络波动 请求到接口的时间会有一定间隔,如果你设置的时间间隔小于等待的时间,还是会把本就属于同一批次的操作 多次处理

测试过程:刚开始时间设置的1500ms,也就是当第一个userId进来后,等待1.5秒后根据这段时间内收集到的用户行为再去真正的处理,后来在测试中发现有些本就属于同一批次的请求还是会被处理多次,也就是时间调小了,改成2000ms,测试还是发现同样的问题。最后:采取的是根据最近一个的userId请求的时间 等待1500ms,即相同的userId的请求进来后 在当前时间再重新计算等待1500ms,时间到了之后没有发现新的用户行为即算是一个批次结束

ps:可以创建一个单独的服务来负责对请求进行合理的处理分发,处理之后再去调用对应的业务系统服务

代码实现

定义操作行为枚举

public enum OperationEnum {

    INSERT("insert"),
    DELETE("delete"),
    ;
    private final String value;

    OperationEnum(String operation) {
        this.value = operation;
    }

    public String getValue() {
        return value;
    }
}

定义每个用户所属的房间,房间内存储用户的多个行为(insert、delete):

public class ActionRoomBean {

    //用于保存有效事件数据 <insert or delete, data>
    private Map<String, JSONObject> actionDataMap = new HashMap<>();
    private String userId;
    //真正负责处理事件的线程
    private DispatchTask dispatchTask;

    /**
     * 定义操作方法 排队接收
     * @param action 请求动作:insert 或者 delete
     * @param data 请求参数
     */
    public void addAction(String action, JSONObject data) {
        //有新请求进来后 计数器 + 1
        if(dispatchTask != null){
            dispatchTask.getIncrementAndGet();
        }
        //如果包含直接跳出
        if (actionDataMap.containsKey(action)) {
            return;
        }
        actionDataMap.put(action, data);
    }

    public ActionRoomBean() {
    }
    /**
     * 有参构造
     * @param userId 用户账号
     * @param actionDataMap 操作类型,请求参数
     */
    public ActionRoomBean(String userId, Map<String, JSONObject> actionDataMap) {
        this.actionDataMap = actionDataMap;
        this.userId = userId;
    }
    /**
     * 创建完这个类的实例后,要先调用startManager方法 启动线程
     */
    public void startManager() {
        dispatchTask = new DispatchTask(userId, actionDataMap);
        new Thread(dispatchTask).start();
    }
}

房间内真正的执行者(子线程):

public class DispatchTask implements Runnable {
    //等待的时间窗口
    private static long sleepTime = 1500;
    //计数器,用户有新的行为之后 +1,用来控制是否继续等待(sleep)
    private final AtomicInteger count = new AtomicInteger(0);
    //用于保存有效事件数据 <insert or delete, data>,与 ActionRoomBean中的 actionDataMap 指向的是同一个地址
    Map<String, JSONObject> actionDataMap;
    //用户账号
    String userId;
    /**
     * 有参构造
     */
    public DispatchTask(String userId, Map<String, JSONObject> dataLib) {
        this.userId = userId;
        this.actionDataMap = dataLib;
    }

    @Override
    public void run() {
        try {
            //线程等待前的数量和休眠后被唤醒的数量做对比,如果不相等说明休眠时间内有新的用户行为,则进入循环继续sleep
            int afterCount = 0;
            while (afterCount == 0 || afterCount != count.get()){
                //每休眠一次 +1,如果下次循环的值与 +1之后的afterCount相等,说明时间窗口内没有新的行为,则不循环
                afterCount = count.incrementAndGet();
                Thread.sleep(sleepTime);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            String url = RestTemplateUtil.DD_READING_API_URL;
            JSONObject param = null;
            // 只有添加操作
            if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.size() == 1) {
                url += "/nc/eduUserInsert";
                param = actionDataMap.get(OperationEnum.INSERT.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            } else if (actionDataMap.containsKey(OperationEnum.DELETE.getValue()) && actionDataMap.size() == 1) {
                //只有删除操作
                url += "/nc/eduUserDelete";
                param = actionDataMap.get(OperationEnum.DELETE.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            } else if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.containsKey(OperationEnum.DELETE.getValue())) {
                //既有添加又有删除,就是更新处理
                url += "/nc/eduUserUpdate";
                param = actionDataMap.get(OperationEnum.INSERT.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            }
        } finally {
            //最后从全局变量中删除userId
            DispatchController.closeRoom(userId);
        }
    }
    /**
     * 计数器 + 1
     */
    public Integer getIncrementAndGet() {
        return count.incrementAndGet();
    }
}

控制器:

@RestController
@RequestMapping(value = "/api/dd")
public class DispatchController {
    //全局map,记录当前有多少个用户正在被处理中
    private final static Map<String, ActionRoomBean> allMap = new ConcurrentHashMap<>();
    //简单的配置的密钥,用于接口的身份校验
    @Value("${url.secret}")
    private String secret;

    /**
     * insert和 delete 操作都会进入这个接口,用 operation 区分当前是什么操作
     */
    @PostMapping(value = "/dispatch")
    public Result dispatch(@RequestBody JSONObject jsonObject,
                           @RequestHeader(value = "secret") String secret){
        //进行简单的接口身份校验
        if(!Objects.equals(secret, this.secret)){
            return Result.generateError("secret eroor");
        }
        String userId = jsonObject.getString("userId");
        //operation = insert 或者 delete
        String operation = jsonObject.getString("operation");
        if(EmptyUtil.isNotEmpty(userId) && EmptyUtil.isNotEmpty(operation)){
            //调用进入房间的方法
            unboltRoom(userId, operation, jsonObject);
        }
        return Result.generateSuccess();
    }

    /**
     * 为每个userId创建一个实例(房间)
     * 这里决定了是创建一个新的房间还是进入到已有的房间中
     */
    private void unboltRoom(String userId, String operation, JSONObject jsonObject) {
        //加锁处理,由于真正的执行是在子线程中 所以加锁对整体性能影响也不是很大
        //主要是避免:同一个userId创建了多个实例,即使map中key不可重复,也会造成请求丢失
        //例如:同一个userId进来insert和delete请求各一个,并发不加锁的情况下就有可能创建了两个实例
        synchronized (this) {
            ActionRoomBean room = allMap.get(userId);
            //如果全局map中没有,说明是这个userId是第一个进来
            if (room == null) {
                Map<String, JSONObject> actionMap = new HashMap<>(4);
                actionMap.put(operation, jsonObject);
                room = new ActionRoomBean(userId, actionMap);
                //开启计时
                room.startManager();
                //放入到全局map中
                allMap.put(userId, room);
            }
            //如果有,直接调用 addAction方法
            room.addAction(operation, jsonObject);
        }
    }
    /**
     * 当前批次处理完之后,从集合中删除用户实例
     */
    public static void closeRoom(String userId) {
        allMap.remove(userId);
    }
}

整体核心代码就是上边这些,以上还可以通过线程池去优化一下。

如果涉及到批量导入,同时有大量用户同步数据过来,就需要在测试环境进行反复测试 看是否会丢数据(因为每个用户都是一个独立的子线程),对线程的数量进行优化。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何对第三方相同请求进行筛选过滤 的相关文章

随机推荐

  • Fabric2.0 使用开发模式(dev 模式)测试

    dev模式 dev模式不是Fabric 2 0引入的 xff0c 1 x版本就已存在的 对于Fabric 2 0来说 xff0c 可以构建外部的链码容器 xff0c 对于运维和开发调试来说 xff0c 变得越来越方便 在2 0版本之前 xf
  • C语言获取文件行数

    int tmain int argc TCHAR argv FILE fp int flag 61 0 file row 61 0 count 61 0 if fp 61 fopen 34 C Users zzl Desktop lmcli
  • Hyperledger Fabric排序服务实现

    排序算法 solo kafka raft Raft 推荐 作为 v1 4 1 的新特性 xff0c Raft 是一种基于 etcd 中 Raft 协议实现的崩溃容错 xff08 Crash Fault Tolerant xff0c CFT
  • 以太坊学习一:密码学

    密码学作为区块链最基础的的技术之一 xff0c 这些知识既包括对信息的转换 加解密 xff0c 以及校验过程 xff0c 也包括以太坊地址和交易Hash xff0c 交易信息RLP编码 基于椭圆曲线公私钥签名 区块Merkle树交易 Has
  • VMware安装Debian完成后启动黑屏仅有一个光标

    问题 xff1a vmware安装Debian完成 xff0c 启动时出现黑屏现象 xff0c 仅有一个光标 问题原因 xff1a 安装步骤有误 解决方案 重新安装镜像 xff0c 安装过程中记得 将GRUB 启动引导器安装至您的主驱动器
  • mybatis resultType为map 字段为null不返回

    框架 springboot框架 xff0c 分为两种情况 xff1a 一种情况为部分字段为null xff0c 一种情况是全部字段均为null 部分字段为null 返回的数据格式形如 这种情况下 xff0c 只会返回 post code p
  • mysql在update语句中使用分页查询limit [offset,] rows

    在update语句中 limit 前几条是没问题的 xff0c 形如下面的写法 span class token keyword update span temp dj purchase span class token keyword s
  • 认识常见中间件-redis(一)

    Redis 是一种基于内存的数据库 xff0c 对数据的读写操作都是在内存中完成 xff0c 因此读写速度非常快 xff0c 常用于缓存 xff0c 消息队列 分布式锁等场景 Redis 提供了多种数据类型来支持不同的业务场景 xff0c
  • 线程池源码分析

    ThreadPoolExecutor的参数解释 public class ThreadPoolExecutor extends AbstractExecutorService public ThreadPoolExecutor int co
  • Ubuntu 18.04开机报错无法启动

    在虚拟机中启动Ubuntu时 xff0c 显示类似如下界面 原因 xff1a 硬盘空间不足 xff0c 所以无法启动系统了 解决方案 xff1a 1 启动系统 xff0c 在该界面单击按键shift xff08 如果是虚拟机 xff0c 要
  • win10+Xming+VSCode接远程服务器使用图形化界面(GUI)

    Xming安装 官网下载 Download下载安装下载完毕 xff0c 点开安装包 xff0c 直接按默认设置一路点击next完成安装 进入Xming的安装文件夹 xff0c 默认是 C Program Files x86 Xming xf
  • Python 判断文件是否存在,存在则删除

    span class token comment filepath为文件路径 span span class token keyword import span os span class token comment 判断文件是否存在 sp
  • arm下QT环境搭建

    第一次接触QT xff0c 发现每个人搭建环境问题都不一样 xff0c 我把我的问题和步骤写下 xff0c 以供参考 xff01 1 选择环境 xff0c QT需要安装Xwindows环境的操作系统 xff0c 开始我使用操作系统是没有图形
  • 计算机网络-聊天室的设计与实现

    计算机网络实践 一 实践设计的目的和意义二 实践设计的内容和要求三 设计用设备仪器四 实践设计的相关技术五 项目设计与实践1 设计思路2 模块描述3 运行结果 六 结束语源码与详细过程 一 实践设计的目的和意义 在互联网如此发达的今天 xf
  • python-下载某短视频平台视频(高清无水印)

    python 下载某短视频平台音视频 xff08 高清无水印 xff09 前言1 获取视频 url2 发送请求3 数据解析4 本地保存5 完整代码 前言 1 Cookie中文名称为小型文本文件 xff0c 指某些网站为了辨别用户身份而储存在
  • Java中的Reflection(反射)、暴力反射

    文章目录 1 反射 Reflection 的概念1 1 反射的出现背景1 2 反射概述1 3 Java反射机制研究及应用1 4 反射相关的主要API1 5 反射的优缺点 2 Class类并获取Class实例2 1 理解Class2 1 1
  • JVM(类的加载与ClassLoader、双亲委派机制)

    文章目录 1 类的生命周期2 类的加载过程3 类加载器 xff08 classloader 3 1 类加载器的作用3 2 类加载器的分类 JDK8 3 3 双亲委派机制3 3 1 双亲委派机制优势 3 4 查看某个类的类加载器对象3 5 使
  • Java中的反射(通过反射获取类的结构、invoke方法、获取注解)

    文章目录 1 创建运行时类的对象2 获取运行时类的完整结构2 1 相关API2 2 获取所有的属性及相关细节2 3 获取所有的方法及相关细节2 4 获取其他结构 构造器 父类 接口 包 注解等 2 5 获取泛型父类信息2 6 获取内部类或外
  • JDK的版本迭代(JDK9 - JDK20)

    文章目录 1 发布特点2 名词解释Oracle JDK和Open JDKJEPLTS 3 各版本支持时间路线图4 各版本介绍jdk 9jdk 10jdk 11jdk 12jdk 13jdk 14jdk 15jdk 16jdk 17jdk 1
  • 如何对第三方相同请求进行筛选过滤

    文章目录 问题背景处理思路注意事项代码实现 问题背景 公司内部多个系统共用一套用户体系库 xff0c 对外 钉钉 我们是两个客户身份 这里是根据系统来的 xff0c 例如当第三方服务向我们发起用户同步请求 xff1a 是一个更新用户操作 x