spring cloud loadBalanced
大约 27 分钟
spring cloud loadBalanced
自动配置类
LoadBalancerAutoConfiguration
/**
* 用于阻塞户端负载均衡的自动配置。
* <ul>作用: 收集容器中所有加了@LoadBalanced的RestTemplate 并添加拦截器以及其他自定义</ul>
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
public class LoadBalancerAutoConfiguration {
// 收集容器中所有加了@LoadBalanced的 RestTemplate
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
// 用于修改增强请求
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
/**
* 收集容器中所有的RestTemplateCustomizer 并应用于 restTemplate
*/
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
// 对 restTemplate 进行加工
customizer.customize(restTemplate);
}
}
});
}
/**
* 用于创建 LoadBalancerRequest
*/
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
/**
* 没有开启重试或者没有引用相关重试的依赖时生效
*/
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
/**
* 拦截HttpRequest 并委托给LoadBalancerClient执行
*/
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
/**
* 向容器中注入一个RestTemplateCustomizer 作用是向restTemplate添加LoadBalancerInterceptor拦截器
*/
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
private static class RetryMissingOrDisabledCondition extends AnyNestedCondition {
RetryMissingOrDisabledCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class RetryTemplateMissing {
}
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "false")
static class RetryDisabled {
}
}
/**
* 重试机制的自动配置。
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
public static class RetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryFactory() {
return new LoadBalancedRetryFactory() {
};
}
}
/**
* 重试拦截机制的自动配置。
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", matchIfMissing = true)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, requestFactory, loadBalancedRetryFactory,
loadBalancerFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
LoadBalancerAutoConfiguration
/**
* 收集容器中的LoadBalancerClientSpecification 然后配置LoadBalancerClientFactory
*/
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
LoadBalancerBeanPostProcessorAutoConfiguration.class })
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class LoadBalancerAutoConfiguration {
private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
this.configurations = configurations;
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}
/**
* 收集容器中所有的LoadBalancerClientSpecification,然后创建LoadBalancerClientFactory
*/
@ConditionalOnMissingBean
@Bean
public LoadBalancerClientFactory loadBalancerClientFactory(LoadBalancerClientsProperties properties) {
LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(properties);
clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
return clientFactory;
}
}
BlockingLoadBalancerClientAutoConfiguration
/**
* An autoconfiguration for {@link BlockingLoadBalancerClient}.
*
* @author Olga Maciaszek-Sharma
* @author Gandhimathi Velusamy
* @since 2.1.3
*/
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@AutoConfigureBefore({ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@ConditionalOnClass(RestTemplate.class)
public class BlockingLoadBalancerClientAutoConfiguration {
/**
* LoadBalancerClient 是用来接收 HttpRequest 根据 Uri 得到 serviceId ,
* 然后使用 serviceId 负载均衡得到唯一的 serviceInstance,
* 然后再执行 HttpRequest
*/
@Bean
@ConditionalOnBean(LoadBalancerClientFactory.class)
@ConditionalOnMissingBean
public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
return new BlockingLoadBalancerClient(loadBalancerClientFactory);
}
@Bean
@ConditionalOnBean(LoadBalancerClientFactory.class)
@ConditionalOnMissingBean(LoadBalancerServiceInstanceCookieTransformer.class)
public LoadBalancerServiceInstanceCookieTransformer loadBalancerServiceInstanceCookieTransformer(
LoadBalancerClientFactory loadBalancerClientFactory) {
return new LoadBalancerServiceInstanceCookieTransformer(loadBalancerClientFactory);
}
@Bean
@ConditionalOnMissingBean(XForwardedHeadersTransformer.class)
@ConditionalOnBean(LoadBalancerClientFactory.class)
public XForwardedHeadersTransformer xForwarderHeadersTransformer(
LoadBalancerClientFactory loadBalancerClientFactory) {
return new XForwardedHeadersTransformer(loadBalancerClientFactory);
}
@Configuration
@ConditionalOnClass(RetryTemplate.class)
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
protected static class BlockingLoadBalancerRetryConfig {
@Bean
@ConditionalOnMissingBean
LoadBalancedRetryFactory loadBalancedRetryFactory(
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
return new BlockingLoadBalancedRetryFactory(loadBalancerFactory);
}
}
}
注解
@LoadBalanced
/**
* 为RestTemplate或WebClient bean标记的注解,以配置使用LoadBalancerClient。
* 本质上是@Qualifier
* @author Spencer Gibb
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
@LoadBalancerClient
/**
* 负载均衡器客户端的声明性配置。将此注释添加到任何
* <code>@Configuration</code>,然后注入 {@link LoadBalancerClientFactory} 到
* 访问创建的客户端。
*
* @author Dave Syer
*/
@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalancerClient {
/**
* name 的同义词(客户端的名称)。
*
* @return the name of the load balancer client
* @see #name()
*/
@AliasFor("name")
String value() default "";
/**
* 负载均衡器客户端的名称,用于唯一标识一组客户端
* 资源,包括负载均衡器。
*
* @return the name of the load balancer client
*/
@AliasFor("value")
String name() default "";
/**
* 负载均衡器客户端的自定义<code>@Configuration</code>。可以包含
* 覆盖构成客户端的部分<code>@Bean</code>定义。
*
* @return configuration classes for the load balancer client.
* @see LoadBalancerClientConfiguration for the defaults
*/
Class<?>[] configuration() default {};
}
@LoadBalancerClients
// 看成是@LoadBalancerClient的集合
@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(LoadBalancerClientConfigurationRegistrar.class)
public @interface LoadBalancerClients {
LoadBalancerClient[] value() default {};
Class<?>[] defaultConfiguration() default {};
}
类
LoadBalancerClientConfigurationRegistrar
由注解@LoadBalancerClient和@LoadBalancerClients导入,作用是解析注解的元信息,然后注册LoadBalancerClientSpecification到容器中。也就是每个服务都可以有不通的负载均衡策略。
/**
* 解析@LoadBalancerClient和@LoadBalancerClients注解元信息 注册LoadBalancerClientSpecification到容器中
*
* @author Dave Syer
*/
public class LoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
/**
* 根据注解元信息获取客户端名称
*/
private static String getClientName(Map<String, Object> client) {
if (client == null) {
return null;
}
String value = (String) client.get("value");
if (!StringUtils.hasText(value)) {
value = (String) client.get("name");
}
if (StringUtils.hasText(value)) {
return value;
}
throw new IllegalStateException("Either 'name' or 'value' must be provided in @LoadBalancerClient");
}
/**
* 注册一个LoadBalancerClientSpecification到容器中
*
* @param name 客户端名称
* @param configuration 客户端需要生效的配置
*/
private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(LoadBalancerClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + ".LoadBalancerClientSpecification", builder.getBeanDefinition());
}
/**
* 解析注解元信息 注册LoadBalancerClientSpecification到容器中
*
* @param metadata annotation metadata of the importing class
* @param registry current bean definition registry
*/
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
Map<String, Object> attrs = metadata.getAnnotationAttributes(LoadBalancerClients.class.getName(), true);
if (attrs != null && attrs.containsKey("value")) {
AnnotationAttributes[] clients = (AnnotationAttributes[]) attrs.get("value");
for (AnnotationAttributes client : clients) {
/**
* 映射成 LoadBalancerClientSpecification 注册到BeanFactory中,
* LoadBalancerClientSpecification 的作用可以看 {@link NamedContextFactory#createContext(String)}
* 简单来说就是将 client.get("configuration") 的值 注册给指定Name的IOC容器
* */
registerClientConfiguration(registry, getClientName(client), client.get("configuration"));
}
}
if (attrs != null && attrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
} else {
name = "default." + metadata.getClassName();
}
// 同上
registerClientConfiguration(registry, name, attrs.get("defaultConfiguration"));
}
Map<String, Object> client = metadata.getAnnotationAttributes(LoadBalancerClient.class.getName(), true);
String name = getClientName(client);
if (name != null) {
// 同上
registerClientConfiguration(registry, name, client.get("configuration"));
}
}
}
LoadBalancerClientSpecification
/**
* 客户端配置
* @author Dave Syer
*/
public class LoadBalancerClientSpecification implements NamedContextFactory.Specification {
private String name;
private Class<?>[] configuration;
public LoadBalancerClientSpecification() {
}
public LoadBalancerClientSpecification(String name, Class<?>[] configuration) {
Assert.hasText(name, "name must not be empty");
this.name = name;
Assert.notNull(configuration, "configuration must not be null");
this.configuration = configuration;
}
public String getName() {
return this.name;
}
public void setName(String name) {
Assert.hasText(name, "name must not be empty");
this.name = name;
}
public Class<?>[] getConfiguration() {
return this.configuration;
}
public void setConfiguration(Class<?>[] configuration) {
Assert.notNull(configuration, "configuration must not be null");
this.configuration = configuration;
}
@Override
public String toString() {
ToStringCreator to = new ToStringCreator(this);
to.append("name", this.name);
to.append("configuration", this.configuration);
return to.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LoadBalancerClientSpecification that = (LoadBalancerClientSpecification) o;
return Objects.equals(this.name, that.name) && Arrays.equals(this.configuration, that.configuration);
}
@Override
public int hashCode() {
return Objects.hash(this.name, this.configuration);
}
}
NamedContextFactory
/**
* 创建一组子上下文,允许在每个子上下文中用一组规范来定义 Bean
* <p>
* 移植自 spring-cloud-netflix FeignClientFactory 和 SpringClientFactory
*
* @param <C> specification
* @author Spencer Gibb
* @author Dave Syer
* @author Tommy Karlsson
*/
// TODO: add javadoc
public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
implements DisposableBean, ApplicationContextAware {
private final String propertySourceName;
private final String propertyName;
private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();
private Map<String, C> configurations = new ConcurrentHashMap<>();
private ApplicationContext parent;
private Class<?> defaultConfigType;
public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName, String propertyName) {
this.defaultConfigType = defaultConfigType;
this.propertySourceName = propertySourceName;
this.propertyName = propertyName;
}
@Override
public void setApplicationContext(ApplicationContext parent) throws BeansException {
this.parent = parent;
}
public ApplicationContext getParent() {
return parent;
}
public void setConfigurations(List<C> configurations) {
for (C client : configurations) {
this.configurations.put(client.getName(), client);
}
}
public Set<String> getContextNames() {
return new HashSet<>(this.contexts.keySet());
}
@Override
public void destroy() {
Collection<AnnotationConfigApplicationContext> values = this.contexts.values();
for (AnnotationConfigApplicationContext context : values) {
// This can fail, but it never throws an exception (you see stack traces
// logged as WARN).
context.close();
}
this.contexts.clear();
}
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context;
if (this.parent != null) {
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
// https://github.com/spring-cloud/spring-cloud-openfeign/issues/475
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
if (parent instanceof ConfigurableApplicationContext) {
beanFactory.setBeanClassLoader(
((ConfigurableApplicationContext) parent).getBeanFactory().getBeanClassLoader());
} else {
beanFactory.setBeanClassLoader(parent.getClassLoader());
}
context = new AnnotationConfigApplicationContext(beanFactory);
context.setClassLoader(this.parent.getClassLoader());
} else {
context = new AnnotationConfigApplicationContext();
}
// 注册客户端对应的配置类
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name).getConfiguration()) {
// 将配置类注册到context中
context.register(configuration);
}
}
// 注册默认配置类
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
/**
* 固定注册这两个。
*
* 比如 {@link org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory} 的 defaultConfigType
* 是这个类型 {@link org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration}
*
* LoadBalancerClientConfiguration 其目的是注册了 ServiceInstanceListSupplier、ReactorLoadBalancer<ServiceInstance>
* 是用来实现负载均衡策略得到唯一的 ServiceInstance 的。而且都有 @ConditionalOnMissingBean 条件,若我们想自定义
* 可以设置 {@link NamedContextFactory#configurations} 属性 扩展配置类。
*
* 可以使用 @LoadBalancerClient 或者直接注册 LoadBalancerClientSpecification 类型的bean到容器中,
* 看 {@link org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration#loadBalancerClientFactory()}
* */
context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType);
/**
* 设置客户端属性
* */
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
protected String generateDisplayName(String name) {
return this.getClass().getSimpleName() + "-" + name;
}
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
try {
return context.getBean(type);
} catch (NoSuchBeanDefinitionException e) {
// ignore
}
return null;
}
public <T> ObjectProvider<T> getLazyProvider(String name, Class<T> type) {
return new ClientFactoryObjectProvider<>(this, name, type);
}
public <T> ObjectProvider<T> getProvider(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
return context.getBeanProvider(type);
}
public <T> T getInstance(String name, Class<?> clazz, Class<?>... generics) {
ResolvableType type = ResolvableType.forClassWithGenerics(clazz, generics);
return getInstance(name, type);
}
@SuppressWarnings("unchecked")
public <T> T getInstance(String name, ResolvableType type) {
AnnotationConfigApplicationContext context = getContext(name);
String[] beanNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type);
if (beanNames.length > 0) {
for (String beanName : beanNames) {
if (context.isTypeMatch(beanName, type)) {
return (T) context.getBean(beanName);
}
}
}
return null;
}
public <T> Map<String, T> getInstances(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
return BeanFactoryUtils.beansOfTypeIncludingAncestors(context, type);
}
/**
* Specification with name and configuration.
*/
public interface Specification {
String getName();
Class<?>[] getConfiguration();
}
}
LoadBalancerClientFactory
/**
* 继承NamedContextFactory
* 创建客户端、负载均衡器和客户端配置实例的工厂。
* 为每个客户端名称创建一个 Spring ApplicationContext
*
* @author Spencer Gibb
* @author Dave Syer
* @author Olga Maciaszek-Sharma
*/
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
implements ReactiveLoadBalancer.Factory<ServiceInstance> {
private static final Log log = LogFactory.getLog(LoadBalancerClientFactory.class);
/**
* Property source name for load balancer.
*/
public static final String NAMESPACE = "loadbalancer";
/**
* Property for client name within the load balancer namespace.
*/
public static final String PROPERTY_NAME = NAMESPACE + ".client.name";
private final LoadBalancerClientsProperties properties;
/**
* @deprecated in favour of
* {@link LoadBalancerClientFactory#LoadBalancerClientFactory(LoadBalancerClientsProperties)}
*/
@Deprecated
public LoadBalancerClientFactory() {
this(null);
}
public LoadBalancerClientFactory(LoadBalancerClientsProperties properties) {
super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
this.properties = properties;
}
public static String getName(Environment environment) {
return environment.getProperty(PROPERTY_NAME);
}
@Override
public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
}
@Override
public LoadBalancerProperties getProperties(String serviceId) {
if (properties == null) {
if (log.isWarnEnabled()) {
log.warn("LoadBalancerClientsProperties is null. Please use the new constructor.");
}
return null;
}
if (serviceId == null || !properties.getClients().containsKey(serviceId)) {
// no specific client properties, return default
return properties;
}
// because specifics are overlayed on top of defaults, everything in `properties`,
// unless overridden, is in `clientsProperties`
return properties.getClients().get(serviceId);
}
}
LoadBalancerClient
/**
* 表示客户端负载均衡器。
*
* @author Spencer Gibb
*/
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* 使用来自 LoadBalancer 的 ServiceInstance 执行指定请求
* 服务。
*
* @param serviceId The service ID to look up the LoadBalancer.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
* @throws IOException in case of IO issues.
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified
* service.
*
* @param serviceId The service ID to look up the LoadBalancer.
* @param serviceInstance The service to execute the request to.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
* @throws IOException in case of IO issues.
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* Creates a proper URI with a real host and port for systems to utilize. Some systems
* use a URI with the logical service name as the host, such as
* http://myservice/path/to/service. This will replace the service name with the
* host:port from the ServiceInstance.
*
* @param instance service instance to reconstruct the URI
* @param original A URI with the host as a logical service name.
* @return A reconstructed URI.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
public class BlockingLoadBalancerClient implements LoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
/**
* @deprecated in favour of
* {@link BlockingLoadBalancerClient#BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory)}
*/
@Deprecated
public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
LoadBalancerProperties properties) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
/**
* 根据 serviceId 获取配置的 hint 值,默认是 default
* 可以设置 spring.cloud.loadbalancer.hint.serviceName=hint1 来设置该值
* 注:看 {@link LoadBalancerProperties}。
* */
String hint = getHint(serviceId);
// 装饰成 LoadBalancerRequestAdapter
LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
buildRequestContext(request, hint));
/**
* 从 loadBalancerClientFactory 中获取 LoadBalancerLifecycle 类型的bean
*
* 注:LoadBalancerClientFactory 继承 NamedContextFactory , 会根据 serviceId 创建一个IOC容器,再从这个指定的IOC容器中获取bean,
* 创建的IOC容器会存到Map中
* */
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
DefaultRequestContext.class, Object.class, ServiceInstance.class);
// 回调 LoadBalancerLifecycle#onStart 生命周期方法
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
/**
* 负载均衡选择出唯一的 serviceInstance
* {@link BlockingLoadBalancerClient#choose(String, Request)}
* */
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
// 实例为空,就报错
if (serviceInstance == null) {
// 回调 LoadBalancerLifecycle#onComplete 生命周期方法
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
// 使用 实例 执行请求
return execute(serviceId, serviceInstance, lbRequest);
}
private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {
if (delegate instanceof HttpRequestLoadBalancerRequest) {
HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();
if (request != null) {
RequestData requestData = new RequestData(request);
return new RequestDataContext(requestData, hint);
}
}
return new DefaultRequestContext(delegate, hint);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
// 装饰一下 serviceInstance
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
// 从 loadBalancerClientFactory 中获取 LoadBalancerLifecycle 类型的bean
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
// 回调 LoadBalancerLifecycle#onStartRequest 生命周期方法
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
try {
/**
* 执行请求
* {@link LoadBalancerRequestFactory#createRequest(HttpRequest, byte[], ClientHttpRequestExecution)}
* */
T response = request.apply(serviceInstance);
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
Object clientResponse = getClientResponse(response, properties.isUseRawStatusCodeInResponseData());
// 回调 LoadBalancerLifecycle#onComplete 生命周期方法
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, defaultResponse, clientResponse)));
return response;
}
catch (IOException iOException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
throw iOException;
}
catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}
private <T> Object getClientResponse(T response, boolean useRawStatusCodes) {
ClientHttpResponse clientHttpResponse = null;
if (response instanceof ClientHttpResponse) {
clientHttpResponse = (ClientHttpResponse) response;
}
if (clientHttpResponse != null) {
try {
if (useRawStatusCodes) {
return new ResponseData(null, clientHttpResponse);
}
return new ResponseData(clientHttpResponse, null);
}
catch (IOException ignored) {
}
}
return response;
}
private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
DefaultRequestContext.class, Object.class, ServiceInstance.class);
}
@Override
public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, REQUEST);
}
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
/**
* 通过 loadBalancerClientFactory 获取 ReactiveLoadBalancer 实例。
*
* 其实是会构造一个IOC容器,而IOC容器会注册这个配置类 {@link LoadBalancerClientConfiguration}
* 这个配置的目的是生成 ServiceInstanceListSupplier、ReactorLoadBalancer
* 然后从IOC容器中获取 ReactiveLoadBalancer 类型的bean
*
* 注:ReactorLoadBalancer 依赖 ServiceInstanceListSupplier 得到 List<ServiceInstance>
* 然后根据其负载均衡策略得到唯一的 serviceInstance
* 而 ServiceInstanceListSupplier 默认是通过获取 DiscoveryClient 得到 List<ServiceInstance>,
* 然后根据 ServiceInstanceListSupplier 的逻辑过滤掉一些
* */
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
// 选择出 ServiceInstance
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
private String getHint(String serviceId) {
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
String defaultHint = properties.getHint().getOrDefault("default", "default");
String hintPropertyValue = properties.getHint().get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}
LoadBalancerRequest
public interface LoadBalancerRequest<T> {
T apply(ServiceInstance instance) throws Exception;
}
/**
* 默认 {@link LoadBalancerRequest} 实现。
*
* @author Olga Maciaszek-Sharma
* @since 3.1.2
*/
class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {
private final LoadBalancerClient loadBalancer;
private final List<LoadBalancerRequestTransformer> transformers;
private final ClientHttpRequestData clientHttpRequestData;
BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,
ClientHttpRequestData clientHttpRequestData) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
this.clientHttpRequestData = clientHttpRequestData;
}
@Override
public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
// 装饰一下,其目的是会根据 instance 生成 uri
HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
// 对请求进行加工
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
/**
* 放行请求
* {@link InterceptingClientHttpRequest.InterceptingRequestExecution#execute(HttpRequest, byte[])}
*/
return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);
}
@Override
public HttpRequest getHttpRequest() {
return clientHttpRequestData.request;
}
static class ClientHttpRequestData {
private final HttpRequest request;
private final byte[] body;
private final ClientHttpRequestExecution execution;
ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {
this.request = request;
this.body = body;
this.execution = execution;
}
}
}
ReactiveLoadBalancer
/**
* 反应式负载均衡器。
*
* @param <T> type of the response
* @author Spencer Gibb
* @author Olga Maciaszek-Sharma
*/
public interface ReactiveLoadBalancer<T> {
/**
* Default implementation of a request.
*/
Request<DefaultRequestContext> REQUEST = new DefaultRequest<>();
/**
* 根据负载平衡算法选择下一个服务器。
*
* @param request - incoming request
* @return publisher for the response
*/
@SuppressWarnings("rawtypes")
Publisher<Response<T>> choose(Request request);
default Publisher<Response<T>> choose() { // conflicting name
return choose(REQUEST);
}
interface Factory<T> {
default LoadBalancerProperties getProperties(String serviceId) {
return null;
}
ReactiveLoadBalancer<T> getInstance(String serviceId);
/**
* Allows accessing beans registered within client-specific LoadBalancer contexts.
*
* @param name Name of the beans to be returned
* @param type The class of the beans to be returned
* @param <X> The type of the beans to be returned
* @return a {@link Map} of beans
* @see <code>@LoadBalancerClient</code>
*/
<X> Map<String, X> getInstances(String name, Class<X> type);
/**
* Allows accessing a bean registered within client-specific LoadBalancer
* contexts.
*
* @param name Name of the bean to be returned
* @param clazz The class of the bean to be returned
* @param generics The classes of generic types of the bean to be returned
* @param <X> The type of the bean to be returned
* @return a {@link Map} of beans
* @see <code>@LoadBalancerClient</code>
*/
<X> X getInstance(String name, Class<?> clazz, Class<?>... generics);
}
}
/**
* {@link ReactorServiceInstanceLoadBalancer} 的基于循环的实现。
*
* @author Spencer Gibb
* @author Olga Maciaszek-Sharma
* @author Zhuozhi JI
*/
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
final AtomicInteger position;
final String serviceId;
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
/**
* @param serviceInstanceListSupplierProvider a provider of
* {@link ServiceInstanceListSupplier} that will be used to get available instances
* @param serviceId id of the service for which to choose an instance
*/
public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId) {
this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
}
/**
* @param serviceInstanceListSupplierProvider a provider of
* {@link ServiceInstanceListSupplier} that will be used to get available instances
* @param serviceId id of the service for which to choose an instance
* @param seedPosition Round Robin element position marker
*/
public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId, int seedPosition) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.position = new AtomicInteger(seedPosition);
}
@SuppressWarnings("rawtypes")
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
// Do not move position when there is only 1 instance, especially some suppliers
// have already filtered instances
if (instances.size() == 1) {
return new DefaultResponse(instances.get(0));
}
// Ignore the sign bit, this allows pos to loop sequentially from 0 to
// Integer.MAX_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}
LoadBalancerInterceptor
/**
* 拦截器 委托LoadBalancerClient执行
*
* @author Spencer Gibb
* @author Dave Syer
* @author Ryan Baxter
* @author William Tran
*/
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// 主机名就是host
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
/**
* 使用 LoadBalancerRequestFactory 构造出 LoadBalancerRequest,
* 构造逻辑其实就是使用 LoadBalancerRequestTransformer 对 HttpRequest 进行增强
* 然后委托给 LoadBalancerClient 执行请求
*
* 默认是这个实现类
* {@link org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient#execute(String, LoadBalancerRequest)}
* */
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
ServiceInstanceListSupplier
有不同的实现策略。
/**
* {@link Supplier} 的 {@link ServiceInstance} 对象列表。
*
* @author Olga Maciaszek-Sharma
* @since 2.2.0
*/
public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
String getServiceId();
default Flux<List<ServiceInstance>> get(Request request) {
return get();
}
static ServiceInstanceListSupplierBuilder builder() {
return new ServiceInstanceListSupplierBuilder();
}
}
总结
- 解析@LoadBalancerClient、@LoadBalancerClients注解元信息,向容器中注册LoadBalancerClientSpecification。
- 配置LoadBalancerClientFactory,其中包含了所有的LoadBalancerClientSpecification配置。对于每一个客户端对应一个应用程序上下文以此来实现不同的客户端不能的负载均衡配置。
- 收集容器中添加了@LoadBalanced注解的RestTemplate。为他们添加LoadBalancerInterceptor。
- LoadBalancerInterceptor会拦截请求,委托给LoadBalancerClient去执行。
- BlockingLoadBalancerClient会通过loadBalancerClientFactory查找客户端对应的loadBalancer。
- 默认的实现是基于轮训策略的RoundRobinLoadBalancer。RoundRobinLoadBalancer利用ServiceInstanceListSupplier返回的服务实例列表,轮训出一个实例返回。
- BlockingLoadBalancerClient请求负载均衡出的服务实例。