贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

Guava RateLimiter

限流

限流是保护高并发系统的三把利器之一(限流、缓存、降级);其目的是通过对并发访问或请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形

限流的类型可大致分为:

  • 限制总并发数 - 数据库连接池、线程池
  • 限制瞬时并发数 - nginx 的 limitconn 模块、Java Semaphore 限制并发
  • 限制时间窗口内的平均速率 - Guava 的 RateLimiter
  • 其他 - 根据网络流量、CPU 占用率、内存占用率等作为标准来进行限流

桶漏 & 令牌桶

桶漏和令牌桶指的是限流的两种模型,其目的都是限流以达到限制时间窗口内的平均速率的目的

桶漏是接收请求,并按照一定的速率通过请求

令牌桶是按照固定的速率生产令牌,请求需要获取对应的令牌才能通过

RateLimiter

Guava 是 Java 领域优秀的开源项目,它包含了 Google 在 Java 项目中使用一些核心库

其中就有限流相关的 RateLimiter 工具,底层基于令牌桶思想,提供了平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)多种实现

类图属性如下:

创建

最终的实现类都是 SmoothRateLimiter 的内部类,但是创建方法放在了更上层的 RateLimiter 中(工厂)

调用 RateLimiter.create() 静态方法进行创建

1
2
3
4
5
6
7
8
9
10
11
// NO.1
public static RateLimiter create(double permitsPerSecond) {
return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
}

// NO.2
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0);
}

SmoothBursty

先看参数列表为 double permitsPerSecond 的方法,SleepingStopwatch.createFromSystemTimer() 创建了一个 SleepingStopwatch 后调用了下一层创建方法

1
2
3
4
5
6
@VisibleForTesting
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

SleepingStopwatch 的目的是记录创建时间和计算相对时间,整个 RateLimiter 的时间都是相对创建时间而言的,也就是时间的运算依赖 SleepingStopwatch 对象

new SmoothBursty(stopwatch, 1.0) 的参数除了时间相关的对象,就是 maxBurstSeconds = 1 了,可以看出默认的 SmoothBursty 其应对突发流量的设置是多存储时间为 1s 的许可

而后调用了 rateLimiter.setRate(permitsPerSecond),这个先放在后面

SmoothWarmingUp

参数列表为 double permitsPerSecond, long warmupPeriod, TimeUnit unit 的方法,先对参数进行了校验,随后也调用了下一层的创建方法

1
2
3
4
5
6
7
8
9
10
11
@VisibleForTesting
static RateLimiter create(
SleepingStopwatch stopwatch,
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

流程和 SmoothBursty 类似,多了 warmupPeriod 相关的参数,也是用来实现 WarmingUp 的关键参数

后面也调用了 rateLimiter.setRate(permitsPerSecond)

设置 rate

rate 的设置关键参数是 permitsPerSecond,这是使用者传入的参数,代表每秒希望生产的许可数量(不一定代表 QPS,因为每次请求许可不一定请求一个)

调用 RateLimitersetRate 方法进行设置,在每次创建 RateLimiter 时,创建方法在实例化后也会调用一次该方法

1
2
3
4
5
6
7
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) { // 获取锁
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

先进行参数校验,随后获取锁调用 doSetRate 方法;这里可以进行一个猜想,设置 rate 和获取许可保证并发的锁一定是同一把,看起来统一封装到了 mutex() 方法里,而后调用了 doSetRate

SmoothRateLimiter 的 doSetRate

RateLimiter 中,doSetRate 是一个抽象方法,SmoothRateLimiter 进行了实现

1
abstract void doSetRate(double permitsPerSecond, long nowMicros);
1
2
3
4
5
6
7
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

先调用 resync 方法重新计算 this.storedPermitsthis.nextFreeTicketMicros 这两个重要属性;这是一个常用的方法,在后续流程中会有多个操作都调用该方法,具体实现在后面进行分析

根据参数 permitsPerSecond 计算出一秒内许可生产的间隔,赋值给 this.stableIntervalMicros

继续调用 doSetRate

SmoothBursty 的 doSetRate

SmoothRateLimiterdoSetRate 也是一个抽象方法,SmoothBursty 进行了实现

1
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 等比缩放已经存储的许可数量
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

这里主要计算了新的 maxPermits,并根据旧的 maxPermits,即 oldMaxPermits 计算了 storedPermits 的值

相当于设置了新的 permitsPerSecond,需要计算新的 maxPermits,并等比缩放已经存储的许可数量

获取许可(permits)

创建 RateLimiter 对象后,使用 acquire 或者 tryAcquire 方法来获取许可

1
2
RateLimiter rateLimiter = RateLimiter.create(10);
double sleepTime = rateLimiter.acquire(1);

最终的返回值是等待的时间,并且在返回之前该线程一直在等待

着重看下 acquire 方法

1
2
3
4
5
public double acquire(int permits) { // 参数 permits 代表一次需要获取的许可数量
long microsToWait = reserve(permits); // 调用 reserve 方法
stopwatch.sleepMicrosUninterruptibly(microsToWait); // microsToWait > 0 则 sleep
return 1.0 * microsToWait / SECONDS.toMicros(1L); // 换算成 s 返回
}

主要是通过 reserve 方法,返回的是一个需要等待的微秒值 microsToWait,如果大于 0 则进行 sleep 操作(使用的同样是 Guava 包下的 Uninterruptibles,先不关注)

最终将等待的微秒换算成单位秒返回

reserve

1
2
3
4
5
6
final long reserve(int permits) {
checkPermits(permits); // 参数校验
synchronized (mutex()) { // 上锁
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

这个方法里主要做了两部分操作

  1. 校验参数 permits 合法性,其实就是是否大于 0 checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  2. 使用 synchronized 关键字上锁(mutex 方法返回的是 this.mutexDoNotUseDirectly 一个 Object 对象),上锁后执行 reserveAndGetWaitLength 方法

reserveAndGetWaitLength

1
2
3
4
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0); // 计算一下是不是需要 wait
}

通过方法名 “获取等待长度”,可以知道这里主要是通过 reserveEarliestAvailable 获取一个时间(这个时间在后面的方法中可知是下一次获取许可的时间)后,在次方法最终确认需要等待的时间并返回 max(momentAvailable - nowMicros, 0),如果不需要等待(momentAvailable - nowMicros < 0),则返回 0

arliestAvailable

RateLimiter 是一个抽象类,这个方法是一个抽象方法,由 SmoothRateLimiter 进行了实现

1
abstract long reserveEarliestAvailable(int permits, long nowMicros);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 重新计算 storedPermits 和 nextFreeTicketMicros
long returnValue = nextFreeTicketMicros; // 保存 nextFreeTicketMicros,因为后面可能会被更新
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 仓库能提供的许可
double freshPermits = requiredPermits - storedPermitsToSpend; // 这次请求超出仓库能力的许可
long waitMicros = // 等待时间
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 根据等待时间延长下一次能够获取许可的时间
this.storedPermits -= storedPermitsToSpend; // 调整仓库许可数量;即 freshPermits > 0,storedPermits 一定 = 0
return returnValue;
}

第一行就调用了方法 resync(),该方法的目的是重新计算 this.storedPermitsthis.nextFreeTicketMicros 这两个重要属性

接下来的几行操作可以概括为:如果需要获取的许可大于当前已经存储的许可,那么计算出超出的许可数量 freshPermits,调用 storedPermitsToWaitTime 方法返回值加上 超出的许可数量 × 许可生产间隔 即可算出需要额外等待的时长;这里对于 SmoothBursty 来说,返回固定是 0,也就是额外的等待时间就是 超出的许可数量 × 许可生产间隔

最后更新这次操作后 this.nextFreeTicketMicrosthis.storedPermits 的值,返回一开始保存的 this.nextFreeTicketMicros

这里需要思考一个问题:为什么返回的是 resync 后的 nextFreeTicketMicros?如果令牌数量不够,这个值不是被向后更新了吗?这里就体现出 RateLimiter 整体的一个思想,即每一个请求过来本质上都能拿到许可,无非是等待多长时间才能执行,对于超出部分来说,当前线程不用等待,将这部分时间全部交给下一个线程来进行等待了,我自己概括起来就是 惰性计算 + 以当前线程为主

resync

1
2
3
4
5
6
7
8
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

该方法的目的是重新计算 this.storedPermitsthis.nextFreeTicketMicros 这两个重要属性

如果当前时间超过了之前计算的下一次获取许可的时间,则开始更新:

  1. 先计算新产生的许可数量(coolDownIntervalMicros() 返回的是许可生产的固定间隔)
  2. 更新 this.storedPermits 为合适的值(SmoothBursty 最多只能多存储 1s 的许可)
  3. 更新 this.nextFreeTicketMicros 为参数 nowMicros

预热(WarmingUp)

上面的分析主要是对 SmoothBursty 进行了分析,还提供了一种实现是 SmoothWarmingUp

SmoothWarmingUp 适用于资源需要预热的场景,相比 SmoothBursty 是对突发流量进行保证,通过 maxBurstSeconds 来计算额外存储的令牌数量,SmoothWarmingUp 使用 warmupPeriod 参数来设置预热的时长

由于预热机制的存在,其生产许可的速度是应该是动态的,如下图

其许可数量越多,说明还没到稳定时期,许可的生产间隔就要越长;许可少说明处于稳定期

关键属性计算

根据该图可以计算出一些数值,首先需要确定的是,从 thresholdPermits 到 0 的时间,是从 maxPermitsthresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是 warmupPeriod

因为在创建 SmoothWarmingUp 默认的 coldFactor 为 3 SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0);

梯形面积为 warmupPeriod,而长方形面积为 stableInterval × thresholdPermits,即:

1
warmupPeriod = 2 * stableInterval * thresholdPermits

由此,我们得出 thresholdPermits 的值:

1
thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后我们根据梯形面积的计算公式:

1
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

得出 maxPermits 为:

1
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

这样,我们就得到了 thresholdPermitsmaxPermits 的值

最后是冷却时间的间隔,对应到图中代表的是斜线的斜率,即由稳定到冷却中的速度;在代码中为:

该方法 SmoothBursty 的实现始终返回 0

1
2
3
4
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}

doSetRate

上面分析了 SmoothBurstydoSetRate 实现,这里来看 SmoothWarmingUp 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor; // coldFactor 是固定的 3
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); // 计算斜率,对边 / 临边
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

主要是计算了相关的属性并赋值给成员变量,计算方法都和上图一致

acquire 的 storedPermitsToWaitTime

acquire 流程和 SmoothBursty 基本一致,区别在于 storedPermitsToWaitTime 方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 在于这里的方法
+ (long) (freshPermits * stableIntervalMicros);

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

waitMicros 由两部分组成,一部分是从 storedPermits 中获取花费的时间,一部分是等待 freshPermits 产生花费的时间;SmoothBursty 的该方法实现固定返回 0,即不存在从 storedPermits 获取许可的花费,而 SmoothWarmingUp 实现较为复杂

由于需要预热,所以从存储的许可中中取许可需要花费一定的时间,其实就是要计算下图中,阴影部分的面积

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
// 如果右边梯形部分有 permits,那么先从右边部分获取 permits,计算梯形部分的阴影部分的面积
if (availablePermitsAboveThreshold > 0.0) {
// 从右边部分获取的 permits 数量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
// 梯形面积公式:(上底 + 下底) × 高 / 2
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
// 加上长方形部分的阴影面积
micros += (stableIntervalMicros * permitsToTake);
return micros;
}

// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}

最终将这部分等待时间返回,达到预热等待的目的

重点问题

许可从哪来

经过上面的分析,可以得出,所谓的许可是惰性的运算,并不是表面理解的可能存在一个线程在生产许可,而是当请求进入时才会计算许可相关的数据(下一次许可时间、存储的许可数量、满足当前请求许可需要等待的时间等),然后对该请求行为进行处理

并发安全是如何保证的

并发主要出现在两个地方,setRate 设置速率方法和 acquire 获取许可方法内,并且这两个方法也互斥

关键的方法在 reserve()

1
2
3
4
5
6
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

那么着重关注 mutex() 返回了什么对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Can't be initialized in the constructor because mocks don't call the constructor.
private volatile Object mutexDoNotUseDirectly;

private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}

这里会有几个疑问与答案:

  • Object mutex = mutexDoNotUseDirectly 为什么不直接操作 this.mutexDoNotUseDirectly,要使用一个临时变量
  • mutexDoNotUseDirectly 锁对象的创建有必要使用懒汉式吗?RateLimiter 在实例化后直接执行了 reserve 方法,那么这个懒加载的单例意义是什么
  • 为什么不使用 synchronized (this),包装了一个方法 mutex() 来返回锁对象

第一个问题:减少 volatile 对内存的读请求;mutexDoNotUseDirectly 是可见的,每次读取都会访问内存,使用临时变量对于不需要可见性的场景能减少访问内存的次数

It avoids an additional volatile read of the field once it’s determined to be non-null. 详见 issue:https://github.com/google/guava/issues/3381

第二个问题:源代码注释中也已经解释了目的:// Can't be initialized in the constructor because mocks don't call the constructor.

作者是考虑到使用 Mockito 框架时,用 mock 方法创建 RateLimiter 的 mock 对象不会执行其构造器,该锁对象不会被实例化;并且成员变量直接赋值也会被解释为构造器的一部分

Inline field initialization is syntactic sugar for initializing from the constructor. 详见 issue:https://github.com/google/guava/issues/3066

第三个问题:更方便控制锁粒度,并且避免使用 this 作为锁对象是一种规范,因为无法得知外部是否也是用 this 作为锁来使用

可以参考该回答:https://stackoverflow.com/questions/12397427/what-is-different-between-method-synchronized-vs-object-synchronized

参考

超详细的Guava RateLimiter限流原理解析 - 知乎 (zhihu.com)

Guava限流器RateLimiter中mutexDoNotUseDirectly/锁的使用_DengDengLei的博客-CSDN博客

RateLimiter 源码分析(Guava 和 Sentinel 实现)_一直不懂的博客-CSDN博客_sentinel ratelimiter