令牌桶算法限流

2023-05-16

限流

限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。常用的限流算法有令牌桶和和漏桶,而Google开源项目Guava中的RateLimiter使用的就是令牌桶控制算法。

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

  • 缓存:缓存的目的是提升系统访问速度和增大系统处理容量
  • 降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行
  • 限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

我们经常在调别人的接口的时候会发现有限制,比如微信公众平台接口、百度API Store、聚合API等等这样的,对方会限制每天最多调多少次或者每分钟最多调多少次

我们自己在开发系统的时候也需要考虑到这些,比如我们公司在上传商品的时候就做了限流,因为用户每一次上传商品,我们需要将商品数据同到到美团、饿了么、京东、百度、自营等第三方平台,这个工作量是巨大,频繁操作会拖慢系统,故做限流。

以上都是题外话,接下来我们重点看一下令牌桶算法

令牌桶算法

下面是从网上找的两张图来描述令牌桶算法:

   

RateLimiter

https://github.com/google/guava

RateLimiter的代码不长,注释加代码432行,看一下RateLimiter怎么用


 1 package com.cjs.example;
 2 
 3 import com.google.common.util.concurrent.RateLimiter;
 4 import org.springframework.web.bind.annotation.RequestMapping;
 5 import org.springframework.web.bind.annotation.RestController;
 6 
 7 import java.text.SimpleDateFormat;
 8 import java.util.Date;
 9 
10 @RestController
11 public class HelloController {
12 
13     private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
14 
15     private static final RateLimiter rateLimiter = RateLimiter.create(2);
16 
17     /**
18      * tryAcquire尝试获取permit,默认超时时间是0,意思是拿不到就立即返回false
19      */
20     @RequestMapping("/sayHello")
21     public String sayHello() {
22         if (rateLimiter.tryAcquire()) { //  一次拿1个
23             System.out.println(sdf.format(new Date()));
24             try {
25                 Thread.sleep(500);
26             } catch (InterruptedException e) {
27                 e.printStackTrace();
28             }
29         }else {
30             System.out.println("limit");
31         }
32         return "hello";
33     }
34 
35     /**
36      * acquire拿不到就等待,拿到为止
37      */
38     @RequestMapping("/sayHi")
39     public String sayHi() {
40         rateLimiter.acquire(5); //  一次拿5个
41         System.out.println(sdf.format(new Date()));
42         return "hi";
43     }
44 
45 }  

关于RateLimiter:

  • A rate limiter。每个acquire()方法如果必要的话会阻塞直到一个permit可用,然后消费它。获得permit以后不需要释放。
  • RateLimiter在并发环境下使用是安全的:它将限制所有线程调用的总速率。注意,它不保证公平调用。
  • RateLimiter在并发环境下使用是安全的:它将限制所有线程调用的总速率。注意,它不保证公平调用。Rate limiter(直译为:速度限制器)经常被用来限制一些物理或者逻辑资源的访问速率。这和java.util.concurrent.Semaphore正好形成对照。
  • 一个RateLimiter主要定义了发放permits的速率。如果没有额外的配置,permits将以固定的速度分配,单位是每秒多少permits。默认情况下,Permits将会被稳定的平缓的发放。
  • 可以配置一个RateLimiter有一个预热期,在此期间permits的发放速度每秒稳步增长直到到达稳定的速率

基本用法:


final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}  

实现

SmoothBursty以稳定的速度生成permit

SmoothWarmingUp是渐进式的生成,最终达到最大值趋于稳定

源码片段解读:


public abstract class RateLimiter {

    /**
     * 用给定的吞吐量(“permits per second”)创建一个RateLimiter。
     * 通常是QPS
     */
    public static RateLimiter create(double permitsPerSecond) {
        return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
    }
    
    static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
        RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }
    
    /**
     * 用给定的吞吐量(QPS)和一个预热期创建一个RateLimiter
     */
    public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
        checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
        return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
    }

    static RateLimiter create(
            double permitsPerSecond,
            long warmupPeriod,
            TimeUnit unit,
            double coldFactor,
            SleepingStopwatch stopwatch) {
        RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }

    private final SleepingStopwatch stopwatch;

    //
    private volatile Object mutexDoNotUseDirectly;

    private Object mutex() {
        Object mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
            synchronized (this) {
                mutex = mutexDoNotUseDirectly;
                if (mutex == null) {
                    mutexDoNotUseDirectly = mutex = new Object();
                }
            }
        }
        return mutex;
    }
    
    /**
     * 从RateLimiter中获取一个permit,阻塞直到请求可以获得为止
     * @return 休眠的时间,单位是秒,如果没有被限制则是0.0
     */
    public double acquire() {
        return acquire(1);
    }
  
    /**
     * 从RateLimiter中获取指定数量的permits,阻塞直到请求可以获得为止
     */
    public double acquire(int permits) {
        long microsToWait = reserve(permits);
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * microsToWait / SECONDS.toMicros(1L);
    }
    
    /**
     * 预定给定数量的permits以备将来使用
     * 直到这些预定数量的permits可以被消费则返回逝去的微秒数
     */
    final long reserve(int permits) {
        checkPermits(permits);
        synchronized (mutex()) {
            return reserveAndGetWaitLength(permits, stopwatch.readMicros());
        }
    }
    
    private static void checkPermits(int permits) {
        checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
    }
    
    final long reserveAndGetWaitLength(int permits, long nowMicros) {
        long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
        return max(momentAvailable - nowMicros, 0);
    }
}

abstract class SmoothRateLimiter extends RateLimiter { /** The currently stored permits. */ double storedPermits; /** The maximum number of stored permits. */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. */ private long nextFreeTicketMicros = 0L; // could be either in the past or future final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可以获取到的permit数量 double freshPermits = requiredPermits - storedPermitsToSpend; // 差值,如果存储的permit大于本次需要的permit数量则此处是0,否则是一个正数 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 计算需要等待的时间(微秒) this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; // 减去本次消费的permit数 return returnValue; } void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { // 表示当前可以获得permit double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 计算这段时间可以生成多少个permit storedPermits = min(maxPermits, storedPermits + newPermits); // 如果超过maxPermit,则取maxPermit,否则取存储的permit+新生成的permit nextFreeTicketMicros = nowMicros; // 设置下一次可以获得permit的时间点为当前时间 } } }

RateLimiter实现的令牌桶算法,不仅可以应对正常流量的限速,而且可以处理突发暴增的请求,实现平滑限流。

通过代码,我们可以看到它可以预消费,怎么讲呢

nextFreeTicketMicros表示下一次请求获得permits的最早时间。每次授权一个请求以后,这个值会向后推移(PS:想象一下时间轴)即向未来推移。因此,大的请求会比小的请求推得更。这里的大小指的是获取permit的数量。这个应该很好理解,因为上一次请求获取的permit数越多,那么下一次再获取授权时更待的时候会更长,反之,如果上一次获取的少,那么时间向后推移的就少,下一次获得许可的时间更短。可见,都是有代价的。正所谓:要浪漫就要付出代价。

还要注意到一点,就是获取令牌和处理请求是两个动作,而且,并不是每一次都获取一个,也不要想当然的认为一个请求获取一个permit(或者叫令牌),可以再看看前面那幅图

Stopwatch

一个以纳秒为单位度量流逝时间的对象。它是一个相对时间,而不是绝对时间。


Stopwatch stopwatch = Stopwatch.createStarted();
System.out.println("hahah");
stopwatch.stop();
Duration duration = stopwatch.elapsed();
System.out.println(stopwatch);  

Semaphore(信号量)

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly. 

一个信号量维护了一系列permits。

每次调用acquire()方法获取permit,如果必要的话会阻塞直到有一个permit可用为止。

调用release()方法则会释放自己持有的permit,即用完了再还回去。

信号量限制的是并发访问临界资源的线程数。

令牌桶算法 VS 漏桶算法

漏桶

漏桶的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。

令牌桶

生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,而且拿令牌的过程并不是消耗很大的事情。

最后,不论是对于令牌桶拿不到令牌被拒绝,还是漏桶的水满了溢出,都是为了保证大部分流量的正常使用,而牺牲掉了少部分流量,这是合理的,如果因为极少部分流量需要保证的话,那么就可能导致系统达到极限而挂掉,得不偿失。

小定律:排队理论

https://en.wikipedia.org/wiki/Little%27s_law

the long-term average number L of customers in a stationary system is equal to the long-term average effective arrival rate λ multiplied by the average time W that a customer spends in the system. Expressed algebraically the law is:

在一个固定系统中,顾客的长期平均数量L等于顾客的长期平均到达速率λ乘以顾客在系统中平均花费的时间W。用公式表示为:

虽然这看起来很容易,但这是一个非常显著的举世瞩目的结果,因为这种关系“不受到达过程的分布,服务分布,服务顺序,或其他任何因素的影响”。这个结果适用于任何系统,特别是适用于系统内的系统。唯一的要求是系统必须是稳定的非抢占式的。

例子

例1:找响应时间

假设有一个应用程序没有简单的方法来度量响应时间。如果系统的平均数量和吞吐量是已知的,那么平均响应时间就是:

mean response time = mean number in system / mean throughput

平均响应时间 = 系统的平均数量 / 平均吞吐量.

 

例2:顾客在店里

想象一下,一家小商店只有一个柜台和一个可供浏览的区域,每次只能有一个人在柜台,并且没有人不买东西就离开。

所以这个系统大致是:进入 --> 浏览 --> 柜台结账 --> 离开

在一个稳定的系统中,人们进入商店的速度就是他们到达商店的速度(我们叫做到达速度),它们离开的速度叫做离开速度。

相比之下,到达速度超过离开速度代表是一个不稳定的系统,这就会造成等待的顾客数量将逐渐增加到无穷大。

前面的小定律告诉我们,商店的平均顾客数量L等于有效的到达速度λ乘以顾客在商店的平均停留时间W。用公式表示为:

假设,顾客以每小时10个的速度到达,并且平均停留时间是0.5小时。那么这就意味着,任意时间商店的平均顾客数量是5

现在假设商店正在考虑做更多的广告,把到达率提高到每小时20。商店必须准备好容纳平均10人,或者必须将每个顾客在商店中的时间减少到0.25小时。商店可以通过更快地结帐或者增加更多的柜台来达到后者的目的。

我们可以把前面的小定律应用到商店系统中。例如,考虑柜台和在柜台前排的队。假设平均有2个人在柜台前排队,我们知道顾客到达速度是每小时10,所以顾客平均必须停留时间为0.2小时。

最后

这是单机(单进程)的限流,是JVM级别的的限流,所有的令牌生成都是在内存中,在分布式环境下不能直接这么用。

如果我们能把permit放到Redis中就可以在分布式环境中用了。

参考

https://blog.csdn.net/jek123456/article/details/77152571

https://blog.csdn.net/syc001/article/details/72841951

https://segmentfault.com/a/1190000012875897

https://blog.csdn.net/charleslei/article/details/53152883

https://www.jianshu.com/p/8f548e469bbe

https://www.cnblogs.com/f-zhao/p/7210158.html

https://m.jb51.net/article/127996.htm

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

令牌桶算法限流 的相关文章

  • 使用ctdb+samba+glusterfs搭建NAS集群系统

    1概述 本文介绍使用开源软件ctdb 43 samba 43 gluster搭建NAS集群系统 1 1 使用的开源软件介绍 1 glusterfs glusterfs是一个开源的分布式文件系统 xff0c 只适用于大文件存储 xff0c 存
  • react中优雅使用svg矢量图

    icon图标可以有很多形式 比如说CSS Sprite 引用字体图标 纯css 简单的icon 等等 优缺点这里不在赘述 xff0c 自行google之 下面就进入正题说说今天的主角svg 1 svg的发展历史 2001年9月4日 xff0
  • 使用自制相机运行 VINS-Mono

    使用自制相机运行VINS Mono 1 相机与IMU标定2 自制相机测试3 运行效果参考资料 1 相机与IMU标定 VINSmono的安装这里就省略了 xff0c 可以参考作者的github网页 2 我所使用的是ZED相机和Xsens IM
  • 标准正态分布变量的累积概率分布函数

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 最近有个期权项目 xff0c 计算理论价时需要使用标准正态分布变量的累积概率分布函数 xff0c excel中可以通过normsdist函数得到该结果 xff0c 但是项目
  • spring boot 中 @ConditionalOnMissingBean和@ConditionalOnBean注解注意事项

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 关于使用 64 Bean注解注入bean导致ConditionOnMissBean和 ConditionOnBean 注解有时候会失效的问题 文档上提示 xff0c 需要注
  • vscode左侧文件不同颜色标识含义

    代码里的左侧颜色标识 红色 xff0c 未加入版本控制 刚clone到本地 绿色 xff0c 已经加入版本控制暂未提交 新增部分 蓝色 xff0c 加入版本控制 xff0c 已提交 xff0c 有改动 xff1b 修改部分 白色 xff0c
  • 读取本地文件转化成MultipartFile

    介绍 现在有个上传文件功能 xff0c 需要将文件上传到oss上 xff0c 但是文件有点多 xff0c 于是使用接口进行上传 但是需要上传文件转换为MultipartFile类型文件进行上传 主要代码 添加pom文件 lt depende
  • 桌面上嵌入窗口(桌面日历)原理探索

    摘要 今天在QQ群里有人问怎样实现将自己的窗口嵌入桌面 xff0c 让它和桌面融为一体 xff0c 就像很多桌面日历软件那样 阅读全文 Richard Wei 2012 05 03 22 07 发表评论 转载于 https www cnbl
  • Git中分支merge和rebase的适用场景及区别

    几乎所有的版本控制工具都有branch功能 xff0c branch主要用于以下几个场景 xff1a 1 xff0c 控制产品OEM 基本上做产品 xff0c 不同的客户都会提出多种不同特性需求 xff0c 最简单的例子就是LOGO和标题完
  • sass安装与教程

    首先下载ruby http dlsw baidu com sw search sp soft ff 22711 rubyinstaller V2 2 2 95 setup 1439890355 exe 安装时注意勾选一下选项 安装完ruby
  • 集成学习原理小结

    集成学习 ensemble learning 可以说是现在非常火爆的机器学习方法了 它本身不是一个单独的机器学习算法 xff0c 而是通过构建并结合多个机器学习器来完成学习任务 也就是我们常说的 博采众长 集成学习可以用于分类问题集成 xf
  • 这款APP明确告诉你,无人机在什么地方可以飞

    美国联邦航空管理局 xff08 FAA xff09 为无人机管制推出了一个新的应用 B4UFLY xff0c 用来向用户显示无人机飞行的合法范围 关于如何有效的对无人机飞行进行管制 xff0c 已经是老生常谈的问题了 xff0c 除了制定相
  • 什么是委托?为什么要使用委托?什么是事件?

    1 什么是委托 xff1f 首先声明一个委托 xff1a public delegate string IsLengthFive string s 下面写几个方法 xff1a public string DoWork string a pu
  • 读取多超声波传感器

    读取多超声波传感器 1 背景2 使用教程2 1 接线说明2 2 上传协议 3 ROS节点使用3 1 下载与配置3 3 常见问题 无串口权限 4 更新程序参考资料 该模块是一个开源模块 xff0c 并提供了配套的ROS节点 xff0c 接收串
  • 系统级性能分析工具perf的介绍与使用

    测试环境 xff1a Ubuntu16 04 43 Kernel xff1a 4 4 0 31 系统级性能优化通常包括两个阶段 xff1a 性能剖析 xff08 performance profiling xff09 和代码优化 性能剖析的
  • VR的商業模式

    1 硬件销售 根据Digi Capital预估 xff0c 在5年内 xff0c AR VR将会有数亿量级的用户数 从免费到高端系统 xff0c 硬件销售将获利最多 届时每由AR VR生成的10美元利润 xff0c 就有4美元收入属于硬件销
  • arm汇编基础(转)

    先看个例子 xff1a void test2 int a int b int c int k 61 a j 61 b m 61 c GCC反汇编 xff1a 00000064 lt test2 gt mov ip sp IP 61 SP 保
  • STL学习思想

    1 模版 xff1a 一定要注意参数和返回值的模版 2 STL一系列的API xff1a 一定要注意返回值 3 容器中的都是值拷贝 而不是引用 在执行插入时 内部实行拷贝动作 所以STL中插入类时 一般都必须 xff1a 无参构造函数 拷贝
  • 中国地质大学(北京)软件工程非全2019研究生复试总结

    呼 今早刚出家门就收到地大发来的拟录取通知 xff0c 算是圆满的给2019考研划上了句号 前言 3 26号去参加了中国地质大学 xff08 北京 xff09 软件工程专业非全日制的复试 因为本人初试分数较低 xff0c 调剂的时候报了不少

随机推荐