背景
对于单机限流,可以使用 Guava 等工具
如果需要对所有服务进行限流,就需要使用依赖同一个数据资源
简单的方案可以使用 Redis 记录限流相关信息进行实现,Redisson 的
RRateLimiter
就是基于 Redis
实现的全局限流工具,使用了令牌桶的思想
基本使用
创建限流器,尝试设置限流配置
类型为 RateType.OVERALL
,为全局限流;permits 数量为 1s
100 个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf4j public class RedissonRateLimiter { public static void main (String[] args) { Config config = new Config (); config.useSingleServer().setAddress("redis://***" ).setPassword("***" ); RedissonClient redissonClient = Redisson.create(config); RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter" ); boolean success = rateLimiter.trySetRate(RateType.OVERALL, 100 , 1 , RateIntervalUnit.SECONDS); log.info("success:{}" , success); } }
修改限流配置
使用 rateLimiter.setRate
方法重新配置限流相关参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class RedissonRateLimiter { public static void main (String[] args) { Config config = new Config (); config.useSingleServer().setAddress("redis://***" ).setPassword("***" ); RedissonClient redissonClient = Redisson.create(config); RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter" ); rateLimiter.setRate(RateType.OVERALL, 200 , 1 , RateIntervalUnit.SECONDS); } }
获取令牌
使用 rateLimiter.acquire
等方法获取限流许可
accquire
和 tryAccquire
的区别和大部分限流、锁工具一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j public class RedissonRateLimiter { public static void main (String[] args) { Config config = new Config (); config.useSingleServer().setAddress("redis://***" ).setPassword("***" ); RedissonClient redissonClient = Redisson.create(config); RRateLimiter rateLimiter = redissonClient.getRateLimiter("test.limiter" ); rateLimiter.acquire(2 ); boolean success = rateLimiter.tryAcquire(2 ); log.info("success:{}" , success); } }
源码
RRateLimiter
Redisson 中,操作令牌的对象被包装为了 RRateLimiter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface RRateLimiter extends RRateLimiterAsync , RExpirable { boolean trySetRate (RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit) ; void setRate (RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit) ; boolean tryAcquire () ; boolean tryAcquire (long permits) ; void acquire () ; void acquire (long permits) ; boolean tryAcquire (long timeout, TimeUnit unit) ; boolean tryAcquire (long permits, long timeout, TimeUnit unit) ; RateLimiterConfig getConfig () ; long availablePermits () ; }
其实现类 RedissonRateLimiter
继承了
RedissonExpirable
提供了多个方法来拼装业务中需要用到的 key,在后续的 acquire
等方法中将作为 Lua 脚本参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class RedissonRateLimiter extends RedissonExpirable implements RRateLimiter { public RedissonRateLimiter (CommandAsyncExecutor commandExecutor, String name) { super (commandExecutor, name); } String getPermitsName () { return suffixName(getRawName(), "permits" ); } String getClientPermitsName () { return suffixName(getPermitsName(), getServiceManager().getId()); } String getValueName () { return suffixName(getRawName(), "value" ); } String getClientValueName () { return suffixName(getValueName(), getServiceManager().getId()); }
整个流程中需要用到以下的 key:
name:限流配置
suffixName(getRawName(), "permits"):许可授权记录
suffixName(getRawName(), "value"):可用许可数量
在下面会详细介绍这几个 key 的作用
限流配置
配置限流参数时,会使用设定的 key name 记录限流的配置信息
setRate
和 trySetRate
的区别在于一个使用
setNx
,一个使用 set
覆盖,并且会删除 permits
和 value 的记录
本质上就是在操作 key 设置限流属性
假设 RRateLimiter
对象设置的 name 为
test.limiter
其中参数 keys:
test.limiter
{test.limiter}:value
{test.limiter}:value:407cd24e-03e1-4745-9684-5aea2ffd4b5f
{test.limiter}:permits
{test.limiter}:permits:407cd24e-03e1-4745-9684-5aea2ffd4b5f
其中 407cd24e-03e1-4745-9684-5aea2ffd4b5f 是 Redisson
生成的标记,为了实现 Redisson 实例级别的限流控制
参数 args:
rate:产出的令牌数量
rateInterval:时间间隔的毫秒值
type:类型;0 是 OVERALL 全局;1 是 PER_CLIENT Redisson
实例级别
1 2 3 4 5 6 7 8 9 10 local valueName = KEYS[2 ];local permitsName = KEYS[4 ];if ARGV[3 ] == '1' then valueName = KEYS[3 ]; permitsName = KEYS[5 ]; end redis.call('hset' , KEYS[1 ], 'rate' , ARGV[1 ]); redis.call('hset' , KEYS[1 ], 'interval' , ARGV[2 ]); redis.call('hset' , KEYS[1 ], 'type' , ARGV[3 ]); redis.call('del' , valueName, permitsName);
从脚本来看,主要还是操作 Hash key,设置其
rate
、interval
、type
成员为参数中的不同值
最后删除 value、permits 的 key(因为变更了
rate,这两个记录就不准确了,Guava 的 RateLimiter 中会平滑变更,Redisson
的实现中粗暴一些,选择了直接删除)
1 2 3 redis.call('hsetnx' , KEYS[1 ], 'rate' , ARGV[1 ]); redis.call('hsetnx' , KEYS[1 ], 'interval' , ARGV[2 ]); return redis.call('hsetnx' , KEYS[1 ], 'type' , ARGV[3 ]);
在 trySetRate
中直接使用
hsetnx
,并且没有删除 key 的操作,用于初始化
令牌获取
核心的令牌获取方法在 tryAcquireAsync
中
acquire
和 tryAcquire
区别在于
tryAcquire
多了 timeout
参数和返回值,用于确定是否等待时间在期望值以内,acquire
则会阻塞,直到获取结果
其中参数 keys:
test.limiter
{test.limiter}:value
{test.limiter}:value:407cd24e-03e1-4745-9684-5aea2ffd4b5f
{test.limiter}:permits
{test.limiter}:permits:407cd24e-03e1-4745-9684-5aea2ffd4b5f
keys 的值和限流配置部分是一样的
参数 args:
value:获取的令牌数量
System.currentTimeMillis:当前时间戳
getServiceManager().generateIdArray():生成的随机字节数,其中随机生成器使用的是
ThreadLocalRandom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 -- 获取配置,校验是否存在 RateLimiter 配置 local rate = redis.call('hget' , KEYS[1 ], 'rate' );local interval = redis.call('hget' , KEYS[1 ], 'interval' );local type = redis.call('hget' , KEYS[1 ], 'type' );assert (rate ~= false and interval ~= false and type ~= false , 'RateLimiter is not initialized' )-- 根据 type 选择不同的 value 和 permits key local valueName = KEYS[2 ];" local permitsName = KEYS[4];" type == '1' then " valueName = KEYS[3]; permitsName = KEYS[5]; end; -- 判断获取的令牌数量是不是已经大于限流器的配置 assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); local currentValue = redis.call('get', valueName); local res; if currentValue ~= false then -- 查询时间窗口以外的许可 -- permits 的结构为 score 是发放时间戳;value 是 8 字节随机值 + 许可数量 local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; for i, v in ipairs(expiredValues) do -- 二进制数据,解包规则为 Bc0I local random, permits = struct.unpack('Bc0I', v);" -- 记录时间窗口许可数量 released = released + permits; end; -- 存在可以释放的许可 if released > 0 then -- 删除 permits 记录 redis.call('zremrangebyscore' , permitsName, 0 , tonumber(ARGV[2 ]) - interval); -- 重新维护下 value if tonumber (currentValue) + released > tonumber(rate) then currentValue = tonumber(rate) - redis.call('zcard' , permitsName); else currentValue = tonumber(currentValue) + released; end; -- 更新 redis.call('set' , valueName, currentValue); end; -- 是否有足够的令牌满足这次请求 if tonumber (currentValue) < tonumber(ARGV[1 ]) then local firstValue = redis.call('zrange' , permitsName, 0 , 0 , 'withscores' ); -- 没有,返回等待时间 res = 3 + interval - (tonumber(ARGV[2 ]) - tonumber(firstValue[2 ])); else -- 无需等待;更新 permits 和 value redis.call('zadd' , permitsName, ARGV[2 ], struct.pack('Bc0I' , string.len(ARGV[3 ]), ARGV[3 ], ARGV[1 ])); redis.call('decrby' , valueName, ARGV[1 ]); res = nil; end; else -- value 值不存在,说明是第一次获取令牌 redis.call('set' , valueName, rate); redis.call('zadd' , permitsName, ARGV[2 ], struct.pack('Bc0I' , string.len(ARGV[3 ]), ARGV[3 ], ARGV[1 ])); redis.call('decrby' , valueName, ARGV[1 ]); res = nil; end; -- 根据配置更新 value 和 permits 的过期时间 local ttl = redis.call('pttl' , KEYS[1 ]);if ttl > 0 then redis.call('pexpire' , valueName, ttl); redis.call('pexpire' , permitsName, ttl); end; return res;
结合 Lua 脚本中的注释,流程可以简略概括如下:
校验 RateLimiter 是否存在
校验获取的令牌数量是不是已经超过 RateLimiter 的 rate
配置;assert
获取 currentValue,为现有令牌数量
currentValue 存在则查询窗口时间内发放的记录,进行释放
判断是否有足够令牌满足这次请求;满足直接发放,返回
nil;不满足则返回等待时间 等待时间的计算规则:3 + 令牌产出间隔 -
(当前时间 - 最早一次 令牌发放时间)
根据配置更新 value 和 permits
的过期时间;简单理解,配置不存在了,value 和 permits
记录也没有存在的意义了
流程图
delay 时间处理
Redis 操作最终会返回一个 delay
时间,如果不需要等待,即已经获取到令牌则返回 nil
所以当客户端拿到最后的 delay 值后,将会进行一系列的判断和处理
核心主要是在 tryAcquireAsync
方法,非异步方法就是上游方法使用 get 进行阻塞获取完成结果来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 private CompletableFuture<Boolean> tryAcquireAsync (long permits, long timeoutInMillis) { long s = System.currentTimeMillis(); RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits); return future.thenCompose(delay -> { if (delay == null ) { return CompletableFuture.completedFuture(true ); } if (timeoutInMillis == -1 ) { CompletableFuture<Boolean> f = new CompletableFuture <>(); getServiceManager().getGroup().schedule(() -> { CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis); commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); return f; } long el = System.currentTimeMillis() - s; long remains = timeoutInMillis - el; if (remains <= 0 ) { return CompletableFuture.completedFuture(false ); } CompletableFuture<Boolean> f = new CompletableFuture <>(); if (remains < delay) { getServiceManager().getGroup().schedule(() -> { f.complete(false ); }, remains, TimeUnit.MILLISECONDS); } else { long start = System.currentTimeMillis(); getServiceManager().getGroup().schedule(() -> { long elapsed = System.currentTimeMillis() - start; if (remains <= elapsed) { f.complete(false ); return ; } CompletableFuture<Boolean> r = tryAcquireAsync(permits, remains - elapsed); commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); } return f; }).toCompletableFuture(); }
首先调用了
tryAcquireAsync(RedisCommands.EVAL_LONG, permits)
的重载方法;这个方法处理 delay 结果;重载的方法则是和 Redis
交互令牌获取的逻辑
拿到 Redis 返回时分为几种情况:
为 null 说明不需要等待,已经获取了令牌
没有期望等待时间,生成一个定时任务,delay 为 Redis
返回的等待时间
计算剩余期望等待时间,如果为 0,则直接返回 false
剩余期望等待时间小于 delay,延迟后返回 false
最后开始递归一轮获取
补充
异步任务的递归
Redisson 中的很多设计都是构造一个异步方法,返回值包装为
CompletableFuture
,同步接口就是直接调用其 get
方法阻塞等待
对于异步方法如何递归呢,因为最终接口需要返回一个
CompletableFuture
,不能 CompletableFuture
嵌套
CompletableFuture
,因为上游调用方也不知道递归了几层
Redisson 的方法是在异步递归之前先定义出一个
CompletableFuture
作为返回值,再最后异步执行完后使用
transfer 操作
1 2 3 4 5 6 7 8 9 10 if (timeoutInMillis == -1 ) { CompletableFuture<Boolean> f = new CompletableFuture <>(); getServiceManager().getGroup().schedule(() -> { CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis); commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); return f; }
transfer 的操作本质上是使用
whenComplete
,将新的最终递归异步任务的返回值(异常)传递给返回值引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class CommandAsyncService implements CommandAsyncExecutor { ... @Override public <V> void transfer (CompletionStage<V> future1, CompletableFuture<V> future2) { future1.whenComplete((res, e) -> { if (e != null ) { future2.completeExceptionally(e); return ; } future2.complete(res); }); } ... }
最终实现异步任务的递归操作
参考
详解Redisson分布式限流的实现原理
- 掘金 (juejin.cn)