// 抛出行为 } else if (mock.startsWith(Constants.THROW_PREFIX)) { mock = mock.substring(Constants.THROW_PREFIX.length()).trim(); // mock空时抛出默认异常信息 if (StringUtils.isBlank(mock)) { throw new RpcException("mocked exception for service degradation."); } else { // user customized class // mock作为类名,获取该异常类进行抛出 // 如果throws缓存小于1000(硬编码)则将该异常类缓存 Throwable t = getThrowable(mock); throw new RpcException(RpcException.BIZ_EXCEPTION, t); }
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd);
/* determine if we're allowed to execute */ // 短路器是否允许请求,参见流程图右上 // 即是否短路 if (circuitBreaker.allowRequest()) { // 获取信号量 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } };
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 尝试获取信号量 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 执行命令,并添加相应处理 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 获取信号量失败,则进行降级处理 return handleSemaphoreRejectionViaFallback(); } } else { // 处于短路状态,进行降级处理 return handleShortCircuitViaFallback(); } } ... private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); ... // 成功回调,重置短路状态 final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); } } }; // 失败回调,对Exception进行相应处理 // 在handler里,则会进行降级操作(Fallback) final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); }
private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics;
/* track whether this circuit is open/closed at any given point in time (default to false==closed) */ private AtomicBoolean circuitOpen = new AtomicBoolean(false);
/* when the circuit was marked open or was last allowed to try a 'singleTest' */ private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); /** * 执行成功 * 在短路器打开时,将其关闭,并重置计数流 */ public void markSuccess() { if (circuitOpen.get()) { if (circuitOpen.compareAndSet(true, false)) { //win the thread race to reset metrics //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view, //and all other metric consumers are unaffected by the reset metrics.resetStream(); } } } // 判断是否允许请求(短路) public boolean allowRequest() { // 配置文件强制打开短路器,则拒绝所有请求 if (properties.circuitBreakerForceOpen().get()) { // properties have asked us to force the circuit open so we will allow NO requests return false; } // 配置文件强制关闭短路器,但仍然调用isOpen()方法进行统计 if (properties.circuitBreakerForceClosed().get()) { // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior isOpen(); // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through return true; } return !isOpen() || allowSingleTest(); } public boolean isOpen() { // 获取短路器状态,如果打开则直接返回 if (circuitOpen.get()) { // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close return true; }
// we're closed, so let's see if errors have made us so we should trip the circuit open // 获取计数器 HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold // 检查请求总数是否大于阈值 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything return false; } // 请求总数已大于阈值 // 则检查错误百分比是否大于阈值 if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // 两者都大于阈值,则将开关打开 // our failure rate is too high, trip the circuit if (circuitOpen.compareAndSet(false, true)) { // if the previousValue was false then we want to set the currentTime circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { // 开关打开失败(可能其他线程打开了),但阈值仍然是超过的,所以仍然返回true // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open return true; } } } /** * 是否允许单个请求进行测试(half-open) * 在短路一段时间(超过睡眠窗口时间)后,允许一个请求来尝试以判断连通状态 */ public boolean allowSingleTest() { // 短路器打开时间或上次测试连通时间 long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) if the circuit is open // 2) and it's been longer than 'sleepWindow' since we opened the circuit if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try. // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'. // 尝试更新测试时间 if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { // if this returns true that means we set the time so we'll return true to allow the singleTest // if it returned false it means another thread raced us and allowed the singleTest before we did return true; } } return false; }