spring异步@Async

fxz大约 13 分钟

spring异步@Async

概述

文档open in new window

@EnableAsync 会创建用来解析@Async注解的Advisor。但是这个adcisor没有注入到容器,而是通过bean的后置处理器应用Advisor。

拦截的逻辑是类或者方法上有@Async的注解,增强的逻辑是使用指定或者默认的线程池执行方法。此时会为原来的bean创建代理对象

  • @Async 标注在类上,类中所有的方法都会被代理
  • @Async 标注在方法上,方法会被代理
  • @Async("beanName") 通过注解值,指定方法要使用的的Executor

注:equals、hashCode、toString 是不会代理的

@Async

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    /**
     * 指示要在任一类中检测到的“异步”注释类型
     * 或方法级别。
     * <p>默认情况下,Spring 的 @{@link Async} 注解和 EJB 3.1
     * 将检测到 {@code @javax.ejb.Asynchronous} 注解。
     * <p>此属性存在,以便开发人员可以提供自己的属性
     * 自定义注解类型,以指示一个方法(或所有方法
     * 给定的类)应异步调用。
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    /**
     * 指示是否要创建基于子类的 (CGLIB) 代理,而不是
     * 到基于标准 Java 接口的代理。
     * <p><strong>仅当 {@link #mode} 设置为 {@link AdviceMode#PROXY} 时才适用</strong>。
     * <p>默认值为 {@code false}。
     * <p>请注意,将此属性设置为 {@code true} 将影响<em>所有</em>
     * Spring 管理的 Bean 需要代理,而不仅仅是那些标有 {@code @Async} 的 bean。
     * 例如,其他标有 Spring 的 {@code @Transactional} 注解的 bean
     * 将同时升级为子类代理。这种方法没有
     * 在实践中产生负面影响,除非人们明确期望一种类型的代理
     * 与另一个 — 例如,在测试中。
     */
    boolean proxyTargetClass() default false;

    /**
     * Indicate how async advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


    /**
     * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
     * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
     * respectively.
     */
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[]{ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[]{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}

ProxyAsyncConfiguration

/**
 * {@code @Configuration} 类,用于注册必要的 Spring 基础架构 Bean
 * 启用基于代理的异步方法执行。
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @author Juergen Hoeller
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)  
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
      
        /**
         * enableAsync 这个属性肯定,不是null,因为是 @Bean 所以创建之前会先创建其配置类,也就是创建 ProxyAsyncConfiguration
         *
         * 初始化ProxyAsyncConfiguration时,会进行依赖注入也就是会执行 {@link AbstractAsyncConfiguration#setConfigurers(Collection)}
         *  会从容器中拿到 AsyncConfigurer 类型的bean,给这两个属性赋值
         *      {@link AbstractAsyncConfiguration#exceptionHandler}
         *      {@link AbstractAsyncConfiguration#executor}
         *
         * 又因为 ProxyAsyncConfiguration 实现了 ImportAware 接口,所以 ImportAwareBeanPostProcessor 后置处理器
         * 会回调 {@link AbstractAsyncConfiguration#setImportMetadata(AnnotationMetadata)} ,给 enableAsync 这个字段赋值。
         *
         * enableAsync 就是 @EnableAsync 注解的元数据信息
         * */
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        /**
         * 这两个参数的赋值,是在父类通过自动注入实现的
         * {@link AbstractAsyncConfiguration#setConfigurers(Collection)}
         * */
        bpp.configure(this.executor, this.exceptionHandler);
        // 注解的参数值
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        //  有 customAsyncAnnotation 就设置
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

AsyncAnnotationBeanPostProcessor

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

    /**
     * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
     * <p>Note that the initial lookup happens by type; this is just the fallback
     * in case of multiple executor beans found in the context.
     * @since 4.2
     * @see AnnotationAsyncExecutionInterceptor#DEFAULT_TASK_EXECUTOR_BEAN_NAME
     */
    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
            AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;


    protected final Log logger = LogFactory.getLog(getClass());

    @Nullable
    private Supplier<Executor> executor;

    @Nullable
    private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    @Nullable
    private Class<? extends Annotation> asyncAnnotationType;



    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        /**
         * 实例化 AsyncAnnotationAdvisor,是这个类型的Advisor
         * */
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }

}

AsyncAnnotationAdvisor

public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    /**
     * Async 就是Pointcut匹配的时候会用到
     * */
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    } catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}



 protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }


protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    /**
     * 默认就一个 @Async
     * */
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        /**
         * 这个是匹配类的,就是 类有@Async就行
         * */
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        /**
         * 这个是匹配方法的,就是 方法有@Async就行
         * */
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        } else {
            /**
             * union 简单一点就是,一个满足就是true
             *  {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class, Object...)}
             *  {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class)}
             *
             *  {@link ClassFilters.UnionClassFilter#matches(Class)}
             *
             *  在这里用的 {@link {@link AopUtils#canApply(Pointcut, Class, boolean)}
             *  就是 {@link MethodMatchers.UnionMethodMatcher#matches(Method, Class)}}
             *
             * 所以union的结果就是 类有@Async或者方法有@Async就行
             * */
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

AsyncExecutionInterceptor

AsyncExecutionInterceptor:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    /**
     * 拿到方法对应的Executor。
     *
     * @Async的查找顺序:方法 -> 方法声明的类
     *
     *  有注解@Async("beanName"),就通过beanName从容器中获取Executor,拿不到直接报错
     * `@Async` 没有指定beanName,就使用默认的Executor {@link AsyncExecutionAspectSupport#defaultExecutor},会在构造器设置这个属性
     *
     * 构造器 {@link AsyncExecutionAspectSupport#AsyncExecutionAspectSupport(Executor)}
     *     defaultExecutor 是 {@link org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers(Collection)} 设置的
     *    `this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));`
     *          没有 defaultExecutor 也会执行{@link AsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)} 拿到一个
     *          从BeanFactory中找 先找TaskExecutor,没有在找Executor 类型的bean
     *          找不到就 `new SimpleAsyncTaskExecutor()`
     *
     * Tips:方法的Executor,可以通过 @Async("beanName") 注解的值拿到,在BeanFactory中找,找不到就报错。没有设置注解值,就找默认的,
     *      默认查找顺序:AsyncConfigurer -> TaskExecutor -> Executor -> `new SimpleAsyncTaskExecutor()`
     * */
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    // 没得 Executor 就直接报错咯
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
        try {
            // 执行方法
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            // 异常处理
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            // 异常处理
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    /**
     * 提交异步任务,就是使用Executor执行任务
     * 支持三种特殊的返回值类型 CompletableFuture、ListenableFuture、Future 会有返回值,其他的返回null
     * */
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

postProcessAfterInitialization

public Object postProcessAfterInitialization(Object bean, String beanName) {
    // 忽略诸如作用域代理之类的 AOP 基础设施
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    /**
     * Advised 类型,扩展当前 advisor 给他
     *
     * 因为Spring使用Cglib、JDK 创建的代理对象都会添加 Advised 接口,所以这里的目的是 看看被代理类是否符合Pointcut的规则
     *      {@link CglibAopProxy#getProxy(ClassLoader)}
     *      {@link AopProxyUtils#completeProxiedInterfaces(AdvisedSupport)}
     *
     *      {@link JdkDynamicAopProxy#JdkDynamicAopProxy(AdvisedSupport)}
     *      {@link AopProxyUtils#completeProxiedInterfaces(AdvisedSupport, boolean)}
     * */
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        /**
         * `!advised.isFrozen()` 不是冻结的,表示还可以扩展 Advisor,因为如果是冻结的会在创建代理Cglib代理对象的时候就解析Advisor设置CallBack
         *      {@link CglibAopProxy#getCallbacks(Class)},而不是冻结的 则是在 调用代理对象的方法时才会解析Advisor
         *
         * `opUtils.getTargetClass(bean)` 拿到被代理对象,如果是cglib代理就是拿到父类咯
         *
         * isEligible 是符合条件的,就是 类有@Async 或者 方法有@Async 才会是true
         *
         * */
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            /**
             * 将 @Async 的增强逻辑,添加到 被代理类中,也就是扩展被代理类的Advisor
             * */
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            } else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }
    /**
     * isEligible 是符合条件的。
     * 不是原始类 且  (类有@Async 或者 方法有@Async 才会是true)
     *
     * {@link AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(Object, String)}
     * */
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        /**
         * 设置Advisor
         * 用的这个 {@link AnnotationAsyncExecutionInterceptor}
         * */
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);

        // Use original ClassLoader if bean class not locally loaded in overriding class loader
        ClassLoader classLoader = getProxyClassLoader();
        if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
            classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
        }
        // 创建代理对象
        return proxyFactory.getProxy(classLoader);
    }

    // No proxy needed.
    return bean;
}