1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| ...
// Retry handler. submit() reads the retry limits from it
// (getMaxRetriesOnSameServer / getMaxRetriesOnNextServer) and passes it along
// when recording request completion.
private final RetryHandler retryHandler;
// Execution info. NOTE(review): per the original author's note this holds the
// server plus the number of attempts on the current server and the number of
// servers tried so far -- the type is not visible in this excerpt; confirm.
private volatile ExecutionInfo executionInfo;
// Explicitly configured target server (host:port per the original note).
// When null, submit() picks a server via selectServer() instead.
private final Server server;
// Listener invoker used by submit() to fire the execution lifecycle callbacks
// (start, start-with-server, exception-with-server, success, failed).
// NOTE(review): the original note says it also keeps the execution history and
// the listener list -- not visible in this excerpt; confirm.
private final ExecutionContextListenerInvoker<?, T> listenerInvoker;
...
/**
 * Builds the Observable that runs {@code operation} against a server, wiring in
 * listener callbacks, per-attempt statistics and the two retry levels
 * (same server / next server).
 *
 * <p>If the start listener aborts with {@link AbortExecutionException}, an
 * error Observable carrying that exception is returned immediately. On final
 * failure the error may be wrapped in a {@link ClientException} describing
 * which retry limit was reached, the failure listener is invoked, and the
 * error is propagated downstream.
 *
 * @param operation the call to execute against each selected server
 * @return an Observable emitting whatever {@code operation} emits, or an error
 */
public Observable<T> submit(final ServerOperation<T> operation) {
    // Mutable per-submission bookkeeping (current server, attempt counters).
    final ExecutionInfoContext execContext = new ExecutionInfoContext();

    // Give listeners the chance to veto the execution before anything runs.
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException aborted) {
            return Observable.error(aborted);
        }
    }

    // Retry limit while staying on the same server instance.
    final int sameServerLimit = retryHandler.getMaxRetriesOnSameServer();
    // Retry limit when moving on to another server instance.
    final int nextServerLimit = retryHandler.getMaxRetriesOnNextServer();

    // Use the configured server when there is one, otherwise the load balancer.
    final Observable<T> perServer = (server != null ? Observable.just(server) : selectServer())
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server chosen) {
                    // Record the server now being tried.
                    // NOTE(review): per the original author's note, setServer also
                    // resets the per-server attempt count and bumps the
                    // server-switch count -- not visible in this excerpt; confirm.
                    execContext.setServer(chosen);
                    final ServerStats serverStats = loadBalancerContext.getServerStats(chosen);

                    // Called for each attempt and retry
                    final Observable<T> attempts = Observable
                            .just(chosen)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server target) {
                                    // Count this attempt against the current server.
                                    execContext.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(serverStats);

                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(execContext.toExecutionInfo());
                                        } catch (AbortExecutionException aborted) {
                                            return Observable.error(aborted);
                                        }
                                    }

                                    final Stopwatch stopwatch = loadBalancerContext.getExecuteTracer().start();

                                    return operation.call(target).doOnEach(new Observer<T>() {
                                        // Most recent item seen by onNext; reported on completion.
                                        private T lastEntity;

                                        @Override
                                        public void onCompleted() {
                                            recordStats(stopwatch, serverStats, lastEntity, null);
                                            // TODO: What to do if onNext or onError are never called?
                                        }

                                        @Override
                                        public void onError(Throwable error) {
                                            recordStats(stopwatch, serverStats, null, error);
                                            logger.debug("Got error {} when executed on server {}", error, target);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(error, execContext.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T item) {
                                            lastEntity = item;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(item, execContext.toExecutionInfo());
                                            }
                                        }

                                        // Stops the timer and reports the outcome to the load balancer context.
                                        private void recordStats(Stopwatch timer, ServerStats statsForServer,
                                                                 Object response, Throwable failure) {
                                            timer.stop();
                                            loadBalancerContext.noteRequestCompletion(statsForServer, response, failure,
                                                    timer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                } // end of per-attempt call
                            }); // end of per-attempt Observable

                    // Retry on the same server only when a positive limit is configured.
                    // NOTE(review): per the original author's note, retryPolicy (not shown
                    // in this excerpt) allows a retry only when
                    //   1. the error is not an AbortExecutionException,
                    //   2. the retry count has not exceeded the limit,
                    //   3. the retry switch is enabled,
                    //   4. for the same server, the error is a retriable one
                    //      (for a different server it simply returns true).
                    // Confirm against retryPolicy / RetryHandler.
                    return sameServerLimit > 0
                            ? attempts.retry(retryPolicy(sameServerLimit, true))
                            : attempts;
                } // end of per-server call
            }); // end of per-server Observable

    // Retrying on other servers only applies when no fixed server was configured.
    final Observable<T> withFailover = (nextServerLimit > 0 && server == null)
            ? perServer.retry(retryPolicy(nextServerLimit, false))
            : perServer;

    // Final failure handling once all attempts are exhausted.
    return withFailover.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable cause) {
            Throwable reported = cause;
            if (execContext.getAttemptCount() > 0) {
                if (nextServerLimit > 0 && execContext.getServerAttemptCount() == (nextServerLimit + 1)) {
                    reported = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + nextServerLimit
                                    + " retries, while making a call for: " + execContext.getServer(), cause);
                } else if (sameServerLimit > 0 && execContext.getAttemptCount() == (sameServerLimit + 1)) {
                    reported = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + sameServerLimit
                                    + " retries, while making a call for: " + execContext.getServer(), cause);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(reported, execContext.toFinalExecutionInfo());
            }
            return Observable.error(reported);
        }
    });
}
|