使用Guava RateLimiter限流以及源码解析

2023-11-07

 

前言

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

常用的限流算法

漏桶算法

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

img

令牌桶算法

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

img

RateLimiter使用以及源码解析

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。

RateLimiter使用

首先简单介绍下RateLimiter的使用,

 

public void testAcquire() {
      RateLimiter limiter = RateLimiter.create(1);

      for(int i = 1; i < 10; i = i + 2 ) {
          double waitTime = limiter.acquire(i);
          System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
      }
  }

输出结果:

 

cutTime=1535439657427 acq:1 waitTime:0.0
cutTime=1535439658431 acq:3 waitTime:0.997045
cutTime=1535439661429 acq:5 waitTime:2.993028
cutTime=1535439666426 acq:7 waitTime:4.995625
cutTime=1535439673426 acq:9 waitTime:6.999223

首先通过RateLimiter.create(1);创建一个限流器,参数代表每秒生成的令牌数,通过limiter.acquire(i);来以阻塞的方式获取令牌,当然也可以通过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,如果超timeout为0,则代表非阻塞,获取不到立即返回。

从输出来看,RateLimiter支持预消费,比如在acquire(5)时,等待时间是3秒,是上一个获取令牌时预消费了3个两排,固需要等待3*1秒,然后又预消费了5个令牌,以此类推

RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费),在使用过程中需要注意这一点,具体实现原理后面再分析。

RateLimiter实现原理

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty

RateLimiter的创建

通过调用RateLimiter的create接口来创建实例,实际是调用的SmoothBuisty稳定模式创建的实例。

 

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }

  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

SmoothBursty中的两个构造参数含义:

  • SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌
  • maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

 

/**
 * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
 * 在RateLimiter未使用时,最多存储几秒的令牌
 * */
 final double maxBurstSeconds;
 

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;

/**
 * The maximum number of stored permits.
 * 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
 */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

接下来介绍几个关键函数

  • setRate

 

public final void setRate(double permitsPerSecond) {
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}

通过这个接口设置令牌通每秒生成令牌的数量,内部时间通过调用SmoothRateLimiterdoSetRate来实现

  • doSetRate

 

@Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

这里先通过调用resync生成令牌以及更新下一期令牌生成时间,然后更新stableIntervalMicros,最后又调用了SmoothBurstydoSetRate

  • resync

 

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 * 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(可以理解为生成令牌)
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

另一种解法则是延迟计算,如上resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

  • SmoothBursty的doSetRate

 

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    // Double.POSITIVE_INFINITY 代表无穷啊
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。也就是流量不局限于qps

RateLimiter几个常用接口分析

在了解以上概念后,就非常容易理解RateLimiter暴露出来的接口

 

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

/**
* 获取令牌,返回阻塞的时间
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回,主要通过reserve返回需要等待的时间,reserve中通过调用reserveAndGetWaitLength获取等待时间

 

/**
 * Reserves next ticket and returns the wait time that the caller must wait for.
 *
 * @return the required wait time, never negative
 */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  return max(momentAvailable - nowMicros, 0);
}

最后调用了reserveEarliestAvailable

 

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros;
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  double freshPermits = requiredPermits - storedPermitsToSpend;
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros);

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

 

首先通过resync生成令牌以及同步nextFreeTicketMicros时间戳,freshPermits从令牌桶中获取令牌后还需要的令牌数量,通过storedPermitsToWaitTime计算出获取freshPermits还需要等待的时间,在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,然后更新nextFreeTicketMicros以及storedPermits,这次获取令牌需要的等待到的时间点, reserveAndGetWaitLength返回需要等待的时间间隔。

从`reserveEarliestAvailable`可以看出RateLimiter的预消费原理,以及获取令牌的等待时间时间原理(可以解释示例结果),再获取令牌不足时,并没有等待到令牌全部生成,而是更新了下次获取令牌时的nextFreeTicketMicros,从而影响的是下次获取令牌的等待时间。



 `reserve`这里返回等待时间后,`acquire`通过调用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`进行sleep操作,这里不同于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:

 

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
    //sleep 阻塞线程 内部通过Thread.sleep()
  boolean interrupted = false;
  try {
    long remainingNanos = unit.toNanos(sleepFor);
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        // TimeUnit.sleep() treats negative timeouts just like zero.
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        interrupted = true;
        remainingNanos = end - System.nanoTime();
        //如果被interrupt可以继续,更新sleep时间,循环继续sleep
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
      //如果被打断过,sleep过后再真正中断线程
    }
  }
}

 

sleep之后,`acquire`返回sleep的时间,阻塞结束,获取到令牌。

 

public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}

tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
canAcquire用于判断timeout时间内是否可以获取令牌,通过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否能够拿到足够的令牌数,如果可以获取到,则过程同acquire,线程sleep等待,如果通过canAcquire在此超时时间内不能回去到令牌,则可以快速返回,不需要等待timeout后才知道能否获取到令牌。

到此,Guava RateLimiter稳定模式的实现原理基本已经清楚,如发现文中错误的地方,劳烦指正!


 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Guava RateLimiter限流以及源码解析 的相关文章

随机推荐

  • 什么是低信噪比图像及处理方法

    信号处理领域的信噪比即SNR Singal to Noise Ration 又称讯噪比 即放大器的输出信号的电压与同时输出的噪声电压的比 常常用分贝数表示 设备的信噪比越高表明它产生的杂音越少 一般来说 信噪比越大 说明混在信号里的噪声越小
  • python (一维、二维)列表的初始化

    一维列表的初始化 初始一个长度为5的列表 方式1 a 0 5 0 0 0 0 0 方式2 a 0 for in range 5 0 0 0 0 0 二维列表的初始化 初始一个2 5的列表 方式1 b 0 5 for in range 2 0
  • Hibernate环境搭建(小实例)

    Hibernate是一个开源的对象关系映射框架 在学习之前 首先让我们先了解一下Hibernate环境是如何搭建的 废话不多说 直接进入正题 建项目 引Jar包 首先 我们需要创建一个Java项目 创建好项目之后 就需要引入与Hiberna
  • unity的HDR效果

    http blog csdn net wolf96 article details 44057915 文章开始先放两组效果 文章结尾再放两组效果 本文测试场景资源来自浅墨大神 shader效果为本文效果 HDR 人们有限的视觉系统 只支持1
  • 【DS】单链表@线性表 —— 增删查改

    目录 0 引 1 链表的概念和结构 2 链表的分类 3 链表的实现 3 1 打印 申请新节点 销毁 3 1 1 打印 3 1 2 申请新节点 3 1 3 销毁 3 2 尾插 尾删 3 2 1 尾插 3 2 2 尾删 3 3 头插 头删 3
  • 物联网场景中,我们如何选择时序数据库 ?

    如今时序数据的应用场景十分广泛 许多类型的数据都是时间序列数据 金融市场交易 传感器测量 水冷 高温 地震 服务器监控 CPU 内存 磁盘 资源消耗 能源 电力 人体健康 心率 血氧浓度 网络访问 通过保留数据固有的时间序列性质 我们可以记
  • mysql:ER_TRUNCATED_WRONG_VALUE_FOR_FIELD: Incorrect string value:

    发现某个组件的表单输入报错 Error ER TRUNCATED WRONG VALUE FOR FIELD Incorrect string value xE6 x88 x91 xE4 xBB xAC for column content
  • 【算法与数据结构】235、LeetCode二叉搜索树的最近公共祖先

    文章目录 一 题目 二 解法 三 完整代码 所有的LeetCode题解索引 可以看这篇文章 算法和数据结构 LeetCode题解 一 题目 二 解法 思路分析 本题和这道题类似 算法与数据结构 236 LeetCode二叉树的最近公共祖先
  • Linux下Samba的配置

    参考 http www cnblogs com mchina archive 2012 12 18 2816717 html 前言 为了实现windows 和 Linux以及其他操作系统之间的资源共享 软件商推出nfs 和samba两种解决
  • Air101

    目录 1 合宙Air101 固件编译可参考 PinOut V2 1092400 管脚映射表 PinOut V2 1091800 2 Air103 最新固件下载 固件编译可参考 PinOut V3 21112201 管脚映射表 资料链接 Pi
  • Solid JS基础

    Solid js 用于构建用户界面的声明式 高效且灵活的 JavaScript 库 您可以在 官方教程 中尝试下面提到的部分例子 本文引用并简化了官方教程中的部分例子 本文讲述部分 solid 主要内容 更多详细内容 移步 Solid AP
  • 编码器的使用

    首先来看一下增量式编码器的输出信号和它的信号倍频技术 增量式编码器输出的脉冲波形信号形式常见的有两种 一种是占空比50 的方波 通道A和B相位差为90 另一种则是正弦波这类模拟信号 通道A和B相位差同样为90 对于第1种形式的方波信号 如果
  • MATLAB量化浮点数

    在做算法设计和验证时 常在matlab进行浮点验证 然后量化后在用在FPGA上 对于类似与FIR这些滤波器系数 matlab直接可以export出来 但是在验证麦克风或者ADC出来的24bit补码这类时常常需要使用matlab生成定点数进行
  • 有关DHCP、链路聚合、NAT、ACL、Telnet配置小综合实验

    实验需求 1 局域网中存在VLAN10 VLAN20 VLAN30和VLAN40四个部门 IP网段 应192 168 10 0 24 192 168 20 0 24 192 168 30 0 24 192 168 40 0 24 2 业务V
  • Linux虚拟机 Ubuntu16 cheese命令打开摄像头黑屏,以及mjpg-stream框架不显示视频界面。

    在Linux虚拟机 Ubuntu16打开笔记本的摄像头时 用 ubuntu16的cheese命令显示黑屏 如下图 解决方法 1 查看虚拟机是否已经已连接上摄像头 显示断开连接 连接主机 D 则虚拟机已连接上摄像头 2 确认摄像头 笔记本的摄
  • 搭建环境【2】windows主机和ubuntu互传文件的4种方法

    我的ubuntu系统是安装在 VMware 虚拟机中的 两者之间经常要互传文件 下面介绍4种常用的互传文件方法 1 共享文件夹方式互传 在虚拟机中需要开启共享文件夹的功能 首先虚拟机中的ubuntu要求是已经开机了的状态 然后进行设置 虚拟
  • 山东轻工业学院高校俱乐部主席“我和我的CSDN高校俱乐部”

    今天天气不错 上午收到了CSDN邮寄过来的2012年度优秀主席证书以及奖品 心情也不错 感谢CSDN教育事业部全体同事一年来对我们工作的支持与帮助 同时也感谢那些奋战在一线的兄弟姐妹们 你们为轻工学院CSDN高校俱乐部所做的一切 我永远铭记
  • ThreadLocal - ThreadlMap与弱引用

    ThreadLocal源码 在看ThreadLocal源码的时候 其中嵌套类ThreadLocalMap中的Entry继承了WeakReferenc static class ThreadLocalMap static class Entr
  • C++ string字符串修改和替换方法详解

    字符串内容的变化包括修改和替换两种 本节将分别讲解字符串内容的修改和字符串内容的替换 字符串内容的修改 可以通过使用多个函数修改字符串的值 例如 assign operator erase 交换 swap 插入 insert 等 另外 还可
  • 使用Guava RateLimiter限流以及源码解析

    前言 在开发高并发系统时有三把利器用来保护系统 缓存 降级和限流 缓存 缓存的目的是提升系统访问速度和增大系统处理容量 降级 降级是当服务出现问题或者影响到核心流程时 需要暂时屏蔽掉 待高峰或者问题解决后再打开 限流 限流的目的是通过对并发