限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。
一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
Dubbo Dubbo关于限速的类在dubbo-rpc-api项目中org.apache.dubbo.rpc.filter包里。(2.7.1)
TPSLimitFilter 首先看TPSFilterLimiter,TPSFilterLimiter是一个简单的流量流入限制器,作用于服务提供者。当有请求进来时,看是否有容量能继续处理该请求,有则执行请求,否则抛出RpcException。
判断容量流程:
当前时间是否大于上次容量重置时间+设置的重置时间间隔,是则将容量重置为预设容量,并更新重置时间
如果当前容量大于0,则尝试减去1,成功则放行,否则失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class TpsLimitFilter implements Filter { private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 判断限流器是否允许请求 if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { throw new RpcException( "Failed to invoke service " + invoker.getInterface().getName() + "." + invocation.getMethodName() + " because exceed max service tps."); } return invoker.invoke(invocation); } }
跳转至tpsLimiter.isAllowable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public boolean isAllowable(URL url, Invocation invocation) { // 获取URL中请求频率限制(数量) int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); // 获取URL中请求数量重置时间(单位时间) long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL); String serviceKey = url.getServiceKey(); if (rate > 0) { // 状态信息,无则新建 StatItem statItem = stats.get(serviceKey); if (statItem == null) { stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } return statItem.isAllowable(); } else { StatItem statItem = stats.get(serviceKey); if (statItem != null) { stats.remove(serviceKey); } } return true; }
继续转入statItem.isAllowable()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public boolean isAllowable() { long now = System.currentTimeMillis(); // 当前时间是否大于上次重置时间+重置时间间隔 if (now > lastResetTime + interval) { // 重置计数,更新重置时间 token.set(rate); lastResetTime = now; } int value = token.get(); boolean flag = false; // 尝试获取资源 while (value > 0 && !flag) { flag = token.compareAndSet(value, value - 1); value = token.get(); } return flag; }
ExecuteLimitFilter ExecuteLimitFilter是一个限制同时执行数的限制器,同TPSLimitFilter相同作用于服务提供者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class ExecuteLimitFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); // 获取最大并发执行数 int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); // RpcStatus中保存了调用的一些信息,如当前执行数,执行总次数,失败次数等 // beginCount方法尝试增加method的active即当前执行数,如果原子增加后大于max则减去并返回false,否则成功,且增加URL的active if (!RpcStatus.beginCount(url, methodName, max)) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } // 成功获取容量,继续执行 long begin = System.currentTimeMillis(); boolean isSuccess = true; try { return invoker.invoke(invocation); } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 释放资源,减去active,并对URL和method进行统计 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); } } }
ActiveLimitFilter ActiveLimitFilter是一个限制并发执行数(或占用连接的请求数)的限制器。有别于上面两个限制器,该限制器作用于消费端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); // 获取最大活跃数 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // 根据url和method获取状态信息 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 同ExecuteLimitFilter,尝试获取资源 if (!count.beginCount(url, methodName, max)) { // 获取失败 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); // 超时剩余时间 long remain = timeout; synchronized (count) { // 重复尝试获取资源,如获取失败则释放锁并进行等待 // 如果因资源释放被唤醒,则竞争资源 // 或因超时被唤醒 // 在时间耗尽后,则抛出错误 while (!count.beginCount(url, methodName, max)) { try { count.wait(remain); } catch (InterruptedException e) { // ignore } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive() + ". max concurrent invoke limit: " + max); } } } } // 获取资源成功 boolean isSuccess = true; long begin = System.currentTimeMillis(); try { return invoker.invoke(invocation); } catch (RuntimeException t) { isSuccess = false; throw t; } finally { // 释放资源,并通知其他等待资源的线程 count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if (max > 0) { synchronized (count) { count.notifyAll(); } } } }
Spring Cloud Gateway 在之前的实践有提及spring cloud gateway的限流,现在来看看其实现。(版本2.1.1)
其代码在spring-cloud-gateway-core下的 org.springframework.cloud.gateway.filter.ratelimit 包中。
查看 RedisRateLimiter 类的 isAllowed 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public Mono<Response> isAllowed(String routeId, String id) { // 检查组件是否初始化成功 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } Config routeConfig = loadConfiguration(routeId); // 允许用户每秒处理多少个请求;令牌桶的填充速率。 int replenishRate = routeConfig.getReplenishRate(); // 令牌桶的容量,允许在一秒钟内的最大请求数 int burstCapacity = routeConfig.getBurstCapacity(); try { // 获取redis keys, 包括一个tokenKey和一个timestampKey List<String> keys = getKeys(id); // The arguments to the LUA script. time() returns unixtime in seconds. // 生成用于执行LUA脚本的参数 // 1. replenishRate, 2. burstCapacity // 3. 当前时间戳, 4. 请求数 这里恒为1 List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); // Lua脚本返回参数:允许的请求数,剩下的令牌数 Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // 失败处理,进行记录,将结果写入Header回应 return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
来查看lua脚本,位于同一项目下resources/META-INF/scripts下。
关键字为 1.tokenKey 和 2.timestampKey
脚本入参依次为:1. replenishRate(令牌填充速率) 2. burstCapacity(最大请求数) 3. 当前时间戳, 4. 请求数 这里恒为1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 local tokens_key = KEYS[1] local timestamp_key = KEYS[2] local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) -- 每次填充时间 local fill_time = capacity/rate -- 存活时间 local ttl = math.floor(fill_time*2) -- 从redis获取剩余令牌数 local last_tokens = tonumber(redis.call("get", tokens_key)) -- 不存在则进行初始化 if last_tokens == nil then last_tokens = capacity end -- 从redis获取上次刷新时间 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end -- 上次刷新至今经过的时间 local delta = math.max(0, now-last_refreshed) -- 填充token后token数量 local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) -- 判断是否能够允许 local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 -- 如果可行,则减去请求数 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end -- 结果写入redis redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) -- 返回允许数量,剩余token数量 return { allowed_num, new_tokens }