接口请求合并的3种技巧,性能直接爆表!

2023-11-16

将相似或重复请求在上游系统中合并后发往下游系统,可以大大降低下游系统的负载,提升系统整体吞吐率。文章介绍了 hystrix collapserConcurrentHashMultiset、自实现BatchCollapser 三种请求合并技术,并通过其具体实现对比各自适用的场景。

前言

工作中,我们常见的请求模型都是”请求-应答”式,即一次请求中,服务给请求分配一个独立的线程,一块独立的内存空间,所有的操作都是独立的,包括资源和系统运算。我们也知道,在请求中处理一次系统 I/O 的消耗是非常大的,如果有非常多的请求都进行同一类 I/O 操作,那么是否可以将这些 I/O 操作都合并到一起,进行一次 I/O 操作,是否可以大大降低下游资源服务器的负担呢?

最近我工作之余的大部分时间都花在这个问题的探究上了,对比了几个现有类库,为了解决一个小问题把 hystrix javanica 的代码翻了一遍,也根据自己工作中遇到的业务需求实现了一个简单的合并类,收获还是挺大的。可能这个需求有点”偏门”,在网上搜索结果并不多,也没有综合一点的资料,索性自己总结分享一下,希望能帮到后来遇到这种问题的小伙伴。更多内容,公众 号Java精选,回复java面试,获取面试资料,支持在线刷题。

Hystrix Collapser

hystrix

开源的请求合并类库(知名的)好像也只有 Netflix 公司开源的 Hystrix 了, hystrix 专注于保持 WEB 服务器在高并发环境下的系统稳定,我们常用它的熔断器(Circuit Breaker) 来实现服务的服务隔离和灾时降级,有了它,可以使整个系统不至于被某一个接口的高并发洪流冲塌,即使接口挂了也可以将服务降级,返回一个人性化的响应。请求合并作为一个保障下游服务稳定的利器,在 hystrix 内实现也并不意外。

我们在使用 hystrix 时,常用它的 javanica 模块,以注解的方式编写 hystrix 代码,使代码更简洁而且对业务代码侵入更低。所以在项目中我们一般至少需要引用 hystrix-corehystrix-javanica 两个包。

另外,hystrix 的实现都是通过 AOP,我们要还要在项目 xml 里显式配置 HystrixAspect 的 bean 来启用它。

<aop:aspectj-autoproxy/>  
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
collapser

hystrix collapser 是 hystrix 内的请求合并器,它有自定义 BatchMethod 和 注解两种实现方式,自定义 BatchMethod 网上有各种教程,实现起来很复杂,需要手写大量代码,而注解方式只需要添加两行注解即可,但配置方式我在官方文档上也没找见,中文方面本文应该是独一份儿了。

其实现需要注意的是:

  • 我们在需要合并的方法上添加 @HystrixCollapser 注解,在定义好的合并方法上添加 @HystrixCommand 注解;

  • single 方法只能传入一个参数,多参数情况下需要自己包装一个参数类,而 batch 方法需要 java.util.List<SingleParam>

  • single 方法返回 java.util.concurrent.Future<SingleReturn>, batch 方法返回 java.util.List<SingleReturn>,且要保证返回的结果数量和传入的参数数量一致。

下面是一个简单的示例:

public class HystrixCollapserSample {  
  
    @HystrixCollapser(batchMethod = "batch")  
    public Future<Boolean> single(String input) {  
        return null; // single方法不会被执行到  
    }  
  
    public List<Boolean> batch(List<String> inputs) {  
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());  
    }  
}
源码实现

为了解决 hystrix collapser 的配置问题看了下 hystrix javanica 的源码,这里简单总结一下 hystrix 请求合并器的具体实现,源码的详细解析在我的笔记:Hystrix collasper 源码解析。

  • 在 spring-boot 内注册切面类的 bean,里面包含 @HystrixCollapser 注解切面;

  • 在方法执行时检测到方法被 HystrixCollapser 注解后,spring 调用 methodsAnnotatedWithHystrixCommand方法来执行 hystrix 代理;

  • hystrix 获取一个 collapser 实例(在当前 scope 内检测不到即创建);面试宝典:https://www.yoodb.com

  • hystrix 将当前请求的参数提交给 collapser, 由 collapser 存储在一个 concurrentHashMap (RequestArgumentType -> CollapsedRequest)内,此方法会创建一个 Observable 对象,并返回一个 观察此对象的 Future 给业务线程;

  • collpser 在创建时会创建一个 timer 线程,定时消费存储的请求,timer 会将多个请求构造成一个合并后的请求,调用 batch 执行后将结果顺序映射到输出参数,并通知 Future 任务已完成。

需要注意,由于需要等待 timer 执行真正的请求操作,collapser 会导致所有的请求的 cost 都会增加约 timerInterval/2 ms;

配置

hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括两个部分,专有配置和 hystrixCommand 通用配置;

专有配置包括:

  • collapserKey,这个可以不用配置,hystrix 会默认使用当前方法名;

  • batchMethod,配置 batch 方法名,我们一般会将 single 方法和 batch 方法定义在同一个类内,直接填方法名即可;

  • scope,最坑的配置项,也是逼我读源码的元凶,com.netflix.hystrix.HystrixCollapser.Scope 枚举类,有 REQUEST, GLOBAL 两种选项,在 scope 为 REQUEST 时,hystrix 会为每个请求都创建一个 collapser, 此时你会发现 batch 方法执行时,传入的请求数总为1。而且 REQUEST 项还是默认项,不明白这样请求合并还有什么意义;

  • collapserProperties, 在此选项内我们可以配置 hystrixCommand 的通用配置;

通用配置包括:

  • maxRequestsInBatch, 构造批量请求时,使用的单个请求的最大数量;

  • timerDelayInMilliseconds, 此选项配置 collapser 的 timer 线程多久会合并一次请求;

  • requestCache.enabled, 配置提交请求时是否缓存;java新特性:https://www.yoodb.com/java/characteristic/java-8/Java8-optional.html

一个完整的配置如下:

@HystrixCollapser(  
            batchMethod = "batch",  
            collapserKey = "single",  
            scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,  
            collapserProperties = {  
                    @HystrixProperty(name = "maxRequestsInBatch", value = "100"),  
                    @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),  
                    @HystrixProperty(name = "requestCache.enabled", value = "true")  
            })

BatchCollapser

设计

由于业务需求,我们并不太关心被合并请求的返回值,而且觉得 hystrix 保持那么多的 Future 并没有必要,于是自己实现了一个简单的请求合并器,业务线程简单地将请求放到一个容器里,请求数累积到一定量或延迟了一定的时间,就取出容器内的数据统一发送给下游系统。

设计思想跟 hystrix 类似,合并器有一个字段作为存储请求的容器,且设置一个 timer 线程定时消费容器内的请求,业务线程将请求参数提交到合并 器的容器内。不同之处在于,业务线程将请求提交给容器后立即同步返回成功,不必管请求的消费结果,这样便实现了时间维度上的合并触发。

另外,我还添加了另外一个维度的触发条件,每次将请求参数添加到容器后都会检验一下容器内请求的数量,如果数量达到一定的阈值,将在业务线程内合并执行一次。

由于有两个维度会触发合并,就不可避免会遇到线程安全问题。为了保证容器内的请求不会被多个线程重复消费或都漏掉,我需要一个容器能满足以下条件:

  • 是一种 Collection,类似于 ArrayList 或 Queue,可以存重复元素且有顺序;

  • 在多线程环境中能安全地将里面的数据全取出来进行消费,而不用自己实现锁。

java.util.concurrent 包内的 LinkedBlockingDeque 刚好符合要求,首先它实现了 BlockingDeque 接口,多线程环境下的存取操作是安全的;此外,它还提供 drainTo(Collection<? super E> c, int maxElements)方法,可以将容器内 maxElements 个元素安全地取出来,放到 Collection c 中。

实现

以下是具体的代码实现:

public class BatchCollapser<E> implements InitializingBean {  
     private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);  
     private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();  
     private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);  
  
     private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();  
     private Handler<List<E>, Boolean> cleaner;  
     private long interval;  
     private int threshHold;  
  
     private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {  
         this.cleaner = cleaner;  
         this.threshHold = threshHold;  
         this.interval = interval;  
     }  
  
     @Override  
     public void afterPropertiesSet() throws Exception {  
         SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {  
             try {  
                 this.clean();  
             } catch (Exception e) {  
                 logger.error("clean container exception", e);  
             }  
         }, 0, interval, TimeUnit.MILLISECONDS);  
     }  
  
     public void submit(E event) {  
         batchContainer.add(event);  
         if (batchContainer.size() >= threshHold) {  
             clean();  
         }  
     }  
  
     private void clean() {  
         List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);  
         batchContainer.drainTo(transferList, 100);  
         if (CollectionUtils.isEmpty(transferList)) {  
             return;  
         }  
  
         try {  
             cleaner.handle(transferList);  
         } catch (Exception e) {  
             logger.error("batch execute error, transferList:{}", transferList, e);  
         }  
     }  
  
     public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {  
         Class jobClass = cleaner.getClass();  
         if (instance.get(jobClass) == null) {  
             synchronized (BatchCollapser.class) {  
                 if (instance.get(jobClass) == null) {  
                     instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));  
                 }  
             }  
         }  
  
         return instance.get(jobClass);  
     }  
 }

以下代码内需要注意的点:

  • 由于合并器的全局性需求,需要将合并器实现为一个单例,另外为了提升它的通用性,内部使用使用 concurrentHashMap 和 double check 实现了一个简单的单例工厂。

  • 为了区分不同用途的合并器,工厂需要传入一个实现了 Handler 的实例,通过实例的 class 来对请求进行分组存储。

  • 由于 java.util.Timer 的阻塞特性,一个 Timer 线程在阻塞时不会启动另一个同样的 Timer 线程,所以使用 ScheduledExecutorService 定时启动 Timer 线程。

ConcurrentHashMultiset

设计

上面介绍的请求合并都是将多个请求一次发送,下游服务器处理时本质上还是多个请求,最好的请求合并是在内存中进行,将请求结果简单合并成一个发送给下游服务器。如我们经常会遇到的需求:元素分值累加或数据统计,就可以先在内存中将某一项的分值或数据累加起来,定时请求数据库保存。

Guava 内就提供了这么一种数据结构:ConcurrentHashMultiset,它不同于普通的 set 结构存储相同元素时直接覆盖原有元素,而是给每个元素保持一个计数 count, 插入重复时元素的 count 值加1。而且它在添加和删除时并不加锁也能保证线程安全,具体实现是通过一个 while(true) 循环尝试操作,直到操作够所需要的数量。

ConcurrentHashMultiset 这种排重计数的特性,非常适合数据统计这种元素在短时间内重复率很高的场景,经过排重后的数量计算,可以大大降低下游服务器的压力,即使重复率不高,能用少量的内存空间换取系统可用性的提高,也是很划算的。架构设计系列:https://www.yoodb.com/framework/knowledge-hierarchy.html

实现

使用 ConcurrentHashMultiset 进行请求合并与使用普通容器在整体结构上并无太大差异,具体类似于:

if (ConcurrentHashMultiset.isEmpty()) {  
    return;  
}  
  
List<Request> transferList = Lists.newArrayList();  
ConcurrentHashMultiset.elementSet().forEach(request -> {  
    int count = ConcurrentHashMultiset.count(request);  
    if (count <= 0) {  
        return;  
    }  
  
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));  
    ConcurrentHashMultiset.remove(request, count);  
});

小结

最后总结一下各个技术适用的场景:

  • hystrix collapser: 需要每个请求的结果,并且不在意每个请求的 cost 会增加;

  • BatchCollapser: 不在意请求的结果,需要请求合并能在时间和数量两个维度上触发;

  • ConcurrentHashMultiset:请求重复率很高的统计类场景;

另外,如果选择自己来实现的话,完全可以将 BatchCollapser 和 ConcurrentHashMultiset 结合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作为容器,这样就可以结合两者的优势了。

作者:枕边书

https://zhenbianshu.github.io/

公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!
最近有很多人问,有没有读者交流群!加入方式很简单,公众号Java精选,回复“加群”,即可入群!

Java精选面试题(微信小程序):3000+道面试题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计等,在线随时刷题!
------ 特别推荐 ------
特别推荐:专注分享最前沿的技术与资讯,为弯道超车做好准备及各种开源项目与高效率软件的公众号,「大咖笔记」,专注挖掘好东西,非常值得大家关注。点击下方公众号卡片关注。

点击“阅读原文”,了解更多精彩内容!文章有帮助的话,点在看,转发吧!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

接口请求合并的3种技巧,性能直接爆表! 的相关文章

随机推荐

  • Arthas开源一周年,Github Star 16K,我们一直在坚持什么?

    缘起 最近看到一个很流行的标题 开源XX年 star XXX 我是如何坚持的 看到这样的标题 忽然发觉Arthas从2018年9月开源以来 刚好一年了 正好在这个秋高气爽的时节做下总结和回顾 Arthas是Alibaba开源的Java诊断工
  • UKN服务器找不到,在windows下用ppk后缀文件登陆远程服务器

    最近要部署一个项目到服务器上 对方给我生成了一个以ppk为后缀名的密钥 让我直接登陆 这里记录一下过程 用putty通过ssh登陆服务器 下载putty 貌似官网上不了 我是在这里下载的 下载putty exe 不用安装 连接服务器 打开p
  • 安装深度(Deepin)系统

    Deepin系统安装 Deepin是和Ubuntu一样 是一个基于Debian的Linux的发型版本 Deepin相对于Ubuntu Deepin更适合中国用户的使用习惯 一 官网工具制作启动盘 制作启动盘 和安装系统 操作非常简单 nic
  • 计算机文化基础成绩,计算机文化基础成绩评定办法

    计算机文化基础 成绩评定办法 本次 计算机文化基础 的最终总评成绩由平时成绩和期末成绩两部分组成 其中平时成绩满分100分 其中考勤占20分 平时作业占80分 占总评的40 期末成绩满分100分 占总评的60 包括学生的Office软件操作
  • 超棒的JS/CSS动画效果网站——持续搜集

    Animate css 这里有超多的纯CSS小动画 代码清晰 作为学习的代码也很不错 地址 https daneden github io animate css CodePen 这里展示了超多优秀 特别 富有创意的前端效果 简直就是一个宝
  • Qt缺少Mysq驱动QMYSQL driver not loaded

    如果Qt在指定Mysql驱动时 报了这样的错说明缺少mysql相关的动态链接库 QSqlDatabase QMYSQL driver not loaded QSqlDatabase available drivers QSQLITE QOD
  • 从0开始的(c语言)数据结构学习 3:栈

    注 本文以造轮子为主 属于相对理论性 教学性的东西 在实际使用中 如果你是c 请直接 include lt stack gt 理解 什么是栈 你现在有一个放网球的竖球筒 每次你放进去的球都会放在最上面 同理 当你要取出来一个球的时候 也只能
  • 攻防世界misc——misc1

    打开题目 获得字符串 d4e8e1f4a0f7e1f3a0e6e1f3f4a1a0d4e8e5a0e6ece1e7a0e9f3baa0c4c4c3d4c6fbb9e1e6b3e3b9e4b3b7b7e2b6b1e4b2b6b9e2b1b1b
  • Kali无法打开Firefox浏览器

    本文章鉴于我本人的学习过程 起初我是能正常打开Firefox浏览器的 在我提升了root权限后浏览器就打不开了 于是我看了几篇其他人解决的办法 解决办法如下 用chown改变权限 输入 chown R root 目前我所遇到的就是这种情况
  • 计算机网络安全的背景

    虽然传统的计算机发展和当今的电子商务不同 但是不可否认网络已经成 为非常重要的信息和数据互换交换的平台 但是随着网络不断发展渗透到人们的日 常生活 手机终端 交易支付等环节时 网络安全已经成为一个焦点和不可逾越的 发展鸿沟 尽管目前网上支付
  • json 插入数据_使用python将数据存入SQLite3数据库

    作者 JiekeXu 2017年毕业于某本科院校 从事于数据库运维行业 一个热爱Python的DBA 个人公众号 JiekeXu之路 Python从网站上抓取的数据为了可以重复利用 一般都会存储下来 存储方式最简单的会选择存储到文本文件 常
  • 微信中的video属性设置

  • python数据驱动测试设计_Python Selenium 之数据驱动测试的实现

    数据驱动模式的测试好处相比普通模式的测试就显而易见了吧 使用数据驱动的模式 可以根据业务分解测试数据 只需定义变量 使用外部或者自定义的数据使其参数化 从而避免了使用之前测试脚本中固定的数据 可以将测试脚本与测试数据分离 使得测试脚本在不同
  • 【 视频 】NTSC和PAL电视制式

    今天的电视机还沿用着50年代彩色电视发明时的标准 它们就是NTSC 国家电视制式委员会 和PAL 逐行倒相 NTSC多用于美国和日本 二战 PAL多用于欧洲 澳大利亚 中东和亚洲地区 本文将介绍NTSC和PAL的主要概念 这些知识对更现代的
  • 苹果Mac电脑文件夹路径怎么看?“访达”也能显示文件路径

    Windows系统中 我的电脑 或 资源管理器 会显示文件 文件夹路径 Mac在 访达 中却不显示 确实不便 也让人费解 连 三指拖移 都是默认不开启 Mac有这类奇怪逻辑就一点也不奇怪了 是的 这是苹果最不可理喻之处 老子就是要到处嚷嚷
  • 力扣刷题(day0011)两个数组的交集

    给定两个数组 nums1 和 nums2 返回 它们的交集 输出结果中的每个元素一定是 唯一 的 我们可以 不考虑输出结果的顺序 示例 1 输入 nums1 1 2 2 1 nums2 2 2 输出 2 示例 2 输入 nums1 4 9
  • python中的连续比较是什么_Python算法的分治算法,python,之,连续,子,列表,最大,和...

    连续子列表的最大和 在一个列表中找到连续子列表的最大和 列表中的数字可负可正 并且子列表不能为空 问题提出 找到以下列表的最大子列表的和 2 1 3 4 1 2 1 5 4 解题思路 最大子列表有可能在左子列表 右子列表与右子列表之间 我们
  • python小数格式:不用科学计数法、不截断

    对于一个小数 如 0 0000000000001 想得到相应的字符串 而不是转换为 1e 12 可以用 numpy 的 format float positional import numpy as np a 0 0001 b 0 0000
  • anaconda安装所有库代码集总

    1 安装jupyter conda install jupyter
  • 接口请求合并的3种技巧,性能直接爆表!

    将相似或重复请求在上游系统中合并后发往下游系统 可以大大降低下游系统的负载 提升系统整体吞吐率 文章介绍了 hystrix collapser ConcurrentHashMultiset 自实现BatchCollapser 三种请求合并技