服务容错(一) 重试

当服务调用失败时,如果是幂等性操作,我们可能需要进行重试。

Dubbo

Dubbo重试策略有:

  • Failover Cluster 当出现失败,重试其它服务器。
  • Failfast Cluster 只发起一次调用,失败立即报错,通常用于非幂等性的操作。
  • Failsafe Cluster 出现异常时,直接忽略,通常用于写入审计日志等操作。
  • Failback Cluster 后台记录失败请求,定时重发,通常用于消息通知操作。

它们的实现都在dubbo-cluster下的org.apache.dubbo.rpc.cluster.support包中(Dubbo-2.7.0 从2.7开始,dubbo正式将包名全部从alibaba迁至了apache下)。

包下分别有XXXClusterXXXClusterInvoker,Cluster是返回了一个ClusterInvoker对象,所以实际是调用了ClusterInvoker。ClusterInvoker都继承了AbstractClusterInvoker,AbstractClusterInvoker做了一些基础的实现(列举Invoker,负载均衡 etc..),而** protected abstract Result doInvoke(Invocation invocation, List<Invoker> invokers,
LoadBalance loadbalance) throws RpcException;**则是待实现方法。

接着依次来看其实现:

  • FailoverClusterInvoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 服务提供者列表
List<Invoker<T>> copyInvokers = invokers;
// 检查服务提供者列表是否为空
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 从url中获取重试次数 len是总调用次数 = 重试次数 + 1
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 循环调用,出错重试
for (int i = 0; i < len; i++) {
// 每次重试前刷新服务提供者列表以防止过期服务提供者在其中
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 负载均衡选择invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
// 记录上一次调用出错信息
if (le != null && logger.isWarnEnabled()) {
logger.warn(...);
}
return result;
} catch (RpcException e) {
// 业务错误则抛出 不应继续调用
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
// 记录本次调用的错误信息
le = new RpcException(e.getMessage(), e);
} finally {
// 记录调用地址(被调用的服务提供者),用于log
providers.add(invoker.getUrl().getAddress());
}
}// for end
// 循环调用全部失败
throw new RpcException(le.getCode(), ...);
}
}
  • FailfastClusterInvoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 调用一次并返回结果
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(...);
}
}
}
  • FailsafeClusterInvoker
1
2
3
4
5
6
7
8
9
10
11
...
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
}
}
  • FailbackClusterInvoker

首先看doInvoke方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
try {
// 检查列表非空
checkInvokers(invokers, invocation);
// 负载均衡选择
invoker = select(loadbalance, invocation, invokers, null);
// 调用
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
// 将该调用失败任务添加进任务列表定时执行
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
}
}

接着看addFailed方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
...
private volatile Timer failTimer;
...
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
// 双重检查构建单例HashedWheelTimer,关于HashedWheelTimer参考文末链接
// dubbo实现在org.apache.dubbo.common.time包下
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
// NamedThreadFactory生产"failback-cluster-timer"前缀名,true为守护线程
// tickDuration = 1, the time unit of the tickDuration = TimeUnit.SECONDS
// the size of the wheel=32,
// maximum number of pending timeouts=failbackTasks,
// 超过maximum number调用会抛出RejectedExecutionException
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
// 重试任务,当重试次数未超过重试上限retries的时候会进行调用操作,
// 如果调用失败且为超过重试上限时将重试次数+1并将该任务新设超时加入到Timer中
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
// 将重试任务放入队列
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}

Ribbon

首先,因Ribbon在spring cloud中是由Hystrix包装调用的,所以在配置超时时Hystrix的超时要大于Ribbon的超时。如Ribbon每次超时为 1 s,超时后重试的次数为 2, 则Hystrix的超时时间应该设为大于 3 s的值。

同时,在使用了Feign的情况下,应禁用掉Feign的重试机制,当前版本默认是禁用的。

然后再来看ribbon重试,ribbon通过三个参数控制重试:

  • maxAutoRetries 对当前服务器的重试次数
  • maxAutoRetriesNextServer 切换服务器的重试次数
  • okToRetryOnAllOperations 对所有操作请求都进行重试

一个配置文件示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hystrix:
command:
myusers-service:
execution:
isolation:
thread:
timeoutInMilliseconds: 6500 # hystrix超时配置

myusers-service:
ribbon:
NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
listOfServers: http://example1.com,http://example2.com
ConnectTimeout: 1000 # 请求连接的超时时间
ReadTimeout: 3000 # 请求处理的超时时间
MaxTotalHttpConnections: 500 # 最大总连接数
MaxConnectionsPerHost: 100 # 单机连接最大数目

# 重试相关
maxAutoRetries: 1 # 对当前实例的重试次数
maxAutoRetriesNextServer: 2 # 切换实例的重试次数
okToRetryOnAllOperations: false # 对所有操作请求都进行重试

从官方示例来看,Ribbon执行请求的入口在ribbon-loadbalancer下AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法(Ribbon-2.3.0),需要注意的是这里使用了RxJava

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 由请求和请求配置构建command
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

try {
// 返回执行结果
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
// 执行网络请求,由子类实现
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}

}

跳转至LoadBalancerCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
...
// 重试handler 保存了重试信息
private final RetryHandler retryHandler;
// 执行信息 保存了服务器信息 以及已经尝试连接当前服务器次数以及历史服务器次数
private volatile ExecutionInfo executionInfo;
// 服务器信息 host:port
private final Server server;

// 监听调用,保存了历史执行内容 监听列表 etc..
private final ExecutionContextListenerInvoker<?, T> listenerInvoker;

...

public Observable<T> submit(final ServerOperation<T> operation) {
// 执行内容信息上下文,同ExecutionInfo,属性非final
final ExecutionInfoContext context = new ExecutionInfoContext();

// 执行监听器在行为开始时的动作
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
// 同一实例的最大重试次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 切换实例的重试次数
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
// 设定当前服务器信息,重置单机尝试次数,增加切换次数
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);

// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
// 增加本服务器的调用次数
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);

if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}

@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}

@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}

private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}// 里层 call end
});// 里层 Observable<T> o = ... end

// 如果最大重试单台服务器次数大于0,onError时进行maxRetrysSame次重试
// 重试条件:
// 1.非AbortExecutionException错误
// 2.重试次数不能大于maxRetrysSame
// 3.判断是否是打开retry开关
// 4.如果是同一服务器则判断错误是否为可重试的错误,非同一服务器直接返回true
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}// 外层 call end
});// 外层 Observable<T> o = ... end

// 可重试其他服务器
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));

// 调用全部失败处理
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}