服务容错(三) 限流

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。

一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

Dubbo

Dubbo关于限速的类在dubbo-rpc-api项目中org.apache.dubbo.rpc.filter包里。(2.7.1)

TPSLimitFilter

首先看TPSFilterLimiter,TPSFilterLimiter是一个简单的流量流入限制器,作用于服务提供者。当有请求进来时,看是否有容量能继续处理该请求,有则执行请求,否则抛出RpcException。

判断容量流程:

  1. 当前时间是否大于上次容量重置时间+设置的重置时间间隔,是则将容量重置为预设容量,并更新重置时间

  2. 如果当前容量大于0,则尝试减去1,成功则放行,否则失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 判断限流器是否允许请求
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}

return invoker.invoke(invocation);
}

}

跳转至tpsLimiter.isAllowable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean isAllowable(URL url, Invocation invocation) {
// 获取URL中请求频率限制(数量)
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
// 获取URL中请求数量重置时间(单位时间)
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {
// 状态信息,无则新建
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}

return true;
}

继续转入statItem.isAllowable()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean isAllowable() {
long now = System.currentTimeMillis();
// 当前时间是否大于上次重置时间+重置时间间隔
if (now > lastResetTime + interval) {
// 重置计数,更新重置时间
token.set(rate);
lastResetTime = now;
}

int value = token.get();
boolean flag = false;
// 尝试获取资源
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}

return flag;
}
ExecuteLimitFilter

ExecuteLimitFilter是一个限制同时执行数的限制器,同TPSLimitFilter相同作用于服务提供者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ExecuteLimitFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取最大并发执行数
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
// RpcStatus中保存了调用的一些信息,如当前执行数,执行总次数,失败次数等
// beginCount方法尝试增加method的active即当前执行数,如果原子增加后大于max则减去并返回false,否则成功,且增加URL的active
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}

// 成功获取容量,继续执行
long begin = System.currentTimeMillis();
boolean isSuccess = true;
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
// 释放资源,减去active,并对URL和method进行统计
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
}
}

}
ActiveLimitFilter

ActiveLimitFilter是一个限制并发执行数(或占用连接的请求数)的限制器。有别于上面两个限制器,该限制器作用于消费端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取最大活跃数
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 根据url和method获取状态信息
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 同ExecuteLimitFilter,尝试获取资源
if (!count.beginCount(url, methodName, max)) {
// 获取失败
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
// 超时剩余时间
long remain = timeout;
synchronized (count) {
// 重复尝试获取资源,如获取失败则释放锁并进行等待
// 如果因资源释放被唤醒,则竞争资源
// 或因超时被唤醒
// 在时间耗尽后,则抛出错误
while (!count.beginCount(url, methodName, max)) {
try {
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}

// 获取资源成功
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
// 释放资源,并通知其他等待资源的线程
count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}

Spring Cloud Gateway

在之前的实践有提及spring cloud gateway的限流,现在来看看其实现。(版本2.1.1)

其代码在spring-cloud-gateway-core下的 org.springframework.cloud.gateway.filter.ratelimit 包中。

查看 RedisRateLimiter 类的 isAllowed 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public Mono<Response> isAllowed(String routeId, String id) {
// 检查组件是否初始化成功
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}

Config routeConfig = loadConfiguration(routeId);

// 允许用户每秒处理多少个请求;令牌桶的填充速率。
int replenishRate = routeConfig.getReplenishRate();

// 令牌桶的容量,允许在一秒钟内的最大请求数
int burstCapacity = routeConfig.getBurstCapacity();

try {
// 获取redis keys, 包括一个tokenKey和一个timestampKey
List<String> keys = getKeys(id);

// The arguments to the LUA script. time() returns unixtime in seconds.
// 生成用于执行LUA脚本的参数
// 1. replenishRate, 2. burstCapacity
// 3. 当前时间戳, 4. 请求数 这里恒为1
List<String> scriptArgs = Arrays.asList(replenishRate + "",
burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
// Lua脚本返回参数:允许的请求数,剩下的令牌数
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
scriptArgs);

// 失败处理,进行记录,将结果写入Header回应

return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);

Response response = new Response(allowed,
getHeaders(routeConfig, tokensLeft));

if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

来查看lua脚本,位于同一项目下resources/META-INF/scripts下。

关键字为 1.tokenKey 和 2.timestampKey

脚本入参依次为:1. replenishRate(令牌填充速率) 2. burstCapacity(最大请求数) 3. 当前时间戳, 4. 请求数 这里恒为1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 每次填充时间
local fill_time = capacity/rate
-- 存活时间
local ttl = math.floor(fill_time*2)

-- 从redis获取剩余令牌数
local last_tokens = tonumber(redis.call("get", tokens_key))
-- 不存在则进行初始化
if last_tokens == nil then
last_tokens = capacity
end

-- 从redis获取上次刷新时间
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end

-- 上次刷新至今经过的时间
local delta = math.max(0, now-last_refreshed)
-- 填充token后token数量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 判断是否能够允许
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0

-- 如果可行,则减去请求数
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

-- 结果写入redis
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

-- 返回允许数量,剩余token数量
return { allowed_num, new_tokens }