Java请求合并与分而治之

2023-11-07

在系统设计的时候,你是否也遇到过这两个问题:

1、大量请求造成数据库压力过大

2、大量数据库查询造成请求执行时间过长

本文将介绍在高并发、大数据环境下,以上两种问题的应对思路。

一、请求合并

首先思考一个场景,在高并发的系统中,在每秒内有大量的请求要访问数据库,如果不考虑缓存,怎么才能够处理降低数据库压力。有的同学可能会说这多简单啊,增加带宽,加内存提升服务器性能。

如果不用这些方法呢?那么就可以用到请求合并的方法,将一段时间内的请求进行合并,然后统一提交查询数据库,能够做到将几十个甚至上百个查询进行批量处理。

当然,这么做也有一个前提,就是这些请求对实时性的要求不能太高。在这个条件下,牺牲一定的处理时间,来减少网络连接数,这么处理是一种性价比非常高的方法。

首先我们模拟一个场景,在不进行合并请求的情况下进行1000次请求,使用Postman进行请求测试,并使用Druid连接池进行数据库的监控:

可以看出,实际进行了1000次数据库的访问。在超高流量的情况下,这种访问方式是十分危险的,所以减少数据库的访问就成了当务之急。

再看看之前提到的请求合并,实现起来有这么几个问题需要解决:

1、以什么粒度作为合并请求的规则:

这里推荐按照时间粒度去合并请求,不推荐按照请求数量达到一定值再进行合并是因为有可能一段时间内请求数量比较少,达不到阈值则无法执行,造成早到达的请求等待非常长的时间。

Java中的ScheduledExecutorService提供了定时调度机制,且本身实现了ExecutorService接口,所以本身也支持线程池的所有功能。

2、如何存放一段时间的请求:

存放请求的方式就比较多了,我们知道,在高并发系统的设计中,消息队列被普遍应用于解耦,使用消息队列存放请求是非常合适的做法。由于我们这里是单机环境,能够保证线程安全的阻塞队列LinkedBlockingQueue就能简单实现我们的需求。

3、如何将请求的结果返回给请求

自从JAVA 1.5以后引入了Future接口,用于处理异步调用和并发事务。Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。

简单的说,我们可以用它来接收线程的执行结果。

好了,请求的合并、执行、返回三大步骤都梳理清楚了,让我们看看具体怎么实现。

@Service
public class BatchQueryService {
    //队列用来存放请求
    private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    @Autowired
    ItemService queryItemService;

    //封装请求
    class Request {
        String code;
        CompletableFuture<Map<String, Object>> future;

        public String getCode() {
            return code;
        }

        public void setCode(String code) {
            this.code = code;
        }

        public CompletableFuture<Map<String, Object>> getFuture() {
            return future;
        }

        public void setFuture(CompletableFuture<Map<String, Object>> future) {
            this.future = future;
        }
    }

    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(() -> {

            int size = queue.size();
            if (size == 0)
                return;

            List<Request> requests = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                Request request = queue.poll();
                requests.add(request);
            }
            System.out.println("批量处理了" + size + "条请求");

            List<String> codes = new ArrayList<>();
            for (Request request : requests) {
                codes.add(request.getCode());
            }

            List<Map<String, Object>> responses = queryItemService.queryByCodes(codes);

            //结果集完成--> 把请求分发给每一个具体的Request
            Map<String, Map<String, Object>> responseMap = new HashMap<>();
            for (Map<String, Object> response : responses) {
                String code = response.get("code").toString();
                responseMap.put(code, response);
            }

            //返回请求           
            for (Request request : requests) {
                Map<String, Object> result = responseMap.get(request.getCode());
                request.future.complete(result);
            }

        }, 0, 200, TimeUnit.MILLISECONDS);
    }

    //根据code进行单个查询
    public Map<String, Object> queryItem(String code) {
        Request request = new Request();
        request.setCode(code);

        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
        request.setFuture(future);

        queue.add(request);

        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

使用1000个线程对请求合并方法进行测试:

 @ResponseBody
    @RequestMapping("/batchQuery")
    public String batchQuery(){
        Thread thread[]=new Thread[1000];
        for (int i = 0; i <1000 ; i++) {
            int j=i;
            thread[i]=new Thread(new Runnable() {
                @Override
                public void run() {
                    queryService.queryItem(j + "");
                }
            });
            thread[i].start();

            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "ok";
    }

看一下控制台输出结果:

Druid监控:

原本1000次的数据库操作被我们减少到了7次,实际对数据库的访问降到了之前的0.7%。当然,实际业务环境中可能定时调度的时间间隔不会增加到200ms这么大,这里只是为了演示一下请求合并能够发挥的巨大潜能。

最后对请求合并进行一下总结:

优点显而易见,通过请求合并减少了数据库的网络连接,降低数据库压力。最大化的利用的系统的IO,来提升系统的吞吐性能。

当然它也存在一定的局限性,仅能够用于对请求实时性要求不高的高并发系统,如果系统的应用场景不是在高并发场景下,那么根本没有使用请求合并的必要。

二、分而治之

其实在学数据结构和算法的时候,大家应该都接触过分而治之的思想,其实说白了就是递归调用本函数的一个过程,在这个过程中,不断把任务变小,简化计算的流程。这种思想,在进行系统架构的时候同样适用。如果一个请求要访问大量的数据,那么我们就可以将这个任务拆分分别执行,最终再将执行结果返回给客户端。

这里就要引入JDK 1.7后提供的一个多线程执行框架Fork/Join,它能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。

ForkJoin框架为我们提供了RecursiveAction和RecursiveTask来创建ForkJoin的任务,简单来说:

Recursiveaction  用于创建没有返回值的任务

RecursiveTask  用于创建有返回值的任务

举个例子,还是用上一小节中我们的数据,现在数据库中存储了id从0到9999的一万件商品,我们要对其总值进行求和(别问为什么不直接用sum()函数,举个例子而已)。

 @ResponseBody
    @RequestMapping("/single")
    public int single() {
        long startTime = System.currentTimeMillis();
        int sum = 0;
        for (int i = 0; i < 1000; i++) {
            sum += itemService.queryByCode(i + "").getPrice();
        }
        System.out.println(sum);
        long endTime = System.currentTimeMillis();
        System.out.println("程序运行时间:" + (endTime - startTime) + "ms");
        return sum;
    }

看一下程序运行时间,5235毫秒:

使用ForkJoin对任务进行划分:

public class ForkJoinTask  extends RecursiveTask<Integer> {
    private int arr[];
    private int start;
    private int end;

    private static final int MAX = 500;

    public ForkJoinTask(int[] arr, int start, int end){
        this.arr=arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum=0;
        if((end - start) < MAX) {
            //直接做业务工作
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }   else{
            //继续拆分
            int middle = (start + end) / 2;
            ForkJoinTask left=new ForkJoinTask(arr, start, middle);
            ForkJoinTask right=new ForkJoinTask(arr, middle, end);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }
}

再运行测试:

@ResponseBody
    @RequestMapping("/fork")
    public int forkJoin() {
        long startTime = System.currentTimeMillis();
        int arr[] = new int[10000];
        for (int i = 0; i < 1000; i++) {
            arr[i]=i;
        }

        ForkJoinPool pool=new ForkJoinPool();
        ForkJoinTask task=new ForkJoinTask(arr,0,arr.length);
        Integer sum =  pool.invoke(task);
        System.out.println(sum);

        long endTime = System.currentTimeMillis();
        System.out.println("程序运行时间:" + (endTime - startTime) + "ms");
        return sum;
    }

再看一下程序运行时间,只有6毫秒:

是不是觉得快了很多,直接将运行速度提升了800多倍!其实ForkJoin运行速度快的原因还有一个黑科技,那就是当一个线程在完成自己的任务队列的处理任务后,会帮助其他线程完成任务,完成后再放回其他队列,这也被称为工作窃取。

如上图所示,线程1在完成自己的任务后,发现线程2还有任务没有完成,这时它会去取到线程2没有完成的任务,做完后再把结果放回线程2。

除此之外,我们还可以通过增加线程数量进一步加快运行速度,线程数量的选择可以根据具体业务环境进行配置优化。

ForkJoinPool pool=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*4);

总结:

本文分别从请求合并和分而治之两种角度介绍了系统的优化,可以看出,在平常的工作中,代码优化这一条路还有很长要走。文中所有的代码大家可以从我的github获取。

https://github.com/trunks2008/RequestMerge

如果您喜欢这篇文章,欢迎关注订阅我的微信公众号 “码农参上”,不定时发布最新文章

公众号后台回复“面试”获取海量大厂面试资料

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java请求合并与分而治之 的相关文章

随机推荐

  • FPGA UltraScale+ 利用ICAP原语实现Multiboot功能

    例程参考 https blog csdn net xiaomingzi55 article details 124365631 1 这个贴子说的很清楚 唯一一点就是它是ICAP2 这样写是没问题的 1 对于BPI模式来说 可以通过RS 1
  • python Excel处理

    Python 前言 我是一个菜鸡 写文章是为了记录自己成长的过程 如有错误 请各位大佬轻喷 欢迎大家指正 基础知识不做赘述 下文介绍Excel办公自动化 openpyxl导入 安装步骤 win R 输入cmd 然后打出pip install
  • 单片机上云流程(使用STM32和esp8266构建物联网体系)

    1 首先是给esp8266烧录MQTT固件 咳咳 首先当然是准备一个esp8266 01s和对应的烧录器啊qwq 选择好要烧录进去的固件 没有的话去安信可官网下载 然后配置好对应的选项 点击ERASE先进行擦除 再点击START进行烧录 这
  • Makefile中使用Shell

    http blog csdn net zdl1016 article details 6448989 http blog csdn net absurd article details 636418 Makefile与Shell的问题 大概
  • FPGA程序上板调试问题

    FPGA程序上板调试问题 1 memory block 找不到coe文件 原因1 coe文件格式有问题 memory initialization radix 是数值格式 memory initialization vector 是初始化的
  • Neuronal Dynamics:第五章笔记

    Neuronal Dynamics 第五章笔记
  • 一文读懂C++的if与else判断语句

    说个明白 先从最简单的说起 if语句 if语句是C 中最简单的判断语句 if S 语句1 语句2 如果S为真 非零 则执行语句1 否则执行语句2 如果语句1就是单个语句 则可以这么写 if S 语句1 这里的分号不能少 或者这么写 if S
  • DevExpress v15.2.4帮助文档下载(全)

    原文地址 http www devexpresscn com Resources Documentation 498 html DevExpress v15 2帮助文档下载列表大全来啦 包含 net系列所有帮助文档 提供CHM和PDF两个版
  • 一个产品的风险预测怎么写_创业计划书中,项目风险评估怎么写?

    展开全部 首先需要了解在实施过程中我们可能碰到哪些风险 按照一般意义 我们常常所说的风险分为两大e69da5e6ba903231313335323631343130323136353331333436316264类 一种是不可预知的 一种是
  • STM32CubeMX定时器输出比较模式——输出相位可调矩形波

    1 介绍 STM32的定时器通道输出矩形波 可以使用PWM模式和输出比较模式 PWM模式能够产生频率和占空比可调的矩形波信号 但不能对信号的相位进行调节 使用输出比较模式 可以实现信号的相位调节和频率调节 但不能对信号的占空比进行调节 输出
  • 100天精通Python(基础篇)——第30天:数据容器-list列表索引

    从前往后索引 name list 0 name list 123 666 print name list 0 print name list 1 print type name list 从后向前索引 name list 1 name li
  • 【数据结构与算法】1.树、二叉树、字典树、红黑树

    文章目录 简介 1 树 Tree 2 二叉树 Binary Tree 2 1 二叉树数据结构 2 2 二叉树的三种遍历方式 3 二叉查找树 Binary Search Tree 3 1 二叉查找树的概念和定义 3 2 二分查找算法 4 字典
  • 手动安装Python第三库vtk库

    我们在使用py进行可视化操作时大概率会用到vtk库 一般方法是通过pip 安装 但是这玩意得看人品 本人人品十分不好 哈哈哈哈 所以我们就要自己下载轮子手动安装 附上第三方库地址 https www lfd uci edu gohlke p
  • 真实!大概五分之一的年轻人存款在一万元以内。

    近日 有调查称 大概五分之一的年轻人存款在一万元以内 10万元存款是一个 坎 存款超过10万就会超过53 7 的人 年轻人 存款 两个词碰撞在一起 引来了广泛的关注和讨论 你认为年轻人存款难吗 可以从以下几个角度发表你的看法 角度一 你的目
  • 计算机应用问题,计算机应用的现状与发展的问题

    计算机应用的现状与发展的问题 来源 职称阁时间 2018 07 27 11 59热度 这篇论文主要介绍的是计算机应用的现状与发展的问题的相关内容 本文作者就是通过对计算机的应用现状等内容做出详细的阐述与介绍 特推荐这篇优秀的论文供相关人士参
  • 【C++设计模式】依赖倒转原则

    2023年8月30日 周三上午 目录 概述 含义 举个简单的例子 传统做法 使用依赖倒转原则 代码说明 再举一个具体的例子 以生活为例 概述 依赖倒转原则 Dependency Inversion Principle DIP 是面向对象设计
  • Hadoop 端口

    1 系统 8080 80 用于tomcat和apache的端口 22 ssh的端口 2 Web UI 用于访问和监控Hadoop系统运行状态 Daemon 缺省端口 配置参数 HDFS Namenode 50070 dfs http add
  • Shiro源码分析-----认证流程/授权流程----------Subject

    本文转载自 认证流程和授权流程 源码分析的第二篇以Subject的初始化为题 一 回顾Apache Shiro创建Subject的步骤如下 1 获取SecurityManager工厂 此处使用Ini配置文件初始化SecurityManage
  • csgo删除服务器地图在哪个文件夹,CSGO de_cache地图

    资源说明 CSGO de cache地图 由 Volcano 制作 需要的玩家不要错过哦 资源详情 Sal Volcano Garozzo发布了其最新制作的CS GO地图de cache Volcano制作的首个地图de nuke ve 相
  • Java请求合并与分而治之

    在系统设计的时候 你是否也遇到过这两个问题 1 大量请求造成数据库压力过大 2 大量数据库查询造成请求执行时间过长 本文将介绍在高并发 大数据环境下 以上两种问题的应对思路 一 请求合并 首先思考一个场景 在高并发的系统中 在每秒内有大量的