resilience4j-circuit-breaker

fxz大约 17 分钟resilience4j熔断

resilience4j-circuit-breaker

CircuitBreaker 通过有限状态机实现,具有三种正常状态:CLOSED、OPEN 和 HALF_OPEN 以及两种特殊状态 DISABLED 和 FORCED_OPEN。

CircuitBreaker 使用滑动窗口来存储和聚合调用的结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。

基于计数的滑动窗口聚合最后 N 次调用的结果。

基于时间的滑动窗口聚合了最后 N 秒的调用结果。

state
state

那么resilience4j是如何实现断路器效果的呢?我们从基础的断路器api使用来入手,看resilience4j circuit breaker是如何使用进行状态机进行断路效果的。这是最简单的一行调用代码。

String result = circuitBreaker.executeSupplier(backendService::doSomething);

为什么将我们要执行的方法引用传给circuitBreaker的executeSupplier,就能达到断路保护的效果呢?点进方法看一下,其实circuitBreaker通过装饰器模式,对我们的方法进行了增强,达到断路保护的效果:

装饰增强

CircuitBreaker:

/**
 * 装饰并执行装饰供应商。
 */
default <T> T executeSupplier(Supplier<T> supplier) {
    return decorateSupplier(this, supplier).get();
}

// 通过circuitBreaker 进行supplier的装饰增强 达到断路保护的效果
static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<T> supplier) {
        return () -> {
            // 获取权限
            circuitBreaker.acquirePermission();
          
            final long start = circuitBreaker.getCurrentTimestamp();
            try {
              	// 方法执行逻辑
                T result = supplier.get();
              
              	// 统计方法耗时
                long duration = circuitBreaker.getCurrentTimestamp() - start;
              
                // 调用成功 让断路器感知
                circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
              
              	// 返回结果
                return result;
            } catch (Exception exception) {
                // 处理异常的情况
                long duration = circuitBreaker.getCurrentTimestamp() - start;
              
                // 调用失败 让断路器感知
                circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
              
              	// 异常还得往外抛
                throw exception;
            }
        };
}

上面增强后的代码分为怎么几步:获取执行权限、执行方法逻辑、断路器感知执行结果。

获取执行权限

这一步的职责很清晰,就是判断能不能执行。可以看到acquirePermission方法调用了stateReference,stateReference其实就是我们的状态引用。如果不允许调用的话acquirePermission方法会抛出异常。

CircuitBreakerStateMachinepublic void acquirePermission() {
    try {
        // 获取权限
        stateReference.get().acquirePermission();
    } catch (Exception e) {
      	// 发布没有调用权限事件
        publishCallNotPermittedEvent();
      	// 异常向外抛
        throw e;
    }
}

前面我们说了,状态机有几种有限的状态,我们这里主要看关闭、半开、开放状态。

关闭状态下,允许调用,不会抛出任何异常,因为这时断路器并没有打开。

ClosedState/**
 * 不会抛出异常,因为断路器已关闭。
 */
@Override
public void acquirePermission() {
    // 不处理
}

半开状态下,需要根据配置的允许调用次数进行判断,如果次数大于0,那么允许调用,否则拒绝调用。可以看到,其实就是利用juc包中的类去线程安全的更新变量permittedNumberOfCalls值。

// 获取执行权限
@Override
public void acquirePermission() {
    // 半开状态下 需要根据允许的调用次数来判断是否可以调用
    if (!tryAcquirePermission()) {
        throw CallNotPermittedException
            .createCallNotPermittedException(CircuitBreakerStateMachine.this);
    }
}
// 尝试获取执行权限 根据剩余的允许调用次数判断 大于0则返回true 否则返回false
@Override
public boolean tryAcquirePermission() {
    // 如果允许的调用次数大于0 返回true
    if (permittedNumberOfCalls.getAndUpdate(current -> current == 0 ? current : --current)
        > 0) {
        return true;
    }

    // 记录不可调用事件
    circuitBreakerMetrics.onCallNotPermitted();
    return false;
}

那么还有一种状态,就是打开状态,他应该是直接抛出异常,不允许调用了吧?其实不是,因为resilience4中打开状态经过一段时间是可以自动转为半开状态的,所以是需要判断的。

// 获取执行权限
@Override
public void acquirePermission() {
  	// 尝试获取执行权限
    if (!tryAcquirePermission()) {
        throw CallNotPermittedException
            .createCallNotPermittedException(CircuitBreakerStateMachine.this);
    }
}
// 尝试获取执行权限
@Override
public boolean tryAcquirePermission() {
    // 检查等待时间是否已过 可以由打开状态过渡到半开状态
    if (clock.instant().isAfter(retryAfterWaitDuration)) {
        // 过渡到半开状态
        toHalfOpenState();
      
        // 检查状态转换后是否允许调用在HALF_OPEN状态下运行
        boolean callPermitted = stateReference.get().tryAcquirePermission();
      
        if (!callPermitted) {
          
            // 如果不允许调用 发布不允许调用事件
            publishCallNotPermittedEvent();
          
            // 记录不允许调用指标
            circuitBreakerMetrics.onCallNotPermitted();
        }
      
        return callPermitted;
    }

    // 如果等待时间未过 则不允许调用 返回false
    circuitBreakerMetrics.onCallNotPermitted();
    return false;
}

// open 状态转为 half open状态
private synchronized void toHalfOpenState() {
  	// 更新open标识
    if (isOpen.compareAndSet(true, false)) {
      	// 更新状态
        transitionToHalfOpenState();
    }
}

// 更新状态为HALF_OPEN
@Override
public void transitionToHalfOpenState() {
    stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));
}

// 更新状态引用
private void stateTransition(State newState,
   UnaryOperator<CircuitBreakerState> newStateGenerator) {
   CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
       StateTransition.transitionBetween(getName(), currentState.getState(), newState);
       currentState.preTransitionHook();
       return newStateGenerator.apply(currentState);
    });
    publishStateTransitionEvent(
            StateTransition.transitionBetween(getName(), previousState.getState(), newState));
}

可以看到,在open状态下,其实会判断一下是否将状态转换为半开状态,然后在进行判断。并不是直接返回false。

综上,方法在执行前先获取执行权限,对于关闭状态直接允许调用,对于半开状态判断调用次数是否大于0,对于打开状态判断是否可以转为半开状态。

断路器感知执行结果

方法执行完成,需要让断路器感知执行的结果。

// 调用成功 让断路器感知
circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);  

onResult

可以看到,核心的代码在stateReference的onError和onSuccess方法,里面会记录调用的情况,根据调用的情况进行状态的转换。

CircuitBreakerStateMachine@Override
public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {
    // 如果记录结果
    if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {
        LOG.debug("CircuitBreaker '{}' recorded a result type '{}' as failure:", name, result.getClass());
        ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);
        publishCircuitErrorEvent(name, duration, durationUnit, failure);
        
      	// 记录调用指标 并检查是否需要过渡状态
        stateReference.get().onError(duration, durationUnit, failure);
    } else {
        onSuccess(duration, durationUnit);
        if (result != null) {
            handlePossibleTransition(Either.left(result));
        }
    }
}

@Override
public void onSuccess(long duration, TimeUnit durationUnit) {
    LOG.debug("CircuitBreaker '{}' succeeded:", name);
    // 发布成功事件
    publishSuccessEvent(duration, durationUnit);
  
    // 记录调用指标 并检查是否需要过渡状态
    stateReference.get().onSuccess(duration, durationUnit);
}

onError

onError的作用就是感知方法调用后,是否应该切换状态。

关闭状态下,会根据调用失败率判断,是否转为open状态。

ClosedState@Override
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
    // 首先记录失败调用 然后检查是超过阀值 否需要过渡到打开状态
    checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
}

private void checkIfThresholdsExceeded(Result result) {
    // 如果失败率或慢调用率超过阈值 并且当前状态是关闭状态
    if (Result.hasExceededThresholds(result) && isClosed.compareAndSet(true, false)) {
      // 发布阈值超过事件
      publishCircuitThresholdsExceededEvent(result, circuitBreakerMetrics);
      
      // 过渡到打开状态
      transitionToOpenState();
    }
}

半开状态下,会根据失败率判断,是否转为open状态或者关闭状态。

public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
   // 首先记录失败调用 然后检查是超过阀值 否需要过渡到打开状态或者关闭状态
   checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
}

private void checkIfThresholdsExceeded(Result result) {
    // 如果失败率或慢调用率超过阈值 并且当前状态是半开状态 转为打开状态
    if (Result.hasExceededThresholds(result) && isHalfOpen.compareAndSet(true, false)) {
        transitionToOpenState();
    }

    // 如果失败率低于阀值 转为关闭状态
    if (result == BELOW_THRESHOLDS && isHalfOpen.compareAndSet(true, false)) {
        transitionToClosedState();
    }
}

打开状态下直接记录指标,不需要进行状态的切换。

public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
    circuitBreakerMetrics.onError(duration, durationUnit);
}

可以看到,无论哪个状态都是根据circuitBreakerMetrics.onError方法记录了方法调用后的信息进行判断的。

CircuitBreakerMetrics:

// 记录error信息
public Result onError(long duration, TimeUnit durationUnit) {
  	// 记录指标快照
    Snapshot snapshot;
  
    if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
        // 记录慢速调用
        snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);
    } else {
        // 记录失败调用
        snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);
    }

    // 检查故障率是否高于阈值或慢速调用百分比是否高于阈值
    return checkIfThresholdsExceeded(snapshot);
}

/**
  * 检查故障率是否高于阈值或慢速调用百分比是否高于阈值
  */
private Result checkIfThresholdsExceeded(Snapshot snapshot) {
  			// 获取失败率
        float failureRateInPercentage = getFailureRate(snapshot);
  			// 获取慢调用速率
        float slowCallsInPercentage = getSlowCallRate(snapshot);

        // 低于最低调用次数阈值
        if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {
            return Result.BELOW_MINIMUM_CALLS_THRESHOLD;
        }

        // 故障率和慢速调用率都超过阈值
        if (failureRateInPercentage >= failureRateThreshold
            && slowCallsInPercentage >= slowCallRateThreshold) {
            return Result.ABOVE_THRESHOLDS;
        }

        // 故障率超过阈值
        if (failureRateInPercentage >= failureRateThreshold) {
            return Result.FAILURE_RATE_ABOVE_THRESHOLDS;
        }

        // 慢速调用率超过阈值
        if (slowCallsInPercentage >= slowCallRateThreshold) {
            return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;
        }

        // 故障率低于阈值
        return Result.BELOW_THRESHOLDS;
}

其中Metrics记录了调用失败的指标,CircuitBreakerMetrics便是根据Metrics返回的指标判断结果的状态。

Metrics有两个实现类,分别是基于固定计数的滑动窗口的算法,和基于时间的滑动窗口算法实现。

一个是基于请求纬度,一个是基于时间维度。

FixedSizeSlidingWindowMetrics是基于固定计数的滑动窗口算法,实现也很简单,使用滚动数组,每次在总聚合中清除旧的bucket数据,记录最新的调用指标。即每次统计最近n次调用。

FixedSizeSlidingWindowMetrics:

// 记录调用失败的指标
public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
    // 记录到总聚合中
    totalAggregation.record(duration, durationUnit, outcome);
  
    // 移动窗口并记录到最新的bucket中
    moveWindowByOne().record(duration, durationUnit, outcome);
  
    // 返回快照
    return new SnapshotImpl(totalAggregation);
}

// 移动窗口并移除旧的指标
private Measurement moveWindowByOne() {
        // 移动headIndex到下一个bucket
        moveHeadIndexByOne();
        // 移除最新的bucket
        Measurement latestMeasurement = getLatestMeasurement();
        // 从总聚合中移除最新的bucket
        totalAggregation.removeBucket(latestMeasurement);
        // 重置最新的bucket
        latestMeasurement.reset();
        return latestMeasurement;
}


private Measurement getLatestMeasurement() {
    return measurements[headIndex];
}

  
void moveHeadIndexByOne() {
   	this.headIndex = (headIndex + 1) % windowSize;
}

SlidingTimeWindowMetrics是基于时间窗口的,和上面的区别在于移动指针清除旧数据的逻辑。即统计时间区间内的调用。

SlidingTimeWindowMetrics:

public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
  	// 记录总聚合
    totalAggregation.record(duration, durationUnit, outcome);
  	// 移动窗口并记录到最新的bucket中
    moveWindowToCurrentEpochSecond(getLatestPartialAggregation())
        .record(duration, durationUnit, outcome);
  	 // 返回快照
    return new SnapshotImpl(totalAggregation);
}


private PartialAggregation moveWindowToCurrentEpochSecond(
        PartialAggregation latestPartialAggregation) {
        // 取当前时钟的时间,并将其转换为从 1970 年 1 月 1 日 00:00:00 UTC 开始计算的秒数
        long currentEpochSecond = clock.instant().getEpochSecond();
        // 差异秒数
        long differenceInSeconds = currentEpochSecond - latestPartialAggregation.getEpochSecond();
        if (differenceInSeconds == 0) {
            return latestPartialAggregation;
        }

        // 需要移动的秒数
        long secondsToMoveTheWindow = Math.min(differenceInSeconds, timeWindowSizeInSeconds);
        PartialAggregation currentPartialAggregation;
        do {
            // 移动头指针
            secondsToMoveTheWindow--;
            moveHeadIndexByOne();
            // 获取最新的 PartialAggregation
            currentPartialAggregation = getLatestPartialAggregation();
            // 从总聚合中移除当前 PartialAggregation
            totalAggregation.removeBucket(currentPartialAggregation);
            // 重置当前的 PartialAggregation 并设置当前对应的秒数
            currentPartialAggregation.reset(currentEpochSecond - secondsToMoveTheWindow);
        } while (secondsToMoveTheWindow > 0);

    return currentPartialAggregation;
}


private PartialAggregation getLatestPartialAggregation() {
    return partialAggregations[headIndex];
}

void moveHeadIndexByOne() {
     this.headIndex = (headIndex + 1) % timeWindowSizeInSeconds;
}

从moveWindowToCurrentEpochSecond和moveWindowByOne可以看到两种实现的区别。

AbstractAggregation的record方法记录了调用的失败信息。

AbstractAggregation:
void record(long duration, TimeUnit durationUnit, Metrics.Outcome outcome) {
    // 调用次数递增
    this.numberOfCalls++;
    // 总调用时长
    this.totalDurationInMillis += durationUnit.toMillis(duration);
    switch (outcome) {
        case SLOW_SUCCESS:
            // 慢调用次数加一
            numberOfSlowCalls++;
            break;

        case SLOW_ERROR:
            // 慢调用次数加一
            numberOfSlowCalls++;
            // 调用失败次数加一
            numberOfFailedCalls++;
            // 慢调用失败次数加一
            numberOfSlowFailedCalls++;
            break;

        case ERROR:
            // 调用失败次数加一
            numberOfFailedCalls++;
            break;

        default:
            break;
    }
}

onSuccess

onSuccess同理。记录窗口内调用的情况,根据失败率判断是否需要切换状态。