Service Fault Tolerance (Part 2): Circuit Breaking and Degradation

Circuit Breaking and Degradation

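When a service stays unavailable, we don't want to keep calling it and have other services get stuck and become unavailable too. Timeouts don't fix this: they lengthen the whole call chain and increase load. Retries make it even worse. This is where circuit breaking comes in.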

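While a service stays unavailable, circuit breaking marks that provider as short-circuited in the consumer's list of available providers. Subsequent calls skip it, which avoids the problems described above.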

Degradation, in turn, runs a local operation in place of the remote call when the provider is unavailable.
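The two ideas can be sketched together in a few lines of plain Java. This is a minimal illustration under simplifying assumptions, not Dubbo or Hystrix code. Provider, ProviderSelector and the "N consecutive failures" rule are hypothetical. A provider that fails threshold times in a row is marked short-circuited and skipped. When no usable provider remains, the local fallback runs instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical remote provider: a name plus the call it performs.
final class Provider {
    final String name;
    final Function<String, String> call;
    int consecutiveFailures = 0;
    boolean shortCircuited = false;

    Provider(String name, Function<String, String> call) {
        this.name = name;
        this.call = call;
    }
}

// Consumer-side view of the provider list (illustrative, not a framework API).
final class ProviderSelector {
    private final List<Provider> providers = new ArrayList<>();
    private final int threshold;

    ProviderSelector(int threshold) {
        this.threshold = threshold;
    }

    void add(Provider p) {
        providers.add(p);
    }

    String invoke(String request, Supplier<String> localFallback) {
        for (Provider p : providers) {
            if (p.shortCircuited) {
                continue; // circuit breaking: skip short-circuited providers
            }
            try {
                String result = p.call.apply(request);
                p.consecutiveFailures = 0;
                return result;
            } catch (RuntimeException e) {
                if (++p.consecutiveFailures >= threshold) {
                    p.shortCircuited = true;
                }
            }
        }
        // Degradation: no usable provider, so run the local operation instead
        return localFallback.get();
    }
}
```

The sketch never closes a circuit again. A real breaker lets a trial request through after a sleep window, as the Hystrix section explains.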

Dubbo

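Dubbo itself does not provide a circuit-breaking mechanism. It only offers simple degradation, configured through the mock value. Each accepted value normalizes as follows: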

  • return => return null
  • fail => default
  • force => default
  • fail:throw/return foo => throw/return foo
  • force:throw/return foo => throw/return foo
  • fail/force:className => className::method()

Here foo can be a JSON string, true, false, null, a number, a string, and so on.

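Among these, default looks for a method with the same name in the mock class named after the interface plus a Mock suffix (FooServiceMock for FooService) and executes it. If a class is specified, the same-name method of that class is executed instead.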
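The normalization table and the default-class convention can be sketched with plain reflection. MockNormalizer and MockClassLookup are hypothetical, simplified stand-ins: Dubbo's real normalizeMock and mock-invoker lookup handle more cases and cache their results. FooService and FooServiceMock are example types.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified version of the normalization listed above.
final class MockNormalizer {
    static String normalize(String mock) {
        String m = mock.trim();
        // "fail" or "force" with no argument means: use the default mock class
        if (m.equals("fail") || m.equals("force")) {
            return "default";
        }
        // Strip the fail:/force: prefix and keep only the action
        if (m.startsWith("fail:")) {
            m = m.substring("fail:".length()).trim();
        } else if (m.startsWith("force:")) {
            m = m.substring("force:".length()).trim();
        }
        // A bare "return" means "return null"
        if (m.equals("return")) {
            return "return null";
        }
        return m; // "throw ...", "return foo", "default", or a class name
    }
}

// Example service interface and its mock, named "<interface> + Mock".
interface FooService {
    String sayHello(String name);
}

class FooServiceMock implements FooService {
    @Override
    public String sayHello(String name) {
        return "mocked hello " + name;
    }
}

final class MockClassLookup {
    // "default" resolves to the interface name + "Mock"; anything else is
    // treated as a class name. The same-name method is invoked reflectively.
    static Object invokeMock(Class<?> iface, String mock, String methodName,
                             Class<?>[] paramTypes, Object... args) {
        String className = "default".equals(mock) ? iface.getName() + "Mock" : mock;
        try {
            Class<?> mockClass = Class.forName(className);
            Object instance = mockClass.getDeclaredConstructor().newInstance();
            Method method = mockClass.getMethod(methodName, paramTypes);
            return method.invoke(instance, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke mock " + className, e);
        }
    }
}
```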

This is implemented in the MockClusterInvoker class in the org.apache.dubbo.rpc.cluster.support.wrapper package:

public class MockClusterInvoker<T> implements Invoker<T> {
    ...
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // Read the mock parameter from the configuration
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        // mock not configured, or configured as false: make the RPC call directly
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);

        // mock configured as force: always use the local mock
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);

        // Normal call first; degrade only after it fails
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                // Business exceptions are rethrown as-is, not degraded
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // Degraded call after the failure:
                // find the matching mock invoker and call invoke on it
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }


    ...
}

The mock call itself is handled by MockInvoker:

final public class MockInvoker<T> implements Invoker<T> {
    ...
    public Result invoke(Invocation invocation) throws RpcException {
        // Read the method-level mock attribute from the URL
        String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        // Fall back to the service-level mock attribute
        if (StringUtils.isBlank(mock)) {
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }

        if (StringUtils.isBlank(mock)) {
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        // Parse and normalize the mock value:
        // strip the fail/force prefix; if no argument is given, use default/null etc.
        // e.g. fail:throw/return foo => throw/return foo
        mock = normalizeMock(URL.decode(mock));

        // "return" behavior
        if (mock.startsWith(Constants.RETURN_PREFIX)) {
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            // Parse the mock string into the method's return type and return it
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName()
                        + ", mock:" + mock + ", url: " + url, ew);
            }

        // "throw" behavior
        } else if (mock.startsWith(Constants.THROW_PREFIX)) {
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            // Blank mock: throw an exception with the default message
            if (StringUtils.isBlank(mock)) {
                throw new RpcException("mocked exception for service degradation.");
            } else { // user customized class
                // Treat mock as an exception class name, load that class and throw it;
                // the exception is cached while the throws cache holds fewer than 1000 entries (hard-coded)
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }

        // Run a custom mock class
        } else { //impl mock
            try {
                // If mock is default or blank, look for the same-name method in FooServiceMock;
                // otherwise look in the class named by mock.
                // The mock invoker is also cached while the mocks cache holds fewer than 10000 entries (hard-coded)
                Invoker<T> invoker = getInvoker(mock);
                return invoker.invoke(invocation);
            } catch (Throwable t) {
                throw new RpcException("Failed to create mock implementation class " + mock, t);
            }
        }
    }
    ...
}
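parseMockValue converts the text after return into a value of the method's return type. Below is a hypothetical, simplified stand-in (MockValueParser is not a Dubbo class). It handles null, booleans, quoted strings and a few numeric types. JSON parsing, which Dubbo also supports, is left out.

```java
// Hypothetical, simplified stand-in for Dubbo's parseMockValue.
final class MockValueParser {
    static Object parse(String mock, Class<?> returnType) {
        String s = mock.trim();
        if (s.equals("null")) {
            return null;
        }
        if (s.equals("true") || s.equals("false")) {
            return Boolean.valueOf(s);
        }
        // Strip the quotes from a string literal
        if (s.length() >= 2 && (s.startsWith("\"") && s.endsWith("\"")
                || s.startsWith("'") && s.endsWith("'"))) {
            return s.substring(1, s.length() - 1);
        }
        // Numbers are converted to the declared return type
        if (returnType == int.class || returnType == Integer.class) {
            return Integer.valueOf(s);
        }
        if (returnType == long.class || returnType == Long.class) {
            return Long.valueOf(s);
        }
        if (returnType == double.class || returnType == Double.class) {
            return Double.valueOf(s);
        }
        // Anything else is returned as the raw string (JSON parsing omitted)
        return s;
    }
}
```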

Hystrix

Hystrix is a component from Netflix for fault tolerance in distributed services. It also provides circuit breaking and degradation.

Flow

[Figure: hystrix-command-flow-chart]

The figure above is the Hystrix flow chart. When a request is issued:

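1. Construct a Command

Build a Command object that wraps the dependency call in its run method. It can be a HystrixCommand, which handles a single response, or a HystrixObservableCommand, which handles a stream of observed events (see RxJava). In fact, HystrixCommand is itself ultimately implemented on top of Observable.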
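HystrixCommand puts the dependency call in run() and the degradation logic in getFallback(). The sketch below shows just that shape without the Hystrix dependency. MiniCommand is a hypothetical base class, not the Hystrix API. It calls run() and switches to getFallback() on any exception. When no fallback is defined, it throws, as in step 8.

```java
import java.io.IOException;

// Hypothetical stand-in for the command pattern; not Hystrix's API.
abstract class MiniCommand<R> {
    // The wrapped dependency call
    protected abstract R run() throws Exception;

    // Degraded result; by default there is none, so an error is thrown
    protected R getFallback() {
        throw new UnsupportedOperationException("no fallback available");
    }

    // Run the command; on any failure, switch to the fallback
    public R execute() {
        try {
            return run();
        } catch (Exception e) {
            return getFallback();
        }
    }
}

// Example command whose dependency can be switched off for the demo.
class HelloCommand extends MiniCommand<String> {
    private final String name;
    private final boolean dependencyDown;

    HelloCommand(String name, boolean dependencyDown) {
        this.name = name;
        this.dependencyDown = dependencyDown;
    }

    @Override
    protected String run() throws Exception {
        if (dependencyDown) {
            throw new IOException("dependency unavailable");
        }
        return "Hello " + name;
    }

    @Override
    protected String getFallback() {
        return "Hello (fallback) " + name;
    }
}
```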

2. Call a method to execute the command

K value = command.execute();                    // blocking
Future<K> fValue = command.queue();             // asynchronous
Observable<K> ohValue = command.observe();      // hot observable, see RxJava
Observable<K> ocValue = command.toObservable(); // cold observable, see RxJava
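The four calls differ only in how the result is delivered. In Hystrix, execute() is essentially queue().get(): the blocking call waits on the asynchronous Future. The sketch below reproduces only that relationship with a plain ExecutorService. AsyncCommand is a hypothetical class, and the two RxJava variants (observe and toObservable) are not modeled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical command: a blocking execute() layered on an asynchronous queue().
final class AsyncCommand<K> {
    private final Callable<K> work;
    private final ExecutorService pool;

    AsyncCommand(Callable<K> work, ExecutorService pool) {
        this.work = work;
        this.pool = pool;
    }

    // Asynchronous: submit the work and return a Future immediately
    Future<K> queue() {
        return pool.submit(work);
    }

    // Blocking: simply wait on the Future that queue() returns
    K execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("command failed", e.getCause());
        }
    }
}
```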

3. Check the local (request) cache

If caching is enabled for this Command, look up the cache first; on a hit, return the cached result directly.
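Hystrix scopes this cache to a single request (HystrixRequestContext) and keys it by the command's getCacheKey() value. Below is a minimal sketch of the same idea, assuming one RequestCache instance per incoming request. RequestCache and getOrExecute are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical request-scoped cache: one instance per incoming request,
// so identical commands within that request run only once.
final class RequestCache {
    private final Map<String, Object> results = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <R> R getOrExecute(String cacheKey, Supplier<R> command) {
        // On a hit, return the stored result without running the command
        return (R) results.computeIfAbsent(cacheKey, k -> command.get());
    }
}
```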

4. Check the circuit state

If the circuit is open (short-circuited), skip straight to step 8 and fall back.

5. Check capacity

Check whether the thread pool, its queue, or the semaphore has enough capacity to run the Command. If not, skip to step 8 and fall back.
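With semaphore isolation (the branch in AbstractCommand.applyHystrixSemantics), this check is a non-blocking tryAcquire on a semaphore. The sketch below assumes a fixed permit count. SemaphoreIsolation is a hypothetical class, not the Hystrix API.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical semaphore isolation: at most maxConcurrent calls run at once,
// and a call that cannot get a permit goes straight to the fallback.
final class SemaphoreIsolation {
    private final Semaphore permits;

    SemaphoreIsolation(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    <R> R execute(Supplier<R> run, Supplier<R> fallback) {
        // Non-blocking check: no free permit means no capacity
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return run.get();
        } catch (RuntimeException e) {
            return fallback.get(); // execution failed
        } finally {
            permits.release(); // always give the permit back
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```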

6. Execute the Command

Run the operation. If it fails or times out, skip to step 8 and fall back.
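For thread isolation, the timeout can be pictured as waiting on a Future for a bounded time. The sketch below assumes the caller supplies the ExecutorService. TimedExecution is hypothetical. Hystrix itself implements timeouts with HystrixObservableTimeoutOperator rather than Future.get.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical thread-isolated call with a timeout: the work runs on a
// separate pool and the caller waits at most timeoutMillis for it.
final class TimedExecution {
    static <R> R executeWithTimeout(ExecutorService pool, Callable<R> run,
                                    long timeoutMillis, Supplier<R> fallback) {
        Future<R> future = pool.submit(run);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow call
            return fallback.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        } catch (Exception e) {
            return fallback.get(); // the call itself failed
        }
    }
}
```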

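7. Report the result

Feed the outcomes of steps 5 and 6 into the health statistics. If the circuit-breaking thresholds are reached, open the circuit.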
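Hystrix keeps these statistics in a rolling window divided into buckets, and the breaker reads the total request count and the error percentage from it (HealthCounts). Below is a single-threaded sketch of such a bucketed counter. RollingHealthCounter is hypothetical, and timestamps are passed in explicitly to keep it deterministic.

```java
import java.util.Arrays;

// Hypothetical rolling-window counter (single-threaded sketch).
// The window is split into fixed-size buckets; buckets that have aged out
// are ignored, and are reset when their slot is reused.
final class RollingHealthCounter {
    private final long bucketMillis;
    private final long windowMillis;
    private final long[] bucketStart;
    private final int[] successes;
    private final int[] failures;

    RollingHealthCounter(int buckets, long bucketMillis) {
        this.bucketMillis = bucketMillis;
        this.windowMillis = buckets * bucketMillis;
        this.bucketStart = new long[buckets];
        this.successes = new int[buckets];
        this.failures = new int[buckets];
        Arrays.fill(bucketStart, -1L); // -1 marks a bucket that was never used
    }

    void markSuccess(long now) {
        successes[bucketFor(now)]++;
    }

    void markFailure(long now) {
        failures[bucketFor(now)]++;
    }

    int totalRequests(long now) {
        int total = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (inWindow(i, now)) {
                total += successes[i] + failures[i];
            }
        }
        return total;
    }

    int errorPercentage(long now) {
        int total = 0;
        int errors = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (inWindow(i, now)) {
                total += successes[i] + failures[i];
                errors += failures[i];
            }
        }
        return total == 0 ? 0 : (int) ((double) errors / total * 100);
    }

    private int bucketFor(long now) {
        long start = now - (now % bucketMillis);
        int idx = (int) ((now / bucketMillis) % bucketStart.length);
        if (bucketStart[idx] != start) { // slot held an older bucket: recycle it
            bucketStart[idx] = start;
            successes[idx] = 0;
            failures[idx] = 0;
        }
        return idx;
    }

    private boolean inWindow(int i, long now) {
        return bucketStart[i] >= 0 && now - bucketStart[i] < windowMillis;
    }
}
```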

8. Fall back

Execute the fallback. If there is no fallback method, or the fallback itself fails, an error is thrown.

9. Return the result

Circuit-breaking mechanism

The figure below shows how HystrixCommand and HystrixObservableCommand interact with the HystrixCircuitBreaker.

[Figure: circuit-breaker.png]

The short-circuit logic is as follows:

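  1. The request volume reaches a threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())

  2. The error percentage exceeds the configured error threshold (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())

  3. The circuit breaker transitions from CLOSED to OPEN

  4. While the circuit breaker is OPEN, all requests are short-circuited

  5. After a set time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next request is let through (HALF-OPEN). If that request fails, the breaker returns to OPEN for another sleep window; if it succeeds, the breaker resets to CLOSED and evaluation starts again from step 1.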
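The five steps form a small state machine. The sketch below models it with an explicit CLOSED/OPEN/HALF_OPEN enum. Hystrix's own implementation encodes the same states with an AtomicBoolean plus a timestamp. Several simplifications apply:

- SimpleCircuitBreaker and its parameters are hypothetical.
- Counts are cumulative rather than a rolling window.
- Time is passed in as a parameter.
- The class is not thread-safe.

```java
// Hypothetical breaker with explicit states; not thread-safe.
// Time is passed in as a parameter so the behavior is deterministic.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;

    private State state = State.CLOSED;
    private long openedAt;
    private int total;
    private int errors;

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    State state() {
        return state;
    }

    boolean allowRequest(long now) {
        if (state == State.CLOSED) {
            return true;
        }
        // Step 5: after the sleep window, let a single trial request through
        if (state == State.OPEN && now > openedAt + sleepWindowMillis) {
            state = State.HALF_OPEN;
            return true;
        }
        // Step 4: OPEN, or a trial request is already in flight
        return false;
    }

    void onSuccess() {
        if (state == State.HALF_OPEN) {
            // Trial succeeded: close and start counting from scratch
            state = State.CLOSED;
            total = 0;
            errors = 0;
        } else {
            total++;
        }
    }

    void onFailure(long now) {
        if (state == State.HALF_OPEN) {
            trip(now); // trial failed: back to OPEN for another sleep window
            return;
        }
        total++;
        errors++;
        // Steps 1-3: enough volume and a high enough error percentage open the circuit
        if (total >= requestVolumeThreshold && errors * 100 / total >= errorThresholdPercentage) {
            trip(now);
        }
    }

    private void trip(long now) {
        state = State.OPEN;
        openedAt = now;
    }
}
```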

The code implements this as follows.

In the hystrix-core project, the interaction with the circuit breaker starts in the AbstractCommand class:

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    // Does the circuit breaker allow the request? (top right of the flow chart)
    // i.e. is the circuit not short-circuited?
    if (circuitBreaker.allowRequest()) {
        // Get the execution semaphore
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        // Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // Execute the command and attach the corresponding handlers
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // Could not acquire the semaphore: fall back
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // Circuit is open: fall back
        return handleShortCircuitViaFallback();
    }
}

...

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    ...

    // Success callback: records the success and resets the circuit state (markSuccess)
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };

    // Failure callback: handles the exception according to its type;
    // the handlers called here perform the fallback
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    ...

    Observable<R> execution;
    // If timeouts are enabled, attach the timeout operator.
    // executeCommandWithSpecifiedIsolation:
    // thread isolation, state checks, etc.
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
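handleFallback picks a fallback path according to the exception type. Below is a stdlib-only sketch of that dispatch. BadRequestException stands in for HystrixBadRequestException, and java.util.concurrent.TimeoutException stands in for HystrixTimeoutException. FallbackDispatcher is hypothetical.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical stand-in for HystrixBadRequestException: a caller error that
// should be propagated instead of triggering the fallback.
class BadRequestException extends RuntimeException {
    BadRequestException(String message) {
        super(message);
    }
}

final class FallbackDispatcher {
    enum Reason { THREAD_POOL_REJECTED, TIMEOUT, FAILURE }

    // Classify the error, then run the fallback with the reason;
    // bad requests are rethrown untouched.
    static <R> R handle(Throwable t, Function<Reason, R> fallback) {
        if (t instanceof RejectedExecutionException) {
            return fallback.apply(Reason.THREAD_POOL_REJECTED);
        } else if (t instanceof TimeoutException) {
            return fallback.apply(Reason.TIMEOUT);
        } else if (t instanceof BadRequestException) {
            throw (BadRequestException) t;
        } else {
            return fallback.apply(Reason.FAILURE);
        }
    }
}
```

Bad requests bypass the fallback because they reflect a caller error, not an unhealthy dependency. According to the Hystrix documentation, HystrixBadRequestException also does not count as a failure in the circuit breaker's metrics.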

Next, the circuit breaker itself. HystrixCircuitBreaker is an interface, and HystrixCircuitBreakerImpl implements it.

private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;

/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false);

/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

/**
 * Called on a successful execution:
 * if the circuit is open, close it and reset the health-counts stream
 */
public void markSuccess() {
    if (circuitOpen.get()) {
        if (circuitOpen.compareAndSet(true, false)) {
            //win the thread race to reset metrics
            //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
            //and all other metric consumers are unaffected by the reset
            metrics.resetStream();
        }
    }
}

// Decide whether a request is allowed through (i.e. not short-circuited)
public boolean allowRequest() {
    // Circuit forced open by configuration: reject every request
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    // Circuit forced closed by configuration: allow everything, but still call isOpen() so the statistics are computed
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    return !isOpen() || allowSingleTest();
}

public boolean isOpen() {
    // If the circuit is already open, return immediately
    if (circuitOpen.get()) {
        // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
        return true;
    }

    // we're closed, so let's see if errors have made us so we should trip the circuit open
    // Fetch the health counts
    HealthCounts health = metrics.getHealthCounts();

    // check if we are past the statisticalWindowVolumeThreshold
    // Check whether the total request count has reached the volume threshold
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }

    // Volume threshold reached:
    // now check whether the error percentage has reached its threshold
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // Both thresholds reached: open the circuit
        // our failure rate is too high, trip the circuit
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            // The CAS failed (another thread probably opened the circuit first),
            // but the thresholds are still exceeded, so return true anyway
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}

/**
 * Whether to let a single test request through (half-open):
 * once the circuit has been open longer than the sleep window, allow one request to probe whether the dependency has recovered
 */
public boolean allowSingleTest() {
    // Time the circuit was opened, or time of the last test request
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) if the circuit is open
    // 2) and it's been longer than 'sleepWindow' since we opened the circuit
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        // Try to claim the test by updating the timestamp (CAS)
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    return false;
}