resilience4j-circuit-breaker
resilience4j-circuit-breaker
CircuitBreaker 通过有限状态机实现,具有三种正常状态:CLOSED、OPEN 和 HALF_OPEN 以及两种特殊状态 DISABLED 和 FORCED_OPEN。
CircuitBreaker 使用滑动窗口来存储和聚合调用的结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。
基于计数的滑动窗口聚合最后 N 次调用的结果。
基于时间的滑动窗口聚合了最后 N 秒的调用结果。
那么resilience4j是如何实现断路器效果的呢?我们从基础的断路器api使用来入手,看resilience4j circuit breaker是如何使用进行状态机进行断路效果的。这是最简单的一行调用代码。
String result = circuitBreaker.executeSupplier(backendService::doSomething);
为什么将我们要执行的方法引用传给circuitBreaker的executeSupplier,就能达到断路保护的效果呢?点进方法看一下,其实circuitBreaker通过装饰器模式,对我们的方法进行了增强,达到断路保护的效果:
装饰增强
CircuitBreaker:
/**
* 装饰并执行装饰供应商。
*/
default <T> T executeSupplier(Supplier<T> supplier) {
return decorateSupplier(this, supplier).get();
}
// 通过circuitBreaker 进行supplier的装饰增强 达到断路保护的效果
static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<T> supplier) {
return () -> {
// 获取权限
circuitBreaker.acquirePermission();
final long start = circuitBreaker.getCurrentTimestamp();
try {
// 方法执行逻辑
T result = supplier.get();
// 统计方法耗时
long duration = circuitBreaker.getCurrentTimestamp() - start;
// 调用成功 让断路器感知
circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
// 返回结果
return result;
} catch (Exception exception) {
// 处理异常的情况
long duration = circuitBreaker.getCurrentTimestamp() - start;
// 调用失败 让断路器感知
circuitBreaker.onError(duration, circuitBreaker.getTimestampUnit(), exception);
// 异常还得往外抛
throw exception;
}
};
}
上面增强后的代码分为怎么几步:获取执行权限、执行方法逻辑、断路器感知执行结果。
获取执行权限
这一步的职责很清晰,就是判断能不能执行。可以看到acquirePermission方法调用了stateReference,stateReference其实就是我们的状态引用。如果不允许调用的话acquirePermission方法会抛出异常。
CircuitBreakerStateMachine:
public void acquirePermission() {
try {
// 获取权限
stateReference.get().acquirePermission();
} catch (Exception e) {
// 发布没有调用权限事件
publishCallNotPermittedEvent();
// 异常向外抛
throw e;
}
}
前面我们说了,状态机有几种有限的状态,我们这里主要看关闭、半开、开放状态。
关闭状态下,允许调用,不会抛出任何异常,因为这时断路器并没有打开。
ClosedState:
/**
* 不会抛出异常,因为断路器已关闭。
*/
@Override
public void acquirePermission() {
// 不处理
}
半开状态下,需要根据配置的允许调用次数进行判断,如果次数大于0,那么允许调用,否则拒绝调用。可以看到,其实就是利用juc包中的类去线程安全的更新变量permittedNumberOfCalls值。
// 获取执行权限
@Override
public void acquirePermission() {
// 半开状态下 需要根据允许的调用次数来判断是否可以调用
if (!tryAcquirePermission()) {
throw CallNotPermittedException
.createCallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 尝试获取执行权限 根据剩余的允许调用次数判断 大于0则返回true 否则返回false
@Override
public boolean tryAcquirePermission() {
// 如果允许的调用次数大于0 返回true
if (permittedNumberOfCalls.getAndUpdate(current -> current == 0 ? current : --current)
> 0) {
return true;
}
// 记录不可调用事件
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
那么还有一种状态,就是打开状态,他应该是直接抛出异常,不允许调用了吧?其实不是,因为resilience4中打开状态经过一段时间是可以自动转为半开状态的,所以是需要判断的。
// 获取执行权限
@Override
public void acquirePermission() {
// 尝试获取执行权限
if (!tryAcquirePermission()) {
throw CallNotPermittedException
.createCallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 尝试获取执行权限
@Override
public boolean tryAcquirePermission() {
// 检查等待时间是否已过 可以由打开状态过渡到半开状态
if (clock.instant().isAfter(retryAfterWaitDuration)) {
// 过渡到半开状态
toHalfOpenState();
// 检查状态转换后是否允许调用在HALF_OPEN状态下运行
boolean callPermitted = stateReference.get().tryAcquirePermission();
if (!callPermitted) {
// 如果不允许调用 发布不允许调用事件
publishCallNotPermittedEvent();
// 记录不允许调用指标
circuitBreakerMetrics.onCallNotPermitted();
}
return callPermitted;
}
// 如果等待时间未过 则不允许调用 返回false
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
// open 状态转为 half open状态
private synchronized void toHalfOpenState() {
// 更新open标识
if (isOpen.compareAndSet(true, false)) {
// 更新状态
transitionToHalfOpenState();
}
}
// 更新状态为HALF_OPEN
@Override
public void transitionToHalfOpenState() {
stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));
}
// 更新状态引用
private void stateTransition(State newState,
UnaryOperator<CircuitBreakerState> newStateGenerator) {
CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
StateTransition.transitionBetween(getName(), currentState.getState(), newState);
currentState.preTransitionHook();
return newStateGenerator.apply(currentState);
});
publishStateTransitionEvent(
StateTransition.transitionBetween(getName(), previousState.getState(), newState));
}
可以看到,在open状态下,其实会判断一下是否将状态转换为半开状态,然后在进行判断。并不是直接返回false。
综上,方法在执行前先获取执行权限,对于关闭状态直接允许调用,对于半开状态判断调用次数是否大于0,对于打开状态判断是否可以转为半开状态。
断路器感知执行结果
方法执行完成,需要让断路器感知执行的结果。
// 调用成功 让断路器感知
circuitBreaker.onResult(duration, circuitBreaker.getTimestampUnit(), result);
onResult
可以看到,核心的代码在stateReference的onError和onSuccess方法,里面会记录调用的情况,根据调用的情况进行状态的转换。
CircuitBreakerStateMachine:
@Override
public void onResult(long duration, TimeUnit durationUnit, @Nullable Object result) {
// 如果记录结果
if (result != null && circuitBreakerConfig.getRecordResultPredicate().test(result)) {
LOG.debug("CircuitBreaker '{}' recorded a result type '{}' as failure:", name, result.getClass());
ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);
publishCircuitErrorEvent(name, duration, durationUnit, failure);
// 记录调用指标 并检查是否需要过渡状态
stateReference.get().onError(duration, durationUnit, failure);
} else {
onSuccess(duration, durationUnit);
if (result != null) {
handlePossibleTransition(Either.left(result));
}
}
}
@Override
public void onSuccess(long duration, TimeUnit durationUnit) {
LOG.debug("CircuitBreaker '{}' succeeded:", name);
// 发布成功事件
publishSuccessEvent(duration, durationUnit);
// 记录调用指标 并检查是否需要过渡状态
stateReference.get().onSuccess(duration, durationUnit);
}
onError
onError的作用就是感知方法调用后,是否应该切换状态。
关闭状态下,会根据调用失败率判断,是否转为open状态。
ClosedState:
@Override
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
// 首先记录失败调用 然后检查是超过阀值 否需要过渡到打开状态
checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
}
private void checkIfThresholdsExceeded(Result result) {
// 如果失败率或慢调用率超过阈值 并且当前状态是关闭状态
if (Result.hasExceededThresholds(result) && isClosed.compareAndSet(true, false)) {
// 发布阈值超过事件
publishCircuitThresholdsExceededEvent(result, circuitBreakerMetrics);
// 过渡到打开状态
transitionToOpenState();
}
}
半开状态下,会根据失败率判断,是否转为open状态或者关闭状态。
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
// 首先记录失败调用 然后检查是超过阀值 否需要过渡到打开状态或者关闭状态
checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
}
private void checkIfThresholdsExceeded(Result result) {
// 如果失败率或慢调用率超过阈值 并且当前状态是半开状态 转为打开状态
if (Result.hasExceededThresholds(result) && isHalfOpen.compareAndSet(true, false)) {
transitionToOpenState();
}
// 如果失败率低于阀值 转为关闭状态
if (result == BELOW_THRESHOLDS && isHalfOpen.compareAndSet(true, false)) {
transitionToClosedState();
}
}
打开状态下直接记录指标,不需要进行状态的切换。
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
circuitBreakerMetrics.onError(duration, durationUnit);
}
可以看到,无论哪个状态都是根据circuitBreakerMetrics.onError方法记录了方法调用后的信息进行判断的。
CircuitBreakerMetrics:
// 记录error信息
public Result onError(long duration, TimeUnit durationUnit) {
// 记录指标快照
Snapshot snapshot;
if (durationUnit.toNanos(duration) > slowCallDurationThresholdInNanos) {
// 记录慢速调用
snapshot = metrics.record(duration, durationUnit, Outcome.SLOW_ERROR);
} else {
// 记录失败调用
snapshot = metrics.record(duration, durationUnit, Outcome.ERROR);
}
// 检查故障率是否高于阈值或慢速调用百分比是否高于阈值
return checkIfThresholdsExceeded(snapshot);
}
/**
* 检查故障率是否高于阈值或慢速调用百分比是否高于阈值
*/
private Result checkIfThresholdsExceeded(Snapshot snapshot) {
// 获取失败率
float failureRateInPercentage = getFailureRate(snapshot);
// 获取慢调用速率
float slowCallsInPercentage = getSlowCallRate(snapshot);
// 低于最低调用次数阈值
if (failureRateInPercentage == -1 || slowCallsInPercentage == -1) {
return Result.BELOW_MINIMUM_CALLS_THRESHOLD;
}
// 故障率和慢速调用率都超过阈值
if (failureRateInPercentage >= failureRateThreshold
&& slowCallsInPercentage >= slowCallRateThreshold) {
return Result.ABOVE_THRESHOLDS;
}
// 故障率超过阈值
if (failureRateInPercentage >= failureRateThreshold) {
return Result.FAILURE_RATE_ABOVE_THRESHOLDS;
}
// 慢速调用率超过阈值
if (slowCallsInPercentage >= slowCallRateThreshold) {
return Result.SLOW_CALL_RATE_ABOVE_THRESHOLDS;
}
// 故障率低于阈值
return Result.BELOW_THRESHOLDS;
}
其中Metrics记录了调用失败的指标,CircuitBreakerMetrics便是根据Metrics返回的指标判断结果的状态。
Metrics有两个实现类,分别是基于固定计数的滑动窗口的算法,和基于时间的滑动窗口算法实现。
一个是基于请求纬度,一个是基于时间维度。
FixedSizeSlidingWindowMetrics是基于固定计数的滑动窗口算法,实现也很简单,使用滚动数组,每次在总聚合中清除旧的bucket数据,记录最新的调用指标。即每次统计最近n次调用。
FixedSizeSlidingWindowMetrics:
// 记录调用失败的指标
public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
// 记录到总聚合中
totalAggregation.record(duration, durationUnit, outcome);
// 移动窗口并记录到最新的bucket中
moveWindowByOne().record(duration, durationUnit, outcome);
// 返回快照
return new SnapshotImpl(totalAggregation);
}
// 移动窗口并移除旧的指标
private Measurement moveWindowByOne() {
// 移动headIndex到下一个bucket
moveHeadIndexByOne();
// 移除最新的bucket
Measurement latestMeasurement = getLatestMeasurement();
// 从总聚合中移除最新的bucket
totalAggregation.removeBucket(latestMeasurement);
// 重置最新的bucket
latestMeasurement.reset();
return latestMeasurement;
}
private Measurement getLatestMeasurement() {
return measurements[headIndex];
}
void moveHeadIndexByOne() {
this.headIndex = (headIndex + 1) % windowSize;
}
SlidingTimeWindowMetrics是基于时间窗口的,和上面的区别在于移动指针清除旧数据的逻辑。即统计时间区间内的调用。
SlidingTimeWindowMetrics:
public synchronized Snapshot record(long duration, TimeUnit durationUnit, Outcome outcome) {
// 记录总聚合
totalAggregation.record(duration, durationUnit, outcome);
// 移动窗口并记录到最新的bucket中
moveWindowToCurrentEpochSecond(getLatestPartialAggregation())
.record(duration, durationUnit, outcome);
// 返回快照
return new SnapshotImpl(totalAggregation);
}
private PartialAggregation moveWindowToCurrentEpochSecond(
PartialAggregation latestPartialAggregation) {
// 取当前时钟的时间,并将其转换为从 1970 年 1 月 1 日 00:00:00 UTC 开始计算的秒数
long currentEpochSecond = clock.instant().getEpochSecond();
// 差异秒数
long differenceInSeconds = currentEpochSecond - latestPartialAggregation.getEpochSecond();
if (differenceInSeconds == 0) {
return latestPartialAggregation;
}
// 需要移动的秒数
long secondsToMoveTheWindow = Math.min(differenceInSeconds, timeWindowSizeInSeconds);
PartialAggregation currentPartialAggregation;
do {
// 移动头指针
secondsToMoveTheWindow--;
moveHeadIndexByOne();
// 获取最新的 PartialAggregation
currentPartialAggregation = getLatestPartialAggregation();
// 从总聚合中移除当前 PartialAggregation
totalAggregation.removeBucket(currentPartialAggregation);
// 重置当前的 PartialAggregation 并设置当前对应的秒数
currentPartialAggregation.reset(currentEpochSecond - secondsToMoveTheWindow);
} while (secondsToMoveTheWindow > 0);
return currentPartialAggregation;
}
private PartialAggregation getLatestPartialAggregation() {
return partialAggregations[headIndex];
}
void moveHeadIndexByOne() {
this.headIndex = (headIndex + 1) % timeWindowSizeInSeconds;
}
从moveWindowToCurrentEpochSecond和moveWindowByOne可以看到两种实现的区别。
AbstractAggregation的record方法记录了调用的失败信息。
AbstractAggregation:
void record(long duration, TimeUnit durationUnit, Metrics.Outcome outcome) {
// 调用次数递增
this.numberOfCalls++;
// 总调用时长
this.totalDurationInMillis += durationUnit.toMillis(duration);
switch (outcome) {
case SLOW_SUCCESS:
// 慢调用次数加一
numberOfSlowCalls++;
break;
case SLOW_ERROR:
// 慢调用次数加一
numberOfSlowCalls++;
// 调用失败次数加一
numberOfFailedCalls++;
// 慢调用失败次数加一
numberOfSlowFailedCalls++;
break;
case ERROR:
// 调用失败次数加一
numberOfFailedCalls++;
break;
default:
break;
}
}
onSuccess
onSuccess同理。记录窗口内调用的情况,根据失败率判断是否需要切换状态。