spring异步@Async
大约 13 分钟
spring异步@Async
概述
@EnableAsync
会创建用来解析@Async
注解的Advisor。但是这个adcisor没有注入到容器,而是通过bean的后置处理器应用Advisor。
拦截的逻辑是类或者方法上有@Async的注解,增强的逻辑是使用指定或者默认的线程池执行方法。此时会为原来的bean创建代理对象。
@Async
标注在类上,类中所有的方法都会被代理@Async
标注在方法上,方法会被代理@Async("beanName")
通过注解值,指定方法要使用的的Executor
注:equals、hashCode、toString 是不会代理的
@Async
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* 指示要在任一类中检测到的“异步”注释类型
* 或方法级别。
* <p>默认情况下,Spring 的 @{@link Async} 注解和 EJB 3.1
* 将检测到 {@code @javax.ejb.Asynchronous} 注解。
* <p>此属性存在,以便开发人员可以提供自己的属性
* 自定义注解类型,以指示一个方法(或所有方法
* 给定的类)应异步调用。
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* 指示是否要创建基于子类的 (CGLIB) 代理,而不是
* 到基于标准 Java 接口的代理。
* <p><strong>仅当 {@link #mode} 设置为 {@link AdviceMode#PROXY} 时才适用</strong>。
* <p>默认值为 {@code false}。
* <p>请注意,将此属性设置为 {@code true} 将影响<em>所有</em>
* Spring 管理的 Bean 需要代理,而不仅仅是那些标有 {@code @Async} 的 bean。
* 例如,其他标有 Spring 的 {@code @Transactional} 注解的 bean
* 将同时升级为子类代理。这种方法没有
* 在实践中产生负面影响,除非人们明确期望一种类型的代理
* 与另一个 — 例如,在测试中。
*/
boolean proxyTargetClass() default false;
/**
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[]{ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
ProxyAsyncConfiguration
/**
* {@code @Configuration} 类,用于注册必要的 Spring 基础架构 Bean
* 启用基于代理的异步方法执行。
*
* @author Chris Beams
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 3.1
* @see EnableAsync
* @see AsyncConfigurationSelector
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
/**
* enableAsync 这个属性肯定,不是null,因为是 @Bean 所以创建之前会先创建其配置类,也就是创建 ProxyAsyncConfiguration
*
* 初始化ProxyAsyncConfiguration时,会进行依赖注入也就是会执行 {@link AbstractAsyncConfiguration#setConfigurers(Collection)}
* 会从容器中拿到 AsyncConfigurer 类型的bean,给这两个属性赋值
* {@link AbstractAsyncConfiguration#exceptionHandler}
* {@link AbstractAsyncConfiguration#executor}
*
* 又因为 ProxyAsyncConfiguration 实现了 ImportAware 接口,所以 ImportAwareBeanPostProcessor 后置处理器
* 会回调 {@link AbstractAsyncConfiguration#setImportMetadata(AnnotationMetadata)} ,给 enableAsync 这个字段赋值。
*
* enableAsync 就是 @EnableAsync 注解的元数据信息
* */
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
/**
* 这两个参数的赋值,是在父类通过自动注入实现的
* {@link AbstractAsyncConfiguration#setConfigurers(Collection)}
* */
bpp.configure(this.executor, this.exceptionHandler);
// 注解的参数值
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
// 有 customAsyncAnnotation 就设置
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
/**
* The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
* <p>Note that the initial lookup happens by type; this is just the fallback
* in case of multiple executor beans found in the context.
* @since 4.2
* @see AnnotationAsyncExecutionInterceptor#DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
protected final Log logger = LogFactory.getLog(getClass());
@Nullable
private Supplier<Executor> executor;
@Nullable
private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Nullable
private Class<? extends Annotation> asyncAnnotationType;
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
/**
* 实例化 AsyncAnnotationAdvisor,是这个类型的Advisor
* */
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
AsyncAnnotationAdvisor
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
/**
* Async 就是Pointcut匹配的时候会用到
* */
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
} catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
/**
* 默认就一个 @Async
* */
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
/**
* 这个是匹配类的,就是 类有@Async就行
* */
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
/**
* 这个是匹配方法的,就是 方法有@Async就行
* */
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
} else {
/**
* union 简单一点就是,一个满足就是true
* {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class, Object...)}
* {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class)}
*
* {@link ClassFilters.UnionClassFilter#matches(Class)}
*
* 在这里用的 {@link {@link AopUtils#canApply(Pointcut, Class, boolean)}
* 就是 {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class)}}
*
* 所以union的结果就是 类有@Async或者方法有@Async就行
* */
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
AsyncExecutionInterceptor
AsyncExecutionInterceptor:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
/**
* 拿到方法对应的Executor。
*
* @Async的查找顺序:方法 -> 方法声明的类
*
* 有注解@Async("beanName"),就通过beanName从容器中获取Executor,拿不到直接报错
* `@Async` 没有指定beanName,就使用默认的Executor {@link AsyncExecutionAspectSupport#defaultExecutor},会在构造器设置这个属性
*
* 构造器 {@link AsyncExecutionAspectSupport#AsyncExecutionAspectSupport(Executor)}
* defaultExecutor 是 {@link org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers(Collection)} 设置的
* `this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));`
* 没有 defaultExecutor 也会执行{@link AsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)} 拿到一个
* 从BeanFactory中找 先找TaskExecutor,没有在找Executor 类型的bean
* 找不到就 `new SimpleAsyncTaskExecutor()`
*
* Tips:方法的Executor,可以通过 @Async("beanName") 注解的值拿到,在BeanFactory中找,找不到就报错。没有设置注解值,就找默认的,
* 默认查找顺序:AsyncConfigurer -> TaskExecutor -> Executor -> `new SimpleAsyncTaskExecutor()`
* */
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 没得 Executor 就直接报错咯
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
// 执行方法
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
} catch (ExecutionException ex) {
// 异常处理
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable ex) {
// 异常处理
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
/**
* 提交异步任务,就是使用Executor执行任务
* 支持三种特殊的返回值类型 CompletableFuture、ListenableFuture、Future 会有返回值,其他的返回null
* */
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
postProcessAfterInitialization
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 忽略诸如作用域代理之类的 AOP 基础设施
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
/**
* Advised 类型,扩展当前 advisor 给他
*
* 因为Spring使用Cglib、JDK 创建的代理对象都会添加 Advised 接口,所以这里的目的是 看看被代理类是否符合Pointcut的规则
* {@link CglibAopProxy#getProxy(ClassLoader)}
* {@link AopProxyUtils#completeProxiedInterfaces(AdvisedSupport)}
*
* {@link JdkDynamicAopProxy#JdkDynamicAopProxy(AdvisedSupport)}
* {@link AopProxyUtils#completeProxiedInterfaces(AdvisedSupport, boolean)}
* */
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
/**
* `!advised.isFrozen()` 不是冻结的,表示还可以扩展 Advisor,因为如果是冻结的会在创建代理Cglib代理对象的时候就解析Advisor设置CallBack
* {@link CglibAopProxy#getCallbacks(Class)},而不是冻结的 则是在 调用代理对象的方法时才会解析Advisor
*
* `opUtils.getTargetClass(bean)` 拿到被代理对象,如果是cglib代理就是拿到父类咯
*
* isEligible 是符合条件的,就是 类有@Async 或者 方法有@Async 才会是true
*
* */
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
/**
* 将 @Async 的增强逻辑,添加到 被代理类中,也就是扩展被代理类的Advisor
* */
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
} else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
/**
* isEligible 是符合条件的。
* 不是原始类 且 (类有@Async 或者 方法有@Async 才会是true)
*
* {@link AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(Object, String)}
* */
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
/**
* 设置Advisor
* 用的这个 {@link AnnotationAsyncExecutionInterceptor}
* */
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
// 创建代理对象
return proxyFactory.getProxy(classLoader);
}
// No proxy needed.
return bean;
}