Guava RateLimiter
限流
限流是保护高并发系统的三把利器之一(限流、缓存、降级);其目的是通过对并发访问或请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形
限流的类型可大致分为:
- 限制总并发数 - 数据库连接池、线程池
- 限制瞬时并发数 - nginx 的 limitconn 模块、Java Semaphore 限制并发
- 限制时间窗口内的平均速率 - Guava 的 RateLimiter
- 其他 - 根据网络流量、CPU 占用率、内存占用率等作为标准来进行限流
桶漏 & 令牌桶
桶漏和令牌桶指的是限流的两种模型,其目的都是限流以达到限制时间窗口内的平均速率的目的
桶漏是接收请求,并按照一定的速率通过请求
令牌桶是按照固定的速率生产令牌,请求需要获取对应的令牌才能通过
RateLimiter
Guava 是 Java 领域优秀的开源项目,它包含了 Google 在 Java 项目中使用一些核心库
其中就有限流相关的 RateLimiter
工具,底层基于令牌桶思想,提供了平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)多种实现
类图属性如下:
创建
最终的实现类都是 SmoothRateLimiter
的内部类,但是创建方法放在了更上层的 RateLimiter
中(工厂)
调用 RateLimiter.create()
静态方法进行创建
1 | // NO.1 |
SmoothBursty
先看参数列表为 double permitsPerSecond
的方法,SleepingStopwatch.createFromSystemTimer()
创建了一个 SleepingStopwatch
后调用了下一层创建方法
1 |
|
SleepingStopwatch
的目的是记录创建时间和计算相对时间,整个 RateLimiter
的时间都是相对创建时间而言的,也就是时间的运算依赖
SleepingStopwatch
对象
new SmoothBursty(stopwatch, 1.0)
的参数除了时间相关的对象,就是 maxBurstSeconds = 1
了,可以看出默认的 SmoothBursty
其应对突发流量的设置是多存储时间为 1s 的许可
而后调用了
rateLimiter.setRate(permitsPerSecond)
,这个先放在后面
SmoothWarmingUp
参数列表为
double permitsPerSecond, long warmupPeriod, TimeUnit unit
的方法,先对参数进行了校验,随后也调用了下一层的创建方法
1 |
|
流程和 SmoothBursty
类似,多了 warmupPeriod
相关的参数,也是用来实现 WarmingUp 的关键参数
后面也调用了 rateLimiter.setRate(permitsPerSecond)
设置 rate
rate 的设置关键参数是
permitsPerSecond
,这是使用者传入的参数,代表每秒希望生产的许可数量(不一定代表
QPS,因为每次请求许可不一定请求一个)
调用 RateLimiter
的 setRate
方法进行设置,在每次创建 RateLimiter
时,创建方法在实例化后也会调用一次该方法
1 | public final void setRate(double permitsPerSecond) { |
先进行参数校验,随后获取锁调用 doSetRate
方法;这里可以进行一个猜想,设置 rate
和获取许可保证并发的锁一定是同一把,看起来统一封装到了
mutex()
方法里,而后调用了 doSetRate
SmoothRateLimiter 的 doSetRate
在 RateLimiter
中,doSetRate
是一个抽象方法,SmoothRateLimiter
进行了实现
1 | abstract void doSetRate(double permitsPerSecond, long nowMicros); |
1 |
|
先调用 resync
方法重新计算
this.storedPermits
和
this.nextFreeTicketMicros
这两个重要属性;这是一个常用的方法,在后续流程中会有多个操作都调用该方法,具体实现在后面进行分析
根据参数 permitsPerSecond
计算出一秒内许可生产的间隔,赋值给
this.stableIntervalMicros
继续调用 doSetRate
SmoothBursty 的 doSetRate
SmoothRateLimiter
的 doSetRate
也是一个抽象方法,SmoothBursty
进行了实现
1 | abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); |
1 |
|
这里主要计算了新的 maxPermits
,并根据旧的
maxPermits
,即 oldMaxPermits
计算了
storedPermits
的值
相当于设置了新的 permitsPerSecond
,需要计算新的
maxPermits
,并等比缩放已经存储的许可数量
获取许可(permits)
创建 RateLimiter
对象后,使用 acquire
或者
tryAcquire
方法来获取许可
1 | RateLimiter rateLimiter = RateLimiter.create(10); |
最终的返回值是等待的时间,并且在返回之前该线程一直在等待
着重看下 acquire
方法
1 | public double acquire(int permits) { // 参数 permits 代表一次需要获取的许可数量 |
主要是通过 reserve
方法,返回的是一个需要等待的微秒值
microsToWait
,如果大于 0 则进行 sleep 操作(使用的同样是
Guava 包下的 Uninterruptibles
,先不关注)
最终将等待的微秒换算成单位秒返回
reserve
1 | final long reserve(int permits) { |
这个方法里主要做了两部分操作
- 校验参数
permits
合法性,其实就是是否大于 0checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
- 使用
synchronized
关键字上锁(mutex
方法返回的是this.mutexDoNotUseDirectly
一个Object
对象),上锁后执行reserveAndGetWaitLength
方法
reserveAndGetWaitLength
1 | final long reserveAndGetWaitLength(int permits, long nowMicros) { |
通过方法名 “获取等待长度”,可以知道这里主要是通过
reserveEarliestAvailable
获取一个时间(这个时间在后面的方法中可知是下一次获取许可的时间)后,在次方法最终确认需要等待的时间并返回
max(momentAvailable - nowMicros, 0)
,如果不需要等待(momentAvailable - nowMicros < 0
),则返回
0
arliestAvailable
RateLimiter
是一个抽象类,这个方法是一个抽象方法,由
SmoothRateLimiter
进行了实现
1 | abstract long reserveEarliestAvailable(int permits, long nowMicros); |
1 |
|
第一行就调用了方法 resync()
,该方法的目的是重新计算
this.storedPermits
和
this.nextFreeTicketMicros
这两个重要属性
接下来的几行操作可以概括为:如果需要获取的许可大于当前已经存储的许可,那么计算出超出的许可数量
freshPermits
,调用 storedPermitsToWaitTime
方法返回值加上 超出的许可数量 × 许可生产间隔
即可算出需要额外等待的时长;这里对于 SmoothBursty
来说,返回固定是 0,也就是额外的等待时间就是
超出的许可数量 × 许可生产间隔
最后更新这次操作后 this.nextFreeTicketMicros
和
this.storedPermits
的值,返回一开始保存的
this.nextFreeTicketMicros
这里需要思考一个问题:为什么返回的是
resync
后的
nextFreeTicketMicros
?如果令牌数量不够,这个值不是被向后更新了吗?这里就体现出
RateLimiter
整体的一个思想,即每一个请求过来本质上都能拿到许可,无非是等待多长时间才能执行,对于超出部分来说,当前线程不用等待,将这部分时间全部交给下一个线程来进行等待了,我自己概括起来就是
惰性计算 + 以当前线程为主
resync
1 | void resync(long nowMicros) { |
该方法的目的是重新计算 this.storedPermits
和
this.nextFreeTicketMicros
这两个重要属性
如果当前时间超过了之前计算的下一次获取许可的时间,则开始更新:
- 先计算新产生的许可数量(
coolDownIntervalMicros()
返回的是许可生产的固定间隔) - 更新
this.storedPermits
为合适的值(SmoothBursty
最多只能多存储 1s 的许可) - 更新
this.nextFreeTicketMicros
为参数nowMicros
预热(WarmingUp)
上面的分析主要是对 SmoothBursty
进行了分析,还提供了一种实现是 SmoothWarmingUp
SmoothWarmingUp
适用于资源需要预热的场景,相比
SmoothBursty
是对突发流量进行保证,通过
maxBurstSeconds
来计算额外存储的令牌数量,SmoothWarmingUp
使用
warmupPeriod
参数来设置预热的时长
由于预热机制的存在,其生产许可的速度是应该是动态的,如下图
其许可数量越多,说明还没到稳定时期,许可的生产间隔就要越长;许可少说明处于稳定期
关键属性计算
根据该图可以计算出一些数值,首先需要确定的是,从
thresholdPermits
到 0 的时间,是从 maxPermits
到 thresholdPermits
时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是
warmupPeriod
因为在创建 SmoothWarmingUp
默认的 coldFactor 为 3
SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0);
梯形面积为 warmupPeriod
,而长方形面积为
stableInterval × thresholdPermits
,即:
1 | warmupPeriod = 2 * stableInterval * thresholdPermits |
由此,我们得出 thresholdPermits
的值:
1 | thresholdPermits = 0.5 * warmupPeriod / stableInterval |
然后我们根据梯形面积的计算公式:
1 | warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits) |
得出 maxPermits
为:
1 | maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval) |
这样,我们就得到了 thresholdPermits
和
maxPermits
的值
最后是冷却时间的间隔,对应到图中代表的是斜线的斜率,即由稳定到冷却中的速度;在代码中为:
该方法 SmoothBursty
的实现始终返回 0
1 |
|
doSetRate
上面分析了 SmoothBursty
的 doSetRate
实现,这里来看 SmoothWarmingUp
的实现
1 |
|
主要是计算了相关的属性并赋值给成员变量,计算方法都和上图一致
acquire 的 storedPermitsToWaitTime
acquire
流程和 SmoothBursty
基本一致,区别在于 storedPermitsToWaitTime
方法的实现
1 |
|
waitMicros
由两部分组成,一部分是从
storedPermits
中获取花费的时间,一部分是等待
freshPermits
产生花费的时间;SmoothBursty
的该方法实现固定返回 0,即不存在从 storedPermits
获取许可的花费,而 SmoothWarmingUp
实现较为复杂
由于需要预热,所以从存储的许可中中取许可需要花费一定的时间,其实就是要计算下图中,阴影部分的面积
1 |
|
最终将这部分等待时间返回,达到预热等待的目的
重点问题
许可从哪来
经过上面的分析,可以得出,所谓的许可是惰性的运算,并不是表面理解的可能存在一个线程在生产许可,而是当请求进入时才会计算许可相关的数据(下一次许可时间、存储的许可数量、满足当前请求许可需要等待的时间等),然后对该请求行为进行处理
并发安全是如何保证的
并发主要出现在两个地方,setRate
设置速率方法和
acquire
获取许可方法内,并且这两个方法也互斥
关键的方法在 reserve()
内
1 | final long reserve(int permits) { |
那么着重关注 mutex()
返回了什么对象
1 | // Can't be initialized in the constructor because mocks don't call the constructor. |
这里会有几个疑问与答案:
Object mutex = mutexDoNotUseDirectly
为什么不直接操作this.mutexDoNotUseDirectly
,要使用一个临时变量mutexDoNotUseDirectly
锁对象的创建有必要使用懒汉式吗?RateLimiter
在实例化后直接执行了reserve
方法,那么这个懒加载的单例意义是什么- 为什么不使用
synchronized (this)
,包装了一个方法mutex()
来返回锁对象
第一个问题:减少 volatile
对内存的读请求;mutexDoNotUseDirectly
是可见的,每次读取都会访问内存,使用临时变量对于不需要可见性的场景能减少访问内存的次数
It avoids an additional volatile read of the field once it’s determined to be non-null. 详见 issue:https://github.com/google/guava/issues/3381
第二个问题:源代码注释中也已经解释了目的:// Can't be initialized in the constructor because mocks don't call the constructor.
作者是考虑到使用 Mockito 框架时,用 mock 方法创建
RateLimiter
的 mock
对象不会执行其构造器,该锁对象不会被实例化;并且成员变量直接赋值也会被解释为构造器的一部分
Inline field initialization is syntactic sugar for initializing from the constructor. 详见 issue:https://github.com/google/guava/issues/3066
第三个问题:更方便控制锁粒度,并且避免使用
this
作为锁对象是一种规范,因为无法得知外部是否也是用
this
作为锁来使用
可以参考该回答:https://stackoverflow.com/questions/12397427/what-is-different-between-method-synchronized-vs-object-synchronized
参考
超详细的Guava RateLimiter限流原理解析 - 知乎 (zhihu.com)
Guava限流器RateLimiter中mutexDoNotUseDirectly/锁的使用_DengDengLei的博客-CSDN博客
RateLimiter 源码分析(Guava 和 Sentinel 实现)_一直不懂的博客-CSDN博客_sentinel ratelimiter