spring事务

fxz大约 89 分钟

spring事务

  • @EnableTransactionManagement会import到容器中TransactionManagementConfigurationSelector。
  • TransactionManagementConfigurationSelector会向容器注册相关的切面、切点、增强逻辑。
  • 切点逻辑是匹配事务注解的方法。
  • 增强逻辑是TransactionInterceptor实现的。

关键类

PlatformTransactionManager

事务管理器,定义了实现事务的规范。

public interface PlatformTransactionManager extends TransactionManager {
  
		// 开启事务
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
          throws TransactionException;
  
		// 提交事务
    void commit(TransactionStatus status) throws TransactionException;
  
		// 回滚事务
    void rollback(TransactionStatus status) throws TransactionException;
}

AbstractPlatformTransactionManager

  1. 判断当前是否已经存在一个事务
  2. 应用合适的事务传播行为
  3. 在必要的时候挂起/恢复事务
  4. 提交时检查事务是否被标记成为rollback-only
  5. 在回滚时做适当的修改(是执行真实的回滚/还是将事务标记成rollback-only
  6. 触发注册的同步回调

TransactionDefinition

对事务定义的抽象,这些定义有些是数据库层面本身就有的,例如隔离级别、是否只读、超时时间、名称

public interface TransactionDefinition {
// 定义了7中事务的传播机制
int PROPAGATION_REQUIRED = 0;
int PROPAGATION_SUPPORTS = 1;
int PROPAGATION_MANDATORY = 2;
int PROPAGATION_REQUIRES_NEW = 3;
int PROPAGATION_NOT_SUPPORTED = 4;
int PROPAGATION_NEVER = 5;
int PROPAGATION_NESTED = 6;

// 4种隔离级别,-1代表的是使用数据库默认的隔离级别
// 比如在MySQL下,使用的就是ISOLATION_REPEATABLE_READ(可重复读)
int ISOLATION_DEFAULT = -1;
int ISOLATION_READ_UNCOMMITTED = 1;  
int ISOLATION_READ_COMMITTED = 2; 
int ISOLATION_REPEATABLE_READ = 4; 
int ISOLATION_SERIALIZABLE = 8;  

// 事务的超时时间,默认不限制时间
int TIMEOUT_DEFAULT = -1;

// 提供了对上面三个属性的get方法
default int getPropagationBehavior() {
	return PROPAGATION_REQUIRED;
}
default int getIsolationLevel() {
	return ISOLATION_DEFAULT;
}
default int getTimeout() {
	return TIMEOUT_DEFAULT;
}

// 事务是否是只读的,默认不是
default boolean isReadOnly() {
	return false;
}

// 事务的名称
@Nullable
default String getName() {
	return null;
}

// 返回一个只读的TransactionDefinition
// 只对属性提供了getter方法,所有属性都是接口中定义的默认值
static TransactionDefinition withDefaults() {
	return StaticTransactionDefinition.INSTANCE;
}
}
public class DefaultTransactionDefinition implements TransactionDefinition, Serializable {


    public static final String PREFIX_PROPAGATION = "PROPAGATION_";
    public static final String PREFIX_ISOLATION = "ISOLATION_";
    public static final String PREFIX_TIMEOUT = "timeout_";
    public static final String READ_ONLY_MARKER = "readOnly";

    static final Constants constants = new Constants(TransactionDefinition.class);

    private int propagationBehavior = PROPAGATION_REQUIRED;

    private int isolationLevel = ISOLATION_DEFAULT;

    private int timeout = TIMEOUT_DEFAULT;

    private boolean readOnly = false;

    @Nullable
    private String name;
  }
public interface TransactionAttribute extends TransactionDefinition {

    /**
     * 返回与此事务属性关联的限定符值。
     * <p>这可用于选择相应的事务管理器
     * 处理此特定交易。
     * @since 3.0
     */
    @Nullable
    String getQualifier();

    /**
     * 返回与此交易属性关联的标签。
     * <p>这可用于应用特定的事务行为
     * 或遵循纯粹的描述性。
     * @since 5.3
     */
    Collection<String> getLabels();

    /**
     * 我们是否应该回滚给定的异常
     * @param ex the exception to evaluate
     * @return whether to perform a rollback or not
     */
    boolean rollbackOn(Throwable ex);

}
DefaultTransactionAttribute:

@Override
public boolean rollbackOn(Throwable ex) {
  	// 默认只会滚RuntimeException和Error类型
    return (ex instanceof RuntimeException || ex instanceof Error);
}
RuleBasedTransactionAttribute:

@Override
public boolean rollbackOn(Throwable ex) {
    RollbackRuleAttribute winner = null;
    int deepest = Integer.MAX_VALUE;

    if (this.rollbackRules != null) {
        /**
         * 规则是啥看这里 {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
         * */
        for (RollbackRuleAttribute rule : this.rollbackRules) {
            /**
             * depth = -1 表示没匹配到
             * */
            int depth = rule.getDepth(ex);
            if (depth >= 0 && depth < deepest) {
                deepest = depth;
                // 记录匹配的规则
                winner = rule;
            }
        }
    }

    // User superclass behavior (rollback on unchecked) if no rule matches.
    if (winner == null) {
        /**
         * 使用父类匹配规则 {@link DefaultTransactionAttribute#rollbackOn(Throwable)}
         *  很简单 `return (ex instanceof RuntimeException || ex instanceof Error);`
         * */
        return super.rollbackOn(ex);
    }
    // 不是 NoRollbackRuleAttribute 就回滚
    return !(winner instanceof NoRollbackRuleAttribute);
}

TransactionStatus

描述创建后的事务的状态

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    // 于判断当前事务是否设置了保存点
    boolean hasSavepoint();

   	// 复写了父接口Flushable中的方法
		// 主要用于刷新会话
		// 对于Hibernate/jpa而言就是调用了其session/entityManager的flush方法
    @Override
    void flush();

}
// 判断当前事务是否是一个新的事务
// 不是一个新事务的话,那么需要加入到已经存在的事务中
boolean isNewTransaction();

// 事务是否被标记成RollbackOnly
// 如果被标记成了RollbackOnly,意味着事务只能被回滚
void setRollbackOnly(); 
boolean isRollbackOnly();

// 是否事务完成,回滚或提交都意味着事务完成了
boolean isCompleted();

// 创建保存点
Object createSavepoint() throws TransactionException;

// 回滚到指定保存点
void rollbackToSavepoint(Object savepoint) throws TransactionException;

// 移除回滚点
void releaseSavepoint(Object savepoint) throws TransactionException;

TransactionSynchronizationManager

管理资源同步和行为同步。

资源同步:数据库连接就是跟这个事务同步的一个资源。

行为同步:在事务开启之前我们需要先获取一个数据库连接,同样的在事务提交时我们需要将连接关闭(不一定是真正的关闭,如果是连接池只是归还到连接池中),这个时候关闭连接这个行为也需要跟事务进行同步。

public abstract class TransactionSynchronizationManager {

    /**
     * 事务资源,就是当前事物内涉及到的所有资源(数据库连接)
     *<p/></p?>
     * 比如数据库连接:
     *      Key:DataSource 对象
     *      Value:ConnectionHolder
     *<p/>
     * 什么时候会设置值:<p/>
     *      1. 开启新事务时,会通过 DataSource 获取连接,并将连接存到这里<p/>
     *      2. 执行 {@link org.springframework.jdbc.datasource.DataSourceUtils#getConnection(DataSource)} 获取连接,使用 DataSource 做为key从 resources中找不到连接,
     *          就会使用 DataSource获取连接,存到 synchronizations 和 这里<p/>
     *      注:前提是在事务内执行。简单来说就是Java虚拟机栈中存在@Transactional的方法(就是有{@link TransactionInterceptor#invoke(MethodInvocation)})
     *<p/>
     * 什么时候移除key对应属性值:
     *      - 暂停当前事务
     *      - 完成当前事务(rollback或者commit)
     *      Tips:{@link TransactionSynchronizationManager#doUnbindResource(Object)}
     */
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    /**
     * 事务同步资源,就是在事务中产生的 非事务管理器数据源生成的连接或者是用于在事务完成时(rollback或者commit)要触发事件,都算是事务同步资源
     *<p/>
     * 比如:
     *      - ConnectionSynchronization
     *          使用工具类获取连接会设置 {@link org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection(DataSource)}
     *          这种连接和事务连接不一样,是直接通过数据源获取的连接,不会配置自动提交、超时时间、隔离级别,默认是啥就是啥,
     *          唯一的作用就是在事务完成时(rollback或者commit) 释放掉这些连接
     *          比如:获取连接用的数据源(d1)和事务管理的数据源(d2)不是同一个时,就会将d1创建的连接装饰成 ConnectionSynchronization,
     *              并将该对象存到 synchronizations 属性中,然后对应的连接也会存到注册到 resources 中
     *
     *      - TransactionalApplicationListenerSynchronization
     *          发布事务事件时会设置 {@link TransactionalApplicationListenerMethodAdapter#onApplicationEvent(ApplicationEvent)}
     *          作用就是在事务完成时(rollback或者commit) 回调其 TransactionalApplicationListener、SynchronizationCallback
     *<p/>
     * 什么时候会设置值:简单来说就是Java虚拟机栈中存在@Transactional的方法(就是有{@link TransactionInterceptor#invoke(MethodInvocation)})
     *      具体一点就是执行完 {@link TransactionAspectSupport#createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String)}
     *      该属性就会被初始化,在使用过程中会根据 `synchronizations.get() != null ` 来判断是激活了事务同步 {@link TransactionSynchronizationManager#isSynchronizationActive()}
     *<p/>
     * 什么时候清空该属性值:
     *      1. 暂停当前事务
     *      2. 完成当前事务(rollback或者commit)
     *      Tips:{@link TransactionSynchronizationManager#clear()}
     */
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    /**
     * 实际激活的事务。
     *  - 当前线程的事务不是空事务 就是 true
     *  - 空事务 或者 执行非事务方法就是 false
     */
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");
  
}

TransactionSynchronization

行为的同步

public interface TransactionSynchronization extends Flushable {
	// 事务完成的状态
    // 0 提交
    // 1 回滚
    // 2 异常状态,例如在事务执行时出现异常,然后回滚,回滚时又出现异常
    // 就会被标记成状态2
	int STATUS_COMMITTED = 0;
	int STATUS_ROLLED_BACK = 1;
	int STATUS_UNKNOWN = 2;

    // 我们绑定的这些TransactionSynchronization需要跟事务同步
    // 1.如果事务挂起,我们需要将其挂起
    // 2.如果事务恢复,我们需要将其恢复
	default void suspend() {
	}
	default void resume() {
	}
	@Override
	default void flush() {
	}
	
  // 在事务执行过程中,提供的一些回调方法
	default void beforeCommit(boolean readOnly) {
	}
	default void beforeCompletion() {
	}
	default void afterCommit() {
	}
	default void afterCompletion(int status) {
	}

}

@EnableTransactionManagement

@EnableTransactionManagement注解用于启用Spring的事务管理功能。

  1. @EnableTransactionManagement:

    • 使用@EnableTransactionManagement会导入@Import(TransactionManagementConfigurationSelector.class)
    • 解析配置类时,因为TransactionManagementConfigurationSelector的父类AdviceModeImportSelector实现了ImportSelector,所以会回调AdviceModeImportSelector#selectImports(AnnotationMetadata)方法。
    • 在这个回调方法中,获取@EnableTransactionManagement(mode = AdviceMode.PROXY)注解的mode属性值,回调子类方法TransactionManagementConfigurationSelector#selectImports(AdviceMode)
  2. TransactionManagementConfigurationSelector的工作流程:

    • TransactionManagementConfigurationSelector#selectImports(AdviceMode)方法返回两个类:AutoProxyRegistrarProxyTransactionManagementConfiguration,将它们添加到BeanDefinitionMap中。
  3. AutoProxyRegistrar:

    • 继承ImportBeanDefinitionRegistrar,所以解析@Import时会回调AutoProxyRegistrar#registerBeanDefinitions(AnnotationMetadata, BeanDefinitionRegistry)方法。
    • 该方法会执行AopConfigUtils#registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry),注册InfrastructureAdvisorAutoProxyCreator到容器中。
    • InfrastructureAdvisorAutoProxyCreator是BeanPostProcessor,在实例化前、提前AOP、初始后判断bean是否要进行代理。
  4. ProxyTransactionManagementConfiguration:

    • 继承AbstractTransactionManagementConfiguration
    • 通过@Bean注册TransactionalEventListenerFactory,用于处理@TransactionalEventListener标注的方法,将方法构造成事件监听器,注册到事件广播器中。
    • 通过@Bean注册Advisor、Advisor的Advice和AnnotationTransactionAttributeSource。
    • BeanFactoryTransactionAttributeSourceAdvisor实现PointcutAdvisor接口,决定是否进行代理的依据是PointcutAdvisor#getPointcut(),其Pointcut是TransactionAttributeSourcePointcut
    • TransactionInterceptor是Advice,实现了事务增强逻辑。
    • Advisor和Advice都依赖AnnotationTransactionAttributeSource这个bean,用来查找、解析@Transactional。在方法上找、类上找。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

    /**
     * 指示是否要创建基于子类的 (CGLIB) 代理 ({@code true}) 为
     * 与基于标准 Java 接口的代理相反 ({@code false})。默认值为
     * {@code false}<strong>仅当 {@link #mode()} 设置为
     * {@link AdviceMode#PROXY}</strong>。
     * <p>请注意,将此属性设置为 {@code true} 将影响<em>所有</em>
     * Spring 管理的 Bean 需要代理,而不仅仅是那些标有
     * {@code @Transactional}。例如,标有 Spring 的其他豆子
     * {@code @Async} 注解将同时升级为子类代理
     *时间。这种方法在实践中没有负面影响,除非明确
     * 期望一种类型的代理与另一种类型的代理,例如在测试中。
     */
    boolean proxyTargetClass() default false;

    /**
     * Indicate how transactional advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Transactional} annotation on such a method within a local call will be
     * ignored since Spring's interceptor does not even kick in for such a runtime
     * scenario. For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Indicate the ordering of the execution of the transaction advisor
     * when multiple advices are applied at a specific joinpoint.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE}.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}

AutoProxyRegistrar

向容器中注册InfrastructureAdvisorAutoProxyCreator,查找容器中特定类型的Advisor。

// 	AnnotationMetadata,代表的是AutoProxyRegistrar的导入类的元信息
// 既包含了类元信息,也包含了注解元信息
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    boolean candidateFound = false;
    // 获取@EnableTransactionManagement所在配置类上的注解元信息
    Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
    // 遍历注解
    for (String annType : annTypes) {
        // 可以理解为将注解中的属性转换成一个map
        AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (candidate == null) {
            continue;
        }
        // 直接从map中获取对应的属性
        Object mode = candidate.get("mode");
        Object proxyTargetClass = candidate.get("proxyTargetClass");

        // mode,代理模型,一般都是SpringAOP
        // proxyTargetClass,是否使用cglib代理
        if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
            Boolean.class == proxyTargetClass.getClass()) {

            // 注解中存在这两个属性,并且属性类型符合要求,表示找到了合适的注解
            candidateFound = true;

            // 实际上会往容器中注册一个InfrastructureAdvisorAutoProxyCreator
            if (mode == AdviceMode.PROXY) {
                AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                if ((Boolean) proxyTargetClass) {
                    AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                    return;
                }
            }
        }
    }
    // ......
}

@EnableAspectJAutoProxy注解也向容器中注册了一个能实现自动代理的bd,那么当@EnableAspectJAutoProxy跟@EnableTransactionManagement同时使用,会根据优先级仅生效一个

private static BeanDefinition registerOrEscalateApcAsRequired(
    Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {
    if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
        BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
        if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
            // 当前已经注册到容器中的Bean的优先级
            int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
            // 当前准备注册到容器中的Bean的优先级
            int requiredPriority = findPriorityForClass(cls);
            // 谁的优先级大就注册谁,AnnotationAwareAspectJAutoProxyCreator是最大的
            // 所以AnnotationAwareAspectJAutoProxyCreator会覆盖别的Bean
            if (currentPriority < requiredPriority) {
                apcDefinition.setBeanClassName(cls.getName());
            }
        }
        return null;
    }
	// 注册bd
    RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
    beanDefinition.setSource(source);
    beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
    return beanDefinition;
}

ProxyTransactionManagementConfiguration

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
    /**
     * transactionAdvisor 就是 @EnableTransactionManagement 的增强器,而 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) 也是有用的
     * 因为 InfrastructureAdvisorAutoProxyCreator 只会使用Role的值是 {@link BeanDefinition.ROLE_INFRASTRUCTURE} 的 Advisor 来判断后置处理的bean是否需要代理
     *        {@link BeanFactoryAdvisorRetrievalHelper#findAdvisorBeans()}
     * */
    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
            TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
        /**
         * 实现 PointcutAdvisor 接口,所以是否要进行代理得看 {@link PointcutAdvisor#getPointcut()}
         * 而 BeanFactoryTransactionAttributeSourceAdvisor 的 Pointcut是这个 {@link TransactionAttributeSourcePointcut}
         * 而 TransactionAttributeSourcePointcut 类匹配和方法匹配是使用 transactionAttributeSource 来解析注解的
         *      - ClassFilter {@link TransactionAttributeSourcePointcut.TransactionAttributeSourceClassFilter}
         *          类不是java包下的 不是 Ordered类 就是匹配
         *
         *      - MethodMatcher {@link TransactionAttributeSourcePointcut#matches(Method, Class)}
         *          查找 方法->方法声明的类 有@Transactional 就是匹配
         * */
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();

        // 默认是注册的这个类型 AnnotationTransactionAttributeSource
        advisor.setTransactionAttributeSource(transactionAttributeSource);
        /**
         * Advice, 也就是具体的增强逻辑
         * */
        advisor.setAdvice(transactionInterceptor);
        if (this.enableTx != null) {
            /**
             * 设置排序值
             * */
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        /**
         * Advisor 和 Advice 都依赖了这个bean。
         * TransactionAttributeSource 该对象很简单,就是解析 @Transactional 注解,解析成 RuleBasedTransactionAttribute 对象
         * */
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        /**
         * 就是 BeanFactoryTransactionAttributeSourceAdvisor 的 advice
         * */
        TransactionInterceptor interceptor = new TransactionInterceptor();
        /**
         * 依赖了 transactionAttributeSource,这东西是用来拿到方法、类上的 @Transactional注解,解析成 RuleBasedTransactionAttribute 对象
         * */
        interceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            /**
             * 设置事务管理器,该属性值是父类依赖注入 TransactionManagementConfigurer 类型的bean设置的
             * */
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }

}



@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {

    /**
     * 就是 @EnableTransactionManagement 的元数据
     */
    @Nullable
    protected AnnotationAttributes enableTx;

    /**
     * Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
     */
    @Nullable
    protected TransactionManager txManager;


    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableTx = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
        if (this.enableTx == null) {
            throw new IllegalArgumentException(
                    "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
        }
    }

    @Autowired(required = false)
    void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
        /**
         * 配置事务管理器。事务管理器是用于事务的开启、回滚、提交 就是通过这个接口统一调用的,
         * 其依赖 TransactionSynchronizationManager 管理事务的状态
         * */
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
        }
        TransactionManagementConfigurer configurer = configurers.iterator().next();
        this.txManager = configurer.annotationDrivenTransactionManager();
    }


    @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
        /**
         * {@link EventListenerMethodProcessor} 后置处理器会用到 EventListenerFactory,
         * 而 TransactionalEventListenerFactory 用于处理 @TransactionalEventListener 标注的方法,将方法构造成事件监听器,注册到事件广播器中
         * */
        return new TransactionalEventListenerFactory();
    }
}

BeanFactoryTransactionAttributeSourceAdvisor

切面,看切点匹配逻辑以及通知增强逻辑。


/**
 * 由 {@link TransactionAttributeSource} 驱动的顾问程序,用于包括
 * 事务性方法的事务建议 Bean。
 *
 * @author Juergen Hoeller
 * @since 2.5.5
 * @see #setAdviceBeanName
 * @see TransactionInterceptor
 * @see TransactionAttributeSourceAdvisor
 */
@SuppressWarnings("serial")
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

    @Nullable
    private TransactionAttributeSource transactionAttributeSource;

    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
        @Override
        @Nullable
        protected TransactionAttributeSource getTransactionAttributeSource() {
            return transactionAttributeSource;
        }
    };


    /**
     * Set the transaction attribute source which is used to find transaction
     * attributes. This should usually be identical to the source reference
     * set on the transaction interceptor itself.
     * @see TransactionInterceptor#setTransactionAttributeSource
     */
    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionAttributeSource = transactionAttributeSource;
    }

    /**
     * Set the {@link ClassFilter} to use for this pointcut.
     * Default is {@link ClassFilter#TRUE}.
     */
    public void setClassFilter(ClassFilter classFilter) {
        this.pointcut.setClassFilter(classFilter);
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

}

TransactionAttributeSourcePointcut

切点,类以及方法匹配。

abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

    protected TransactionAttributeSourcePointcut() {
        /**
         * TransactionAttributeSourceClassFilter 会执行抽象方法 `getTransactionAttributeSource` 然后执行 {@link TransactionAttributeSource#isCandidateClass(Class)}
         * 判断类是否匹配。
         *
         * 注:过滤规则很简单,只要类不是java包下的 不是 Ordered接口 就是匹配
         * */
        setClassFilter(new TransactionAttributeSourceClassFilter());
    }


    @Override
    public boolean matches(Method method, Class<?> targetClass) {
        /**
         * TransactionAttributeSourcePointcut 实现了 MethodMatcher,所以判断方法匹配的时候会执行当前方法。
         * 会执行抽象方法 `getTransactionAttributeSource` 然后执行 {@link TransactionAttributeSource#getTransactionAttribute(Method, Class)}
         *
         * {@link AbstractFallbackTransactionAttributeSource#getTransactionAttribute(Method, Class)}
         * 注:
         *  1. 过滤规则很简单,方法 -> 方法声明的类 先找到@Transactional就返回。也就是有注解就是匹配
         *  2. 如果方法不是public的,直接返回null 不解析上面的@Transactional注解,也就是不代理
         * */
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    }


    /**
     * 获取基础 TransactionAttributeSource(可能为 {@code null})。
     * 由子类实现。
     */
    @Nullable
    protected abstract TransactionAttributeSource getTransactionAttributeSource();


    /**
     * {@link ClassFilter} that delegates to {@link TransactionAttributeSource#isCandidateClass}
     * for filtering classes whose methods are not worth searching to begin with.
     */
    private class TransactionAttributeSourceClassFilter implements ClassFilter {

        @Override
        public boolean matches(Class<?> clazz) {
            if (TransactionalProxy.class.isAssignableFrom(clazz) ||
                    TransactionManager.class.isAssignableFrom(clazz) ||
                    PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
                return false;
            }
            TransactionAttributeSource tas = getTransactionAttributeSource();
            return (tas == null || tas.isCandidateClass(clazz));
        }
    }

}
public abstract class AbstractFallbackTransactionAttributeSource
        implements TransactionAttributeSource, EmbeddedValueResolverAware {

    @Nullable
    private transient StringValueResolver embeddedValueResolver;

    /**
     * Cache of TransactionAttributes, keyed by method on a specific target class.
     * <p>As this base class is not marked Serializable, the cache will be recreated
     * after serialization - provided that the concrete subclass is Serializable.
     */
    private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024);


    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        this.embeddedValueResolver = resolver;
    }


    

    /**
     * Determine a cache key for the given method and target class.
     * <p>Must not produce same key for overloaded methods.
     * Must produce same key for different instances of the same method.
     * @param method the method (never {@code null})
     * @param targetClass the target class (may be {@code null})
     * @return the cache key (never {@code null})
     */
    protected Object getCacheKey(Method method, @Nullable Class<?> targetClass) {
        return new MethodClassKey(method, targetClass);
    }


    /**
     * Subclasses need to implement this to return the transaction attribute for the
     * given class, if any.
     * @param clazz the class to retrieve the attribute for
     * @return all transaction attribute associated with this class, or {@code null} if none
     */
    @Nullable
    protected abstract TransactionAttribute findTransactionAttribute(Class<?> clazz);

    /**
     * Subclasses need to implement this to return the transaction attribute for the
     * given method, if any.
     * @param method the method to retrieve the attribute for
     * @return all transaction attribute associated with this method, or {@code null} if none
     */
    @Nullable
    protected abstract TransactionAttribute findTransactionAttribute(Method method);

    /**
     * Should only public methods be allowed to have transactional semantics?
     * <p>The default implementation returns {@code false}.
     */
    protected boolean allowPublicMethodsOnly() {
        return false;
    }

}

TransactionInterceptor

这个nb了,增强逻辑。

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 拿到被代理类。
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // 执行增强逻辑。
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
            @Override
            @Nullable
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }

            @Override
            public Object getTarget() {
                return invocation.getThis();
            }

            @Override
            public Object[] getArguments() {
                return invocation.getArguments();
            }
        });
    }

TransactionAspectSupport:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                          final InvocationCallback invocation) throws Throwable {

     // 用来解析 方法、类上是否有@Transactional
     TransactionAttributeSource tas = getTransactionAttributeSource();
     // 1.1 拿到@Transactional注解,解析后的属性值
     final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

     /**
      * 1.2 推断出要用的事务管理器:@Transactional("tm1")
* 默认的({@link org.springframework.transaction.annotation.AbstractTransactionManagementConfiguration#setConfigurers)}) -> BeanFactory中找TransactionManager
      * */
     final TransactionManager tm = determineTransactionManager(txAttr);

     
     /**
      * 强转,tm 必须是 PlatformTransactionManager 类型的组
      * */
     PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
     // 就是一个method的标识
     final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

     /**
      * 没有@Transactional注解  或者 事务管理器不是CallbackPreferringPlatformTransactionManager类型
      * */
     if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
         /**
          * 1.3 如果需要就创建事务(主要是根据事务传播行为来判断的)
          * 就是使用DataSource创建Connection,然后设置为非自动提交 `Connection.setAutoCommit(false)`
          * */
         // Standard transaction demarcation with getTransaction and commit/rollback calls.
         TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

         Object retVal;
         try {
             // 放行方法
             // This is an around advice: Invoke the next interceptor in the chain.
             // This will normally result in a target object being invoked.
             retVal = invocation.proceedWithInvocation();
         } catch (Throwable ex) {
             /**
              * 1.4 出现异常的处理,看看是回滚还是提交事务。
              * 看看异常类型是不是要回滚的类型,是就回滚,否则就提交事务 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
              *
              * 而具体的回滚类型是根据 @Transactional(rollbackFor = RuntimeException.class, rollbackForClassName = "a",
              *             noRollbackFor = Throwable.class, noRollbackForClassName = "b") 的值解析的
              *
              *
              * 无论是回滚,还是提交事务 最终都会执行这个,恢复上一个事务的内容到ThreadLocal中 {@link AbstractPlatformTransactionManager#cleanupAfterCompletion(DefaultTransactionStatus)}
              * */
             // target invocation exception
             completeTransactionAfterThrowing(txInfo, ex);
             throw ex;
         } finally {
             /**
              * 1.5 清除当前事务信息。就是将当前txInfo之前的txInfo恢复到ThreadLocal中
              * {@link TransactionAspectSupport#transactionInfoHolder}
              * */
             cleanupTransactionInfo(txInfo);
         }

         if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
             // Set rollback-only in case of Vavr failure matching our rollback rules...
             TransactionStatus status = txInfo.getTransactionStatus();
             if (status != null && txAttr != null) {
                 retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
             }
         }

         /**
          * 1.6 提交事务。
          *  {@link TransactionAspectSupport#commitTransactionAfterReturning(TransactionInfo)}
          *
          * 最终会执行这个,恢复上一个事务的内容到ThreadLocal中 {@link AbstractPlatformTransactionManager#cleanupAfterCompletion(DefaultTransactionStatus)}
          * */
         commitTransactionAfterReturning(txInfo);
         return retVal;
     } else {
         Object result;
         final ThrowableHolder throwableHolder = new ThrowableHolder();

         // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
         try {
             result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                 TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                 try {
                     Object retVal = invocation.proceedWithInvocation();
                     if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                         // Set rollback-only in case of Vavr failure matching our rollback rules...
                         retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                     }
                     return retVal;
                 } catch (Throwable ex) {
                     if (txAttr.rollbackOn(ex)) {
                         // A RuntimeException: will lead to a rollback.
                         if (ex instanceof RuntimeException) {
                             throw (RuntimeException) ex;
                         } else {
                             throw new ThrowableHolderException(ex);
                         }
                     } else {
                         // A normal return value: will lead to a commit.
                         throwableHolder.throwable = ex;
                         return null;
                     }
                 } finally {
                     cleanupTransactionInfo(txInfo);
                 }
             });
         } catch (ThrowableHolderException ex) {
             throw ex.getCause();
         } catch (TransactionSystemException ex2) {
             if (throwableHolder.throwable != null) {
                 logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                 ex2.initApplicationException(throwableHolder.throwable);
             }
             throw ex2;
         } catch (Throwable ex2) {
             if (throwableHolder.throwable != null) {
                 logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
             }
             throw ex2;
         }

         // Check result state: It might indicate a Throwable to rethrow.
         if (throwableHolder.throwable != null) {
             throw throwableHolder.throwable;
         }
         return result;
     }
 }



 protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                                                     @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                                                     @Nullable TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method...
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // 设置事务状态
            // The transaction manager will flag an error if an incompatible tx already exists.
            txInfo.newTransactionStatus(status);
        } else {
            // The TransactionInfo.hasTransaction() method will return false. We created it only
            // to preserve the integrity of the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled()) {
                logger.trace("No need to create transaction for [" + joinpointIdentification +
                        "]: This method is not transactional.");
            }
        }

        /**
         * 绑定到这个属性上 {@link TransactionAspectSupport#transactionInfoHolder}
         * 并且会记录原来ThreadLocal的值,当事务结束了要恢复回去 {@link TransactionAspectSupport#cleanupTransactionInfo(TransactionInfo)}
         * */
        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
    }



    /**
     * 处理Throwable,完成事务。
     * 我们可能会提交或回滚,具体取决于配置。
     * @param txInfo information about the current transaction
     * @param ex throwable encountered
     */
    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            /**
             * 异常是需要回滚 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
             *
             * - 使用 @Transactional 配置的rollback进行匹配,匹配就返回结果
             * - 没有匹配rollback,就判断异常是不是  RuntimeException 、Error 是就回滚
             * */
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    /**
                     * 回滚
                     * {@link AbstractPlatformTransactionManager#rollback(TransactionStatus)}
                     * */
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                } catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            } else {
                /**
                 * 提交事务
                 * {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
                 * */
                // We don't roll back on this exception.
                // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                } catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }


  protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            /**
             * {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
             * */
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

1.1 解析Transactional注解属性

    AbstractFallbackTransactionAttributeSource@Override
    @Nullable
    public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        if (method.getDeclaringClass() == Object.class) {
            return null;
        }


      	// 从缓存取 避免重复解析
        Object cacheKey = getCacheKey(method, targetClass);
        TransactionAttribute cached = this.attributeCache.get(cacheKey);
        if (cached != null) {
            if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                return null;
            } else {
                return cached;
            }
        } else {
            // 计算事务属性 方法必须是public的 依次从方法、类上获取。
            TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
           
            if (txAttr == null) {
                this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
            } else {
                String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                if (txAttr instanceof DefaultTransactionAttribute) {
                    DefaultTransactionAttribute dta = (DefaultTransactionAttribute) txAttr;
                    dta.setDescriptor(methodIdentification);
                    dta.resolveAttributeStrings(this.embeddedValueResolver);
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                }
                this.attributeCache.put(cacheKey, txAttr);
            }
            return txAttr;
        }
    }



	// 计算事务属性 方法必须是public的 依次从方法、类上获取。
   @Nullable
    protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        // 方法不是public 就直接返回null, 也就是说@Transactional 得标注在public方法上才有用
        if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
            return null;
        }

        // 就是targetClass是代理对象的情况,返回的是其被代理类的方法。
        Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

        // 1.1.1 方法有@Transactional就解析注解值,然后返回
        TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
        if (txAttr != null) {
            return txAttr;
        }

        // 1.1.2 尝试从方法声明类上找,有@Transactional就解析注解值,然后返回
        txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }

        /**
         * 不相等,说明method是代理对象的方法, 上面解析被代理对象的方法找不到注解信息,尝试从 代理对象的方法、代理对象找 @Transactional,
         * 找到就返回
         * */
        if (specificMethod != method) {
            // Fallback is to look at the original method.
            txAttr = findTransactionAttribute(method);
            if (txAttr != null) {
                return txAttr;
            }
            // Last fallback is the class of the original method.
            txAttr = findTransactionAttribute(method.getDeclaringClass());
            if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return txAttr;
            }
        }

        return null;
    }


1.1.1 解析方法上事务注解属性
AnnotationTransactionAttributeSource:

@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {
    /**
     * 遍历 annotationParsers,执行 {@link TransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)} 返回注解对应的属性信息,
     * 解析的对象是 method,说白了就是看看 method 是否有注解
     * 没得就返回null
     *
     * 注:默认有这个 SpringTransactionAnnotationParser,就是是否有 @Transactional
     * */
    return determineTransactionAttribute(method);
}
1.1.2 解析类上事务注解
AnnotationTransactionAttributeSource:

@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
    /**
     * 遍历 annotationParsers,执行 {@link TransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)} 返回注解对应的属性信息,
     * 解析的对象是 clazz ,说白了就是看看 clazz是否有注解
     * 没得就返回null
     *
     * 注:默认有这个 SpringTransactionAnnotationParser,就是是否有 @Transactional
     * */
    return determineTransactionAttribute(clazz);
}

遍历注解解析器解析注解属性

@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
    for (TransactionAnnotationParser parser : this.annotationParsers) {
        TransactionAttribute attr = parser.parseTransactionAnnotation(element);
        if (attr != null) {
            return attr;
        }
    }
    return null;
}

解析出事务属性

public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
    // 从 AnnotatedElement 找到  @Transactional 注解信息
    AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
            element, Transactional.class, false, false);
    if (attributes != null) {
        // 就是将注解的值 设置到 TransactionAttribute 对象中
        return parseTransactionAnnotation(attributes);
    } else {
        return null;
    }
}


// 解析注解属性 事务回滚规则用的是RollbackRuleAttribute
 protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());

        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        String timeoutString = attributes.getString("timeoutString");
        Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
                "Specify 'timeout' or 'timeoutString', not both");
        rbta.setTimeoutString(timeoutString);

        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        // 这个属性是指定 事务管理的
        rbta.setQualifier(attributes.getString("value"));
        rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));

        /**
         * 回滚规则,两种类型:
         *      - RollbackRuleAttribute 表示要回滚
         *      - NoRollbackRuleAttribute 表示不回滚
         *
         * 在这里会用到 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
         * */
        List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
        for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        rbta.setRollbackRules(rollbackRules);

        return rbta;
    }

1.2推断事务管理器

TransactionAspectSupport:

@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
    // 如果未设置 tx 属性,请勿尝试查找 tx 管理器
    if (txAttr == null || this.beanFactory == null) {
        return getTransactionManager();
    }
  
    /**
     * 通过限定符查找 @Transactional("tm1") {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
     * */
    String qualifier = txAttr.getQualifier();
    if (StringUtils.hasText(qualifier)) {
        /**
         * 注解 value值不为空,就根据value从BeanFactory拿到事务管理器。找不到就报错
         * */
        return determineQualifiedTransactionManager(this.beanFactory, qualifier);
    } else if (StringUtils.hasText(this.transactionManagerBeanName)) {
        // 设置了 transactionManagerBeanName,就使用这个name找
        return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
    } else {
        TransactionManager defaultTransactionManager = getTransactionManager();
        if (defaultTransactionManager == null) {
            defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
            if (defaultTransactionManager == null) {
                /**
                 * 通过 TransactionManager 类型从 BeanFactory拿到事务管理器,
                 * 类型匹配到多个bean,会通过 @Primary、@Priority确定唯一一个,确定不能就报错了
                 * */
                defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
                this.transactionManagerCache.putIfAbsent(
                        DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
            }
        }
        return defaultTransactionManager;
    }
}



   private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
        TransactionManager txManager = this.transactionManagerCache.get(qualifier);
        if (txManager == null) {
            txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
                    beanFactory, TransactionManager.class, qualifier);
            this.transactionManagerCache.putIfAbsent(qualifier, txManager);
        }
        return txManager;
    }

1.3 createTransactionIfNecessary

AbstractPlatformTransactionManager:

/**
 * 如有必要,根据给定的 TransactionAttribute 创建事务。
 */
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    /**
     * 使用 @Transactional,默认的parser是无法设置name值的,所以都会装饰一下
     * {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
     * */
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            /**
             * 1.3.1 获取事务状态。
             *
             * 使用 txAttr 通过事务管理器 获取事务状态
             *
             * {@link AbstractPlatformTransactionManager#getTransaction(TransactionDefinition)}
             *      - 会根据事务传播行为来决定,是新创建事务、暂停事务、savepoint 等操作
             *      - 是新建事务,就会使用DataSource获取Connection,然后将Connection设置为非自动提交
             *
             * DefaultTransactionStatus 事务状态,由这三个东西组成:
             *      - RuleBasedTransactionAttribute(事务属性):就是描述@Transactional注解的对象
             *      - DataSourceTransactionObject(事务对象):记录事务的ConnectionHolder
             *      - SuspendedResourcesHolder(暂停资源持有者):记录上一个事务的ConnectionHolder和TransactionSynchronization
             *      注:事务说白了就是一个数据库连接
             * */
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
  
    /**
     * 1.3.2 将事务管理器、事务属性、方法标识和事务状态 装饰成 TransactionInfo 返回,
     * */
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
1.3.1 获取事务状态
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    /**
     * definition 就是描述@Transactional注解的对象,
     * definition 是空就给一个默认值
     * */
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    /**
     * 1.3.1.1 创建事务对象。从 resources 中拿到资源设置给事务对象
     * 这个资源并不一定是开启事务时创建的连接,比如空事务情况下,使用 {@link org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection(DataSource)} 获取连接
     * 这个连接也会存到 resources 中,目的是在事务方法内能重复使用
     *
     * 比如:{@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction()}
     *      会创建这个对象 DataSourceTransactionObject
     *      1. 从ThreadLocal中拿到 Map<DataSource,ConnectionHolder> {@link TransactionSynchronizationManager#resources}
     *      2. DataSource 作为key,从 resources 拿到 ConnectionHolder 设置给事务对象
     *      Tips:创建的是这个对象DataSourceTransactionObject,事务对象说白了就是 一个数据库连接的包装对象
     * */
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    /**
     * 1.3.1.2 判断是否存在事务。就是出现了事务方法嵌套调用的情况
     *
     * 比如这个事务管理器:
     *      {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction(Object)}
     *      `txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()`
     *      就是事务对象有连接 且 连接是活动的事务 才是true
     *
     * 空事务不算存在事务,因为其 isTransactionActive 是false
     * */
    if (isExistingTransaction(transaction)) {
        /**
         * 1.3.1.3 存在事务的处理,就是根据传播行为看看是:暂停当前事务,新建事务,设置保存点
         * */
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
  
    // 能往下执行,说明还没有事务
  
    /**
     * 校验 @Transactional() 设置的值 是否合法
     * */
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
  
    /**
     * 校验事务隔离级别参数,不存在事务就抛出异常
     * */
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        /**
         * 1.3.1.4 暂停事务。入参就是要暂停的事务。其实就是从ThreadLocal中取出这两个属性内容,记录到 SuspendedResourcesHolder 中
         *      {@link TransactionSynchronizationManager#synchronizations}
         *      {@link TransactionSynchronizationManager#resources}
         * */
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
      
        try {
            /**
             * 1.3.1.5 开启事务(校验事务隔离级别参数,创建新事务):
             *  1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象
             *  2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间)
             *  3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources}
             *  4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
             *  5. 记录信息到 TransactionSynchronizationManager
             * */
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
            // 1.3.1.6 出现异常,就恢复之前的事务状态到ThreadLocal中
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        /**
         * 创建空事务,其实就是没有事务
         * */
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
      
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      	// 1.3.1.7 创建事务状态
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
1.3.1.1 创建事务对象
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    /**
     * 使用 DataSource 作为key从 ThreadLocal中拿 ConnectionHolder,
     * 没有就是 null,第一次开启事务 这个值就是null咯
     * */
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
1.3.1.2 判断是否存在事务
DataSourceTransactionManager:

protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

    /**
     * 在开始事务时会将 isTransactionActive 设置为true
     * 只有在事务完成时,才会将事务的持有的 getConnectionHolder().isTransactionActive() 设置为false
     * 所以可以通过这个判断 是否存在事务
     *
     * 空事务不算存在事务,因为空事务不会开启事务(不会给事务对象创建数据库连接),也就不会将 isTransactionActive 设置为true
     *
     * 在空事务下,使用{@link DataSourceUtils#doGetConnection(DataSource)}获取连接,也不会设置 isTransactionActive 为true,
     * 只是存到了 {@link TransactionSynchronizationManager#resources}
     * */
    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
1.3.1.3 存在事务的处理
/**
 * 存在的事务创建事务状态(主要是根据事务传播行为来判断)
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    /**
     * 传播行为:存在事务就报错
     * */
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    /**
     * 传播行为:存在事务就挂起
     * */
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        /**
         * 1.3.1.3.1 暂停事务。返回 SuspendedResourcesHolder,这里面记录了被移除的资源,和所暂停事务的状态信息
         *
         * 就是从 {@link TransactionSynchronizationManager#resources} 移除资源,移除的资源会存到 SuspendedResourcesHolder 中
         * */
        Object suspendedResources = suspend(transaction);

        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 1.3.1.3.2 处理事务状态
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    /**
     * 传播行为:创建新的事物,已经存在事物就挂起
     * */
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }

        /**
         * 1.3.1.3.1 暂停事务,返回的是暂停事务的资源
         * */
        SuspendedResourcesHolder suspendedResources = suspend(transaction);

        try {
            /**
             * 1.3.1.3.3 开启新事物,会记录暂停事务的信息
             * */
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    /**
     * 传播行为:如果当前事务存在就创建嵌套事务,否则就像 PROPAGATION_REQUIRED 一样处理
     *
     * 通过这里能体现,没有事务的时候就和PROPAGATION_REQUIRED一样 创建新的事务 {@link AbstractPlatformTransactionManager#getTransaction(TransactionDefinition) }
     * */
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        /**
         * 不支持嵌套事务 就报错,{@link org.springframework.jdbc.datasource.DataSourceTransactionManager#DataSourceTransactionManager()} 是支持的
         * */
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }

        /**
         * 使用保存点来实现 嵌套事务
         * DataSourceTransactionManager 就是true
         * */
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            // 执行sql创建保存点
            status.createAndHoldSavepoint();
            return status;
        } else {
          	// 1.3.1.3.3 开启事务
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition
                        + "] specifies isolation level which is incompatible with existing transaction: "
                        +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition
                        + "] is not marked as read-only but existing transaction is");
            }
        }
    }

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  	// 1.3.1.3.2 处理事务状态
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
  • 暂停事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    /**
     * 是活动的事务。
     * 可以这里理解,只要Java虚拟机栈中存在@Transactional的方法,这个判断就是true
     *      1. 暂停当前事务
     *      2. 完成当前事务(rollback或者commit)
     * */
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        /**
         * 1️⃣ 暂停事务的行为同步。暂停了 TransactionSynchronizationManager.isSynchronizationActive() 就是 false了
         *
         * 就是拿到 {@link TransactionSynchronizationManager#synchronizations},遍历执行 {@link TransactionSynchronization#suspend()}
         * */
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            // 存在事务对象
            if (transaction != null) {
                /**
                 * 2️⃣ 暂停事务资源同步。
                 * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend(Object)}
                 *      1. 取消事务对象绑定的连接信息 txObject.setConnectionHolder(null);
                 *      2. 从事务资源中移除事务对象对应的资源(就是移除DataSource作为key的资源) {@link TransactionSynchronizationManager#unbindResource(Object)}
                 *
                 *      Tips:返回值,就是事务资源中移除的资源
                 * */
                suspendedResources = doSuspend(transaction);
            }
          
            // 拿到原来的值
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
          
            /**
             * 装饰一下,就是记录现在得值,后面恢复事务需要重新设置到 TransactionSynchronizationManager
             * */
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        } catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        /**
         *  2️⃣ 暂停事务资源同步。
         * */
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

1️⃣ 暂停行为同步

AbstractPlatformTransactionManager:

private List<TransactionSynchronization> doSuspendSynchronization() {
    // 拿到事务同步资源
    List<TransactionSynchronization> suspendedSynchronizations =
            TransactionSynchronizationManager.getSynchronizations();
    // 遍历,然后挂起
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        /**
         * 对于数据库连接:
         *      {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#suspend()}
         *       如果该Connection存在事务资源中,就取消事务资源对该Connection的引用;
         *       否则就关闭连接
         * */
        synchronization.suspend();
    }
    // 清空线程事务同步资源
    TransactionSynchronizationManager.clearSynchronization();
    // 返回 事务同步资源
    return suspendedSynchronizations;
}

2️⃣ 暂停资源同步

DataSourceTransactionManager: 
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
  • 处理事务状态
  protected final DefaultTransactionStatus prepareTransactionStatus(
          TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
          boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

			// 事务状态
      DefaultTransactionStatus status = newTransactionStatus(
              definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);

			// 初始化行为同步
      prepareSynchronization(status, definition);
      return status;
  }



// 创建事务状态
protected DefaultTransactionStatus newTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    /**
     * newSynchronization 且 不是同步激活状态  就是 真的新同步
     *
     * 同步激活状态:
     *      - true:简单来说就是Java虚拟机栈中存在@Transactional的方法
     *      - false:暂停当前事务、完成当前事务(rollback或者commit)
     *
     * 结论:当前线程在 TransactionSynchronizationManager 没有记录信息, actualNewSynchronization 就是true
     * */
    boolean actualNewSynchronization = newSynchronization &&
            !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(
            transaction, newTransaction, actualNewSynchronization,
            definition.isReadOnly(), debug, suspendedResources);
}

    /**
     * 适当地初始化事务同步。
     * Initialize transaction synchronization as appropriate.
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        /**
         * 是新的同步,才记录这些信息(注:空事务也算的)
         * */
        if (status.isNewSynchronization()) {
            /**
             * 真的活动的事务。就是得有事务,且事务是新的才是true
             * */
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            // 记录事务隔离级别
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            // 是否只读
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            // 事务的name
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            // 同步属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
            TransactionSynchronizationManager.initSynchronization();
        }
    }
  • 开启事务

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                                               boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
        /**
         * 不是从不同步。这个值是看你用的啥事务管理,是事务管理器的属性
         * */
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        // new DefaultTransactionStatus 对象
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        /**
         * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin(Object, TransactionDefinition)}
         *      1. 事务对象没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象
         *      2. 将 @Transactional 的信息设置到连接和事务对象中(设置的内容:自动提交、是否只读、隔离级别、超时时间)
         *      3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources}
         *      4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
         * */
        doBegin(transaction, definition);
        /**
         * status.isNewSynchronization() 就设置 TransactionSynchronizationManager:
         *  - 记录事务隔离级别
         *  - 是否只读
         *  - 事务的name
         *  - {@link TransactionSynchronizationManager#synchronizations} 属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
         * */
        prepareSynchronization(status, definition);
        return status;
    }
    

doBegin

/**
 * 1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象<br/>
 * 2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间) <br/>
 * 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources} <br/>
 * 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
 */
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        /**
         * 没有连接 或者 连接是事务同步 那就应该创建一个新的连接
         *
         * 可以这么理解,当前方法叫 doBegin ,也就是要给事务对象绑定连接。连接没有那就创建一个在绑定合情合理,
         * 而连接有了,但是连接是事务同步 说明上一个事务还未完成,你还要绑定 那也给你创建一个新的连接,这种属于非法操作了,如果你老老实实用Spring 别搞啥自定义,是不会出现这种情况的
         *
         * 因为 doBegin 是在 startTransaction 的时候会执行,而 startTransaction 之前会先暂停当前线程的事物资源,在开启事物,
         * 而暂停就是
         *      1. 移除事物对象的引用 `txObject.setConnectionHolder(null);`
         *      2. 从 {@link TransactionSynchronizationManager#resources} 移除 ConnectionHolder 存起来。这个ConnectionHolder 其实就是上面置空的属性
         *          Tips: 暂停事务的代码 {@link DataSourceTransactionManager#doSuspend(Object)}
         *      记住代码是死的,Spring的事物代码都这么写了,所以基本不可能出现 isSynchronizedWithTransaction 的情况,除非开发人员玩花活
         *
         * 注:在一个Spring事务(单一事务、嵌套事务)
         *      事务暂停:`txObject.setConnectionHolder(null)`
         *      事务完成:` txObject.getConnectionHolder().setSynchronizedWithTransaction(false);`
         * */
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            /**
             * 拿到连接 {@link DataSource#getConnection()}
             *
             * 这里就是动态数据源的原理了 {@link AbstractRoutingDataSource#getConnection()}
             * */
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 装饰 Connection 并设置给事务对象
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        /**
         * 表示 这个事务对象的连接 是事务同步
         *
         * 在事物完成时 会将该属性设置为 false
         *  {@link DataSourceTransactionManager#doCleanupAfterCompletion(Object)}
         *  {@link ConnectionHolder#clear()}
         * */
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        /**
         * 就是根据 @Transactional 注解值,给Connection设置 只读属性、隔离级别属性。
         * 返回的是 Connection 之前的隔离饥级别信息
         * */
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        // 隔离级别
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // 是否只读
        txObject.setReadOnly(definition.isReadOnly());

        // 如果连接是自动提交,就设置为非自动提交
        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            // 设置为非自动提交
            con.setAutoCommit(false);
        }

        /**
         * 就比如 `@Transactional(readOnly = true)` 就设置事务为只读的
         * 如果是只读的就执行sql设置为只读事务  `SET TRANSACTION READ ONLY`
         * */
        prepareTransactionalConnection(con, definition);
        /**
         * 活动的事务。
         * 在事务完成时才会将该属性设置为false,暂停事务并不会
         * */
        txObject.getConnectionHolder().setTransactionActive(true);

        /**
         * `@Transactional(timeout = -1)` 不是-1才拿注解值,没有就返回默认值 -1
         * */
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            // 设置超时时间
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        /**
         * 是新创建的connect的,就绑定到ThreadLocal中 {@link TransactionSynchronizationManager#resources}
         * 缓存的Map<DataSource,ConnectionHolder>。使用DataSource作为key,是因为一个线程可能有来回切换数据源的情况(动态数据源)
         * */
        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            /**
             * 绑定到事务资源中 {@link TransactionSynchronizationManager#resources}
             * */
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            /**
             * 从事务资源 {@link TransactionSynchronizationManager#resources} 中移除该连接,或者是关闭连接
             *
             * */
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            // 清除 txObject 的连接信息
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
1.3.1.4 暂停事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    /**
     * 是活动的事务。
     * 可以这里理解,只要Java虚拟机栈中存在@Transactional的方法,这个判断就是true
     *      1. 暂停当前事务
     *      2. 完成当前事务(rollback或者commit)
     * */
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        /**
         * 1️⃣ 暂停事务的行为同步。暂停了 TransactionSynchronizationManager.isSynchronizationActive() 就是 false了
         *
         * 就是拿到 {@link TransactionSynchronizationManager#synchronizations},遍历执行 {@link TransactionSynchronization#suspend()}
         * */
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            // 存在事务对象
            if (transaction != null) {
                /**
                 * 2️⃣ 暂停事务资源同步。
                 * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend(Object)}
                 *      1. 取消事务对象绑定的连接信息 txObject.setConnectionHolder(null);
                 *      2. 从事务资源中移除事务对象对应的资源(就是移除DataSource作为key的资源) {@link TransactionSynchronizationManager#unbindResource(Object)}
                 *
                 *      Tips:返回值,就是事务资源中移除的资源
                 * */
                suspendedResources = doSuspend(transaction);
            }
          
            // 拿到原来的值
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
          
            /**
             * 装饰一下,就是记录现在得值,后面恢复事务需要重新设置到 TransactionSynchronizationManager
             * */
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        } catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        /**
         *  2️⃣ 暂停事务资源同步。
         * */
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

1️⃣ 暂停行为同步

AbstractPlatformTransactionManager:

private List<TransactionSynchronization> doSuspendSynchronization() {
    // 拿到事务同步资源
    List<TransactionSynchronization> suspendedSynchronizations =
            TransactionSynchronizationManager.getSynchronizations();
    // 遍历,然后挂起
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        /**
         * 对于数据库连接:
         *      {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#suspend()}
         *       如果该Connection存在事务资源中,就取消事务资源对该Connection的引用;
         *       否则就关闭连接
         * */
        synchronization.suspend();
    }
    // 清空线程事务同步资源
    TransactionSynchronizationManager.clearSynchronization();
    // 返回 事务同步资源
    return suspendedSynchronizations;
}

2️⃣ 暂停资源同步

DataSourceTransactionManager: 
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
1.3.1.5 开启事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                                           boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    /**
     * 不是从不同步。这个值是看你用的啥事务管理,是事务管理器的属性
     * */
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // new DefaultTransactionStatus 对象
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    /**
     * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin(Object, TransactionDefinition)}
     *      1. 事务对象没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象
     *      2. 将 @Transactional 的信息设置到连接和事务对象中(设置的内容:自动提交、是否只读、隔离级别、超时时间)
     *      3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources}
     *      4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
     * */
    doBegin(transaction, definition);
    /**
     * status.isNewSynchronization() 就设置 TransactionSynchronizationManager:
     *  - 记录事务隔离级别
     *  - 是否只读
     *  - 事务的name
     *  - {@link TransactionSynchronizationManager#synchronizations} 属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
     * */
    prepareSynchronization(status, definition);
    return status;
}

doBegin

/**
 * 1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象<br/>
 * 2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间) <br/>
 * 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources} <br/>
 * 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
 */
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        /**
         * 没有连接 或者 连接是事务同步 那就应该创建一个新的连接
         *
         * 可以这么理解,当前方法叫 doBegin ,也就是要给事务对象绑定连接。连接没有那就创建一个在绑定合情合理,
         * 而连接有了,但是连接是事务同步 说明上一个事务还未完成,你还要绑定 那也给你创建一个新的连接,这种属于非法操作了,如果你老老实实用Spring 别搞啥自定义,是不会出现这种情况的
         *
         * 因为 doBegin 是在 startTransaction 的时候会执行,而 startTransaction 之前会先暂停当前线程的事物资源,在开启事物,
         * 而暂停就是
         *      1. 移除事物对象的引用 `txObject.setConnectionHolder(null);`
         *      2. 从 {@link TransactionSynchronizationManager#resources} 移除 ConnectionHolder 存起来。这个ConnectionHolder 其实就是上面置空的属性
         *          Tips: 暂停事务的代码 {@link DataSourceTransactionManager#doSuspend(Object)}
         *      记住代码是死的,Spring的事物代码都这么写了,所以基本不可能出现 isSynchronizedWithTransaction 的情况,除非开发人员玩花活
         *
         * 注:在一个Spring事务(单一事务、嵌套事务)
         *      事务暂停:`txObject.setConnectionHolder(null)`
         *      事务完成:` txObject.getConnectionHolder().setSynchronizedWithTransaction(false);`
         * */
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            /**
             * 拿到连接 {@link DataSource#getConnection()}
             *
             * 这里就是动态数据源的原理了 {@link AbstractRoutingDataSource#getConnection()}
             * */
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 装饰 Connection 并设置给事务对象
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        /**
         * 表示 这个事务对象的连接 是事务同步
         *
         * 在事物完成时 会将该属性设置为 false
         *  {@link DataSourceTransactionManager#doCleanupAfterCompletion(Object)}
         *  {@link ConnectionHolder#clear()}
         * */
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        /**
         * 就是根据 @Transactional 注解值,给Connection设置 只读属性、隔离级别属性。
         * 返回的是 Connection 之前的隔离饥级别信息
         * */
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        // 隔离级别
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // 是否只读
        txObject.setReadOnly(definition.isReadOnly());

        // 如果连接是自动提交,就设置为非自动提交
        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            // 设置为非自动提交
            con.setAutoCommit(false);
        }

        /**
         * 就比如 `@Transactional(readOnly = true)` 就设置事务为只读的
         * 如果是只读的就执行sql设置为只读事务  `SET TRANSACTION READ ONLY`
         * */
        prepareTransactionalConnection(con, definition);
        /**
         * 活动的事务。
         * 在事务完成时才会将该属性设置为false,暂停事务并不会
         * */
        txObject.getConnectionHolder().setTransactionActive(true);

        /**
         * `@Transactional(timeout = -1)` 不是-1才拿注解值,没有就返回默认值 -1
         * */
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            // 设置超时时间
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        /**
         * 是新创建的connect的,就绑定到ThreadLocal中 {@link TransactionSynchronizationManager#resources}
         * 缓存的Map<DataSource,ConnectionHolder>。使用DataSource作为key,是因为一个线程可能有来回切换数据源的情况(动态数据源)
         * */
        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            /**
             * 绑定到事务资源中 {@link TransactionSynchronizationManager#resources}
             * */
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            /**
             * 从事务资源 {@link TransactionSynchronizationManager#resources} 中移除该连接,或者是关闭连接
             *
             * */
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            // 清除 txObject 的连接信息
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

1.3.1.6 恢复事务
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        // 挂起的资源 比如数据库连接
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            /**
             * 1️⃣恢复事务资源。
             * 就是重新设置回事务资源ThreadLocal中 {@link TransactionSynchronizationManager#resources}
             * */
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            // 2️⃣恢复事务同步资源
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

1️⃣恢复资源同步

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

2️⃣恢复行为同步

private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
    TransactionSynchronizationManager.initSynchronization();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        /**
         * {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#resume()}
         * 就是将其Connection重新设置到 {@link TransactionSynchronizationManager#resources} 中
         * */
        synchronization.resume();
        /**
         * 重新注册回 {@link TransactionSynchronizationManager#synchronizations}
         * */
        TransactionSynchronizationManager.registerSynchronization(synchronization);
    }
}
1.3.1.7 创建事务状态
protected final DefaultTransactionStatus prepareTransactionStatus(
          TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
          boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

			// 事务状态
      DefaultTransactionStatus status = newTransactionStatus(
              definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);

			// 初始化行为同步
      prepareSynchronization(status, definition);
      return status;
  }



// 创建事务状态
protected DefaultTransactionStatus newTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    /**
     * newSynchronization 且 不是同步激活状态  就是 真的新同步
     *
     * 同步激活状态:
     *      - true:简单来说就是Java虚拟机栈中存在@Transactional的方法
     *      - false:暂停当前事务、完成当前事务(rollback或者commit)
     *
     * 结论:当前线程在 TransactionSynchronizationManager 没有记录信息, actualNewSynchronization 就是true
     * */
    boolean actualNewSynchronization = newSynchronization &&
            !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(
            transaction, newTransaction, actualNewSynchronization,
            definition.isReadOnly(), debug, suspendedResources);
}

    /**
     * 适当地初始化事务同步。
     * Initialize transaction synchronization as appropriate.
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        /**
         * 是新的同步,才记录这些信息(注:空事务也算的)
         * */
        if (status.isNewSynchronization()) {
            /**
             * 真的活动的事务。就是得有事务,且事务是新的才是true
             * */
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            // 记录事务隔离级别
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            // 是否只读
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            // 事务的name
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            // 同步属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
            TransactionSynchronizationManager.initSynchronization();
        }
    }
1.3.2 创建事务信息
TransactionAspectSupport:

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                                                 @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                                                 @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // 设置事务状态
        // The transaction manager will flag an error if an incompatible tx already exists.
        txInfo.newTransactionStatus(status);
    } else {
        // The TransactionInfo.hasTransaction() method will return false. We created it only
        // to preserve the integrity of the ThreadLocal stack maintained in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("No need to create transaction for [" + joinpointIdentification +
                    "]: This method is not transactional.");
        }
    }

    /**
     * 绑定到这个属性上 {@link TransactionAspectSupport#transactionInfoHolder}
     * 并且会记录原来ThreadLocal的值,当事务结束了要恢复回去 {@link TransactionAspectSupport#cleanupTransactionInfo(TransactionInfo)}
     * */
    // We always bind the TransactionInfo to the thread, even if we didn't create
    // a new transaction here. This guarantees that the TransactionInfo stack
    // will be managed correctly even if no transaction was created by this aspect.
    txInfo.bindToThread();
    return txInfo;
}

1.4 出现异常的处理

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }

        /**
         * 异常是需要回滚 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
         *
         * - 使用 @Transactional 配置的rollback进行匹配,匹配就返回结果
         * - 没有匹配rollback,就判断异常是不是  RuntimeException 、Error 是就回滚
         * */
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                /**
                 * 1️⃣回滚
                 * {@link AbstractPlatformTransactionManager#rollback(TransactionStatus)}
                 * */
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        } else {
            /**
             * 2️⃣提交事务
             * {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
             * */
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

1️⃣回滚

public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                /**
                 * 回调 TransactionSynchronization
                 * 就是遍历 {@link TransactionSynchronizationManager#synchronizations} 属性
				 * 完成事务前回调
                 * */
                triggerBeforeCompletion(status);

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    /**
                     * 回滚到最前的保存点
                     * */
                    status.rollbackToHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    /**
                     * 回滚
                     * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doRollback(DefaultTransactionStatus)}
                     * */
                    doRollback(status);
                } else {
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        } else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    } else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            } catch (RuntimeException | Error ex) {
                // 回调 TransactionSynchronization 完成事务后回调
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }

            // 回调 TransactionSynchronization  完成事务后回调
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        } finally {
            // 恢复上个事务的资源
            cleanupAfterCompletion(status);
        }
    }

2️⃣提交

AbstractPlatformTransactionManager:

public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // 回滚
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // 回滚
        processRollback(defStatus, true);
        return;
    }

    // commit
    processCommit(defStatus);
}


 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                // 回调 TransactionSynchronization 提交前回调
                triggerBeforeCommit(status);
                // 回调 TransactionSynchronization 完成前回调
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    // 提交事务
                    doCommit(status);
                } else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }

                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            } catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            } catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                } else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            } catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                // 回调 TransactionSynchronization 提交后回调
                triggerAfterCommit(status);
            } finally {
                // 回调 TransactionSynchronization
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        } finally {
            // 就是恢复暂停的事务资源
            cleanupAfterCompletion(status);
        }
    }

1.5 清空事务信息

TransactionAspectSupport:

protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
    if (txInfo != null) {
        txInfo.restoreThreadLocalStatus();
    }
}

TransactionInfo:

  private void restoreThreadLocalStatus() {
            // Use stack to restore old transaction TransactionInfo.
            // Will be null if none was set.
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

1.6 提交事务

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        /**
         * {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
         * */
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
AbstractPlatformTransactionManager:

public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // 回滚
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // 回滚
        processRollback(defStatus, true);
        return;
    }

    // commit
    processCommit(defStatus);
}


 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                // 回调 TransactionSynchronization 提交前回调
                triggerBeforeCommit(status);
                // 回调 TransactionSynchronization 完成前回调
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    // 提交事务
                    doCommit(status);
                } else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }

                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            } catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            } catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                } else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            } catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                // 回调 TransactionSynchronization 提交后回调
                triggerAfterCommit(status);
            } finally {
                // 回调 TransactionSynchronization
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        } finally {
            // 就是恢复暂停的事务资源
            cleanupAfterCompletion(status);
        }
    }

TransactionalEventListenerFactory

事务事件监听

  1. @EnableTransactionManagement导入ProxyTransactionManagementConfiguration:

    • 首先,@EnableTransactionManagement会导入ProxyTransactionManagementConfiguration
    • ProxyTransactionManagementConfiguration的父类是AbstractTransactionManagementConfiguration,它使用@Bean注入TransactionalEventListenerFactory
  2. TransactionalEventListenerFactory解析@TransactionalEventListener:

    • TransactionalEventListenerFactory用于解析@TransactionalEventListener,将其转换为TransactionalApplicationListenerMethodAdapter并注册到事件广播器中。
  3. TransactionalApplicationListenerMethodAdapter的事件接收逻辑:

    • TransactionalApplicationListenerMethodAdapter#onApplicationEvent(ApplicationEvent)方法用于接收事件。
    • 如果是活动的事务且实际激活的事务不为空,会注册事务同步资源,这在事务完成时(rollback或commit)时会调用其listener和callbacks,实现延时事件的发布。
    • 要满足这个条件,必须在事务方法内发布事件,否则条件不成立。
    • 否则,如果annotation.fallbackExecution()为true,则调用processEvent(event)处理事件,即回调@TransactionalEventListener标注的方法。
    • 如果都不满足,则不执行任何操作。
TransactionalApplicationListenerMethodAdapter: 

/**
 * 触发条件使用事件广播器发布事件。
 * 要想收到事务行为的事件:得在事务方法内使用事件广播器发布事件
 * @param event the event to respond to
 */
@Override
public void onApplicationEvent(ApplicationEvent event) {
    /**
     * 是活动的事务(只要进入事务方法就是true)  且 实际激活的事务(必须不是空事务)
     * */
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        // 注册事务同步资源,在事务完成时(rollback或者commit)会调用其 listener、callbacks
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
    } else if (this.annotation.fallbackExecution()) {
        // 兜底执行
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    } else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}
TransactionalApplicationListenerSynchronization:

class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
        implements TransactionSynchronization {

    private final E event;

    private final TransactionalApplicationListener<E> listener;

    private final List<TransactionalApplicationListener.SynchronizationCallback> callbacks;


    public TransactionalApplicationListenerSynchronization(E event, TransactionalApplicationListener<E> listener,
                                                           List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {

        this.event = event;
        this.listener = listener;
        this.callbacks = callbacks;
    }


    @Override
    public int getOrder() {
        return this.listener.getOrder();
    }

    @Override
    public void beforeCommit(boolean readOnly) {
        if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
            processEventWithCallbacks();
        }
    }

    @Override
    public void afterCompletion(int status) {
        TransactionPhase phase = this.listener.getTransactionPhase();
        if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
            processEventWithCallbacks();
        } else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
            processEventWithCallbacks();
        } else if (phase == TransactionPhase.AFTER_COMPLETION) {
            processEventWithCallbacks();
        }
    }

    private void processEventWithCallbacks() {
        // 回调 SynchronizationCallback 前置方法
        this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
        try {
            /**
             * 回调监听器
             * {@link ApplicationListenerMethodAdapter#processEvent(ApplicationEvent)}
             * */
            this.listener.processEvent(this.event);
        } catch (RuntimeException | Error ex) {
            // 回调 SynchronizationCallback 后置方法
            this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
            throw ex;
        }
        // 回调 SynchronizationCallback 后置方法
        this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
    }

}