贫瘠之地

华北无浪漫,死海扬起帆
多少个夜晚,独自望着天

0%

Redisson RateLimiter

背景

对于单机限流,可以使用 Guava 等工具

如果需要对所有服务进行限流,就需要使用依赖同一个数据资源

简单的方案可以使用 Redis 记录限流相关信息进行实现,Redisson 的 RRateLimiter 就是基于 Redis 实现的全局限流工具,使用了令牌桶的思想

基本使用

创建限流器,尝试设置限流配置

类型为 RateType.OVERALL,为全局限流;permits 数量为 1s 100 个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
public class RedissonRateLimiter {

public static void main(String[] args) {
// Redisson 客户端连接 Redis
Config config = new Config();
config.useSingleServer().setAddress("redis://***").setPassword("***");
RedissonClient redissonClient = Redisson.create(config);

// 创建 RRateLimiter
RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter");
// 配置
boolean success = rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.SECONDS);

log.info("success:{}", success);
}
}

修改限流配置

使用 rateLimiter.setRate 方法重新配置限流相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class RedissonRateLimiter {

public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://***").setPassword("***");
RedissonClient redissonClient = Redisson.create(config);

RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter");

// 重新配置
rateLimiter.setRate(RateType.OVERALL, 200, 1, RateIntervalUnit.SECONDS);
}
}

获取令牌

使用 rateLimiter.acquire 等方法获取限流许可

accquiretryAccquire 的区别和大部分限流、锁工具一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class RedissonRateLimiter {

public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://***").setPassword("***");
RedissonClient redissonClient = Redisson.create(config);

RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter");

// acquire
rateLimiter.acquire(2);

// tryAcquire
boolean success = rateLimiter.tryAcquire(2);
log.info("success:{}", success);
}
}

源码

RRateLimiter

Redisson 中,操作令牌的对象被包装为了 RRateLimiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface RRateLimiter extends RRateLimiterAsync, RExpirable {

boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);

void setRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);

boolean tryAcquire();

boolean tryAcquire(long permits);

void acquire();

void acquire(long permits);

boolean tryAcquire(long timeout, TimeUnit unit);

boolean tryAcquire(long permits, long timeout, TimeUnit unit);

RateLimiterConfig getConfig();

long availablePermits();

}

其实现类 RedissonRateLimiter 继承了 RedissonExpirable

提供了多个方法来拼装业务中需要用到的 key,在后续的 acquire 等方法中将作为 Lua 脚本参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RedissonRateLimiter extends RedissonExpirable implements RRateLimiter {

// 构造方法,主要是 super RedissonExpirable
// RedissonRateLimiter 本身并没有什么成员属性
public RedissonRateLimiter(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}

// 拼装全局 permits key
String getPermitsName() {
return suffixName(getRawName(), "permits");
}

// 拼装单 Redis 实例 permits key
String getClientPermitsName() {
return suffixName(getPermitsName(), getServiceManager().getId());
}

// 拼装全局 value key
String getValueName() {
return suffixName(getRawName(), "value");
}

// 拼装单 Redis 实例 value key
String getClientValueName() {
return suffixName(getValueName(), getServiceManager().getId());
}

整个流程中需要用到以下的 key:

  • name:限流配置
  • suffixName(getRawName(), "permits"):许可授权记录
  • suffixName(getRawName(), "value"):可用许可数量

在下面会详细介绍这几个 key 的作用

限流配置

配置限流参数时,会使用设定的 key name 记录限流的配置信息

setRatetrySetRate 的区别在于一个使用 setNx,一个使用 set 覆盖,并且会删除 permits 和 value 的记录

本质上就是在操作 key 设置限流属性


假设 RRateLimiter 对象设置的 name 为 test.limiter

其中参数 keys:

  1. test.limiter
  2. {test.limiter}:value
  3. {test.limiter}:value:407cd24e-03e1-4745-9684-5aea2ffd4b5f
  4. {test.limiter}:permits
  5. {test.limiter}:permits:407cd24e-03e1-4745-9684-5aea2ffd4b5f

其中 407cd24e-03e1-4745-9684-5aea2ffd4b5f 是 Redisson 生成的标记,为了实现 Redisson 实例级别的限流控制

参数 args:

  1. rate:产出的令牌数量
  2. rateInterval:时间间隔的毫秒值
  3. type:类型;0 是 OVERALL 全局;1 是 PER_CLIENT Redisson 实例级别
1
2
3
4
5
6
7
8
9
10
local valueName = KEYS[2];
local permitsName = KEYS[4];
if ARGV[3] == '1' then
valueName = KEYS[3];
permitsName = KEYS[5];
end
redis.call('hset', KEYS[1], 'rate', ARGV[1]);
redis.call('hset', KEYS[1], 'interval', ARGV[2]);
redis.call('hset', KEYS[1], 'type', ARGV[3]);
redis.call('del', valueName, permitsName);

从脚本来看,主要还是操作 Hash key,设置其 rateintervaltype 成员为参数中的不同值

最后删除 value、permits 的 key(因为变更了 rate,这两个记录就不准确了,Guava 的 RateLimiter 中会平滑变更,Redisson 的实现中粗暴一些,选择了直接删除)

1
2
3
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

trySetRate 中直接使用 hsetnx,并且没有删除 key 的操作,用于初始化

令牌获取

核心的令牌获取方法在 tryAcquireAsync

acquiretryAcquire 区别在于 tryAcquire 多了 timeout 参数和返回值,用于确定是否等待时间在期望值以内,acquire 则会阻塞,直到获取结果


其中参数 keys:

  1. test.limiter
  2. {test.limiter}:value
  3. {test.limiter}:value:407cd24e-03e1-4745-9684-5aea2ffd4b5f
  4. {test.limiter}:permits
  5. {test.limiter}:permits:407cd24e-03e1-4745-9684-5aea2ffd4b5f

keys 的值和限流配置部分是一样的

参数 args:

  1. value:获取的令牌数量
  2. System.currentTimeMillis:当前时间戳
  3. getServiceManager().generateIdArray():生成的随机字节数,其中随机生成器使用的是 ThreadLocalRandom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
-- 获取配置,校验是否存在 RateLimiter 配置
local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

-- 根据 type 选择不同的 value 和 permits key
local valueName = KEYS[2];"
local permitsName = KEYS[4];"
type == '1' then "
valueName = KEYS[3];
permitsName = KEYS[5];
end;

-- 判断获取的令牌数量是不是已经大于限流器的配置
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate');

local currentValue = redis.call('get', valueName);
local res;
if currentValue ~= false then
-- 查询时间窗口以外的许可
-- permits 的结构为 score 是发放时间戳;value 是 8 字节随机值 + 许可数量
local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
local released = 0;
for i, v in ipairs(expiredValues) do
-- 二进制数据,解包规则为 Bc0I
local random, permits = struct.unpack('Bc0I', v);"
-- 记录时间窗口许可数量
released = released + permits;
end;

-- 存在可以释放的许可
if released > 0 then
-- 删除 permits 记录
redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
-- 重新维护下 value
if tonumber(currentValue) + released > tonumber(rate) then
currentValue = tonumber(rate) - redis.call('zcard', permitsName);
else
currentValue = tonumber(currentValue) + released;
end;
-- 更新
redis.call('set', valueName, currentValue);
end;

-- 是否有足够的令牌满足这次请求
if tonumber(currentValue) < tonumber(ARGV[1]) then
local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
-- 没有,返回等待时间
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
else
-- 无需等待;更新 permits 和 value
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
redis.call('decrby', valueName, ARGV[1]);
res = nil;
end;
else
-- value 值不存在,说明是第一次获取令牌
redis.call('set', valueName, rate);
redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
redis.call('decrby', valueName, ARGV[1]);
res = nil;
end;

-- 根据配置更新 value 和 permits 的过期时间
local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 then
redis.call('pexpire', valueName, ttl);
redis.call('pexpire', permitsName, ttl);
end;
return res;

结合 Lua 脚本中的注释,流程可以简略概括如下:

  1. 校验 RateLimiter 是否存在
  2. 校验获取的令牌数量是不是已经超过 RateLimiter 的 rate 配置;assert
  3. 获取 currentValue,为现有令牌数量
  4. currentValue 存在则查询窗口时间内发放的记录,进行释放
  5. 判断是否有足够令牌满足这次请求;满足直接发放,返回 nil;不满足则返回等待时间 等待时间的计算规则:3 + 令牌产出间隔 - (当前时间 - 最早一次 令牌发放时间)
  6. 根据配置更新 value 和 permits 的过期时间;简单理解,配置不存在了,value 和 permits 记录也没有存在的意义了

流程图

delay 时间处理

Redis 操作最终会返回一个 delay 时间,如果不需要等待,即已经获取到令牌则返回 nil

所以当客户端拿到最后的 delay 值后,将会进行一系列的判断和处理

核心主要是在 tryAcquireAsync 方法,非异步方法就是上游方法使用 get 进行阻塞获取完成结果来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private CompletableFuture<Boolean> tryAcquireAsync(long permits, long timeoutInMillis) {
long s = System.currentTimeMillis();
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
return future.thenCompose(delay -> {
// Redis 返回 nil,说明立即签发了令牌
if (delay == null) {
return CompletableFuture.completedFuture(true);
}

// 不限时
if (timeoutInMillis == -1) {
// 返回的 CompletableFuture
CompletableFuture<Boolean> f = new CompletableFuture<>();
getServiceManager().getGroup().schedule(() -> {
// 递归调用 tryAcquireAsync
CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);
// 转换 CompletableFuture 返回值
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
return f;
}

// 上面操作的耗时(当前时间 - 入口开始时间)
long el = System.currentTimeMillis() - s;
// 剩余期望等待时间
long remains = timeoutInMillis - el;
// 剩余期望等待时间小于 0,返回 false
if (remains <= 0) {
return CompletableFuture.completedFuture(false);
}

CompletableFuture<Boolean> f = new CompletableFuture<>();
// 剩余期望等待时间比需要等待的时间要小
if (remains < delay) {
// 定时返回 false
getServiceManager().getGroup().schedule(() -> {
f.complete(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
// 重新一轮获取
long start = System.currentTimeMillis();
getServiceManager().getGroup().schedule(() -> {
long elapsed = System.currentTimeMillis() - start;
// 过程中超时了
if (remains <= elapsed) {
f.complete(false);
return;
}

CompletableFuture<Boolean> r = tryAcquireAsync(permits, remains - elapsed);
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
}
return f;
}).toCompletableFuture();
}

首先调用了 tryAcquireAsync(RedisCommands.EVAL_LONG, permits) 的重载方法;这个方法处理 delay 结果;重载的方法则是和 Redis 交互令牌获取的逻辑

拿到 Redis 返回时分为几种情况:

  • 为 null 说明不需要等待,已经获取了令牌
  • 没有期望等待时间,生成一个定时任务,delay 为 Redis 返回的等待时间
  • 计算剩余期望等待时间,如果为 0,则直接返回 false
  • 剩余期望等待时间小于 delay,延迟后返回 false
  • 最后开始递归一轮获取

补充

异步任务的递归

Redisson 中的很多设计都是构造一个异步方法,返回值包装为 CompletableFuture,同步接口就是直接调用其 get 方法阻塞等待

对于异步方法如何递归呢,因为最终接口需要返回一个 CompletableFuture,不能 CompletableFuture 嵌套 CompletableFuture,因为上游调用方也不知道递归了几层

Redisson 的方法是在异步递归之前先定义出一个 CompletableFuture 作为返回值,再最后异步执行完后使用 transfer 操作

1
2
3
4
5
6
7
8
9
10
if (timeoutInMillis == -1) {
// 返回的 CompletableFuture
CompletableFuture<Boolean> f = new CompletableFuture<>();
getServiceManager().getGroup().schedule(() -> {
CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);
// 异步任务中进行 transfer
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
return f;
}

transfer 的操作本质上是使用 whenComplete,将新的最终递归异步任务的返回值(异常)传递给返回值引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CommandAsyncService implements CommandAsyncExecutor {
...
@Override
public <V> void transfer(CompletionStage<V> future1, CompletableFuture<V> future2) {
future1.whenComplete((res, e) -> {
if (e != null) {
future2.completeExceptionally(e);
return;
}

future2.complete(res);
});
}
...
}

最终实现异步任务的递归操作

参考

详解Redisson分布式限流的实现原理 - 掘金 (juejin.cn)