spring事务
spring事务
- @EnableTransactionManagement会import到容器中TransactionManagementConfigurationSelector。
- TransactionManagementConfigurationSelector会向容器注册相关的切面、切点、增强逻辑。
- 切点逻辑是匹配事务注解的方法。
- 增强逻辑是TransactionInterceptor实现的。
关键类
PlatformTransactionManager
事务管理器,定义了实现事务的规范。
public interface PlatformTransactionManager extends TransactionManager {
// 开启事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
// 提交事务
void commit(TransactionStatus status) throws TransactionException;
// 回滚事务
void rollback(TransactionStatus status) throws TransactionException;
}
AbstractPlatformTransactionManager
- 判断当前是否已经存在一个事务
- 应用合适的事务传播行为
- 在必要的时候挂起/恢复事务
- 提交时检查事务是否被标记成为
rollback-only
- 在回滚时做适当的修改(是执行真实的回滚/还是将事务标记成
rollback-only
) - 触发注册的同步回调
TransactionDefinition
对事务定义的抽象,这些定义有些是数据库层面本身就有的,例如隔离级别、是否只读、超时时间、名称
。
public interface TransactionDefinition {
// 定义了7中事务的传播机制
int PROPAGATION_REQUIRED = 0;
int PROPAGATION_SUPPORTS = 1;
int PROPAGATION_MANDATORY = 2;
int PROPAGATION_REQUIRES_NEW = 3;
int PROPAGATION_NOT_SUPPORTED = 4;
int PROPAGATION_NEVER = 5;
int PROPAGATION_NESTED = 6;
// 4种隔离级别,-1代表的是使用数据库默认的隔离级别
// 比如在MySQL下,使用的就是ISOLATION_REPEATABLE_READ(可重复读)
int ISOLATION_DEFAULT = -1;
int ISOLATION_READ_UNCOMMITTED = 1;
int ISOLATION_READ_COMMITTED = 2;
int ISOLATION_REPEATABLE_READ = 4;
int ISOLATION_SERIALIZABLE = 8;
// 事务的超时时间,默认不限制时间
int TIMEOUT_DEFAULT = -1;
// 提供了对上面三个属性的get方法
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}
default int getTimeout() {
return TIMEOUT_DEFAULT;
}
// 事务是否是只读的,默认不是
default boolean isReadOnly() {
return false;
}
// 事务的名称
@Nullable
default String getName() {
return null;
}
// 返回一个只读的TransactionDefinition
// 只对属性提供了getter方法,所有属性都是接口中定义的默认值
static TransactionDefinition withDefaults() {
return StaticTransactionDefinition.INSTANCE;
}
}
public class DefaultTransactionDefinition implements TransactionDefinition, Serializable {
public static final String PREFIX_PROPAGATION = "PROPAGATION_";
public static final String PREFIX_ISOLATION = "ISOLATION_";
public static final String PREFIX_TIMEOUT = "timeout_";
public static final String READ_ONLY_MARKER = "readOnly";
static final Constants constants = new Constants(TransactionDefinition.class);
private int propagationBehavior = PROPAGATION_REQUIRED;
private int isolationLevel = ISOLATION_DEFAULT;
private int timeout = TIMEOUT_DEFAULT;
private boolean readOnly = false;
@Nullable
private String name;
}
public interface TransactionAttribute extends TransactionDefinition {
/**
* 返回与此事务属性关联的限定符值。
* <p>这可用于选择相应的事务管理器
* 处理此特定交易。
* @since 3.0
*/
@Nullable
String getQualifier();
/**
* 返回与此交易属性关联的标签。
* <p>这可用于应用特定的事务行为
* 或遵循纯粹的描述性。
* @since 5.3
*/
Collection<String> getLabels();
/**
* 我们是否应该回滚给定的异常
* @param ex the exception to evaluate
* @return whether to perform a rollback or not
*/
boolean rollbackOn(Throwable ex);
}
DefaultTransactionAttribute:
@Override
public boolean rollbackOn(Throwable ex) {
// 默认只会滚RuntimeException和Error类型
return (ex instanceof RuntimeException || ex instanceof Error);
}
RuleBasedTransactionAttribute:
@Override
public boolean rollbackOn(Throwable ex) {
RollbackRuleAttribute winner = null;
int deepest = Integer.MAX_VALUE;
if (this.rollbackRules != null) {
/**
* 规则是啥看这里 {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
* */
for (RollbackRuleAttribute rule : this.rollbackRules) {
/**
* depth = -1 表示没匹配到
* */
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
// 记录匹配的规则
winner = rule;
}
}
}
// User superclass behavior (rollback on unchecked) if no rule matches.
if (winner == null) {
/**
* 使用父类匹配规则 {@link DefaultTransactionAttribute#rollbackOn(Throwable)}
* 很简单 `return (ex instanceof RuntimeException || ex instanceof Error);`
* */
return super.rollbackOn(ex);
}
// 不是 NoRollbackRuleAttribute 就回滚
return !(winner instanceof NoRollbackRuleAttribute);
}
TransactionStatus
描述创建后的事务的状态
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
// 于判断当前事务是否设置了保存点
boolean hasSavepoint();
// 复写了父接口Flushable中的方法
// 主要用于刷新会话
// 对于Hibernate/jpa而言就是调用了其session/entityManager的flush方法
@Override
void flush();
}
// 判断当前事务是否是一个新的事务
// 不是一个新事务的话,那么需要加入到已经存在的事务中
boolean isNewTransaction();
// 事务是否被标记成RollbackOnly
// 如果被标记成了RollbackOnly,意味着事务只能被回滚
void setRollbackOnly();
boolean isRollbackOnly();
// 是否事务完成,回滚或提交都意味着事务完成了
boolean isCompleted();
// 创建保存点
Object createSavepoint() throws TransactionException;
// 回滚到指定保存点
void rollbackToSavepoint(Object savepoint) throws TransactionException;
// 移除回滚点
void releaseSavepoint(Object savepoint) throws TransactionException;
TransactionSynchronizationManager
管理资源同步和行为同步。
资源同步:数据库连接就是跟这个事务同步的一个资源。
行为同步:在事务开启之前我们需要先获取一个数据库连接,同样的在事务提交时我们需要将连接关闭(不一定是真正的关闭,如果是连接池只是归还到连接池中),这个时候关闭连接这个行为也需要跟事务进行同步。
public abstract class TransactionSynchronizationManager {
/**
* 事务资源,就是当前事物内涉及到的所有资源(数据库连接)
*<p/></p?>
* 比如数据库连接:
* Key:DataSource 对象
* Value:ConnectionHolder
*<p/>
* 什么时候会设置值:<p/>
* 1. 开启新事务时,会通过 DataSource 获取连接,并将连接存到这里<p/>
* 2. 执行 {@link org.springframework.jdbc.datasource.DataSourceUtils#getConnection(DataSource)} 获取连接,使用 DataSource 做为key从 resources中找不到连接,
* 就会使用 DataSource获取连接,存到 synchronizations 和 这里<p/>
* 注:前提是在事务内执行。简单来说就是Java虚拟机栈中存在@Transactional的方法(就是有{@link TransactionInterceptor#invoke(MethodInvocation)})
*<p/>
* 什么时候移除key对应属性值:
* - 暂停当前事务
* - 完成当前事务(rollback或者commit)
* Tips:{@link TransactionSynchronizationManager#doUnbindResource(Object)}
*/
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
/**
* 事务同步资源,就是在事务中产生的 非事务管理器数据源生成的连接或者是用于在事务完成时(rollback或者commit)要触发事件,都算是事务同步资源
*<p/>
* 比如:
* - ConnectionSynchronization
* 使用工具类获取连接会设置 {@link org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection(DataSource)}
* 这种连接和事务连接不一样,是直接通过数据源获取的连接,不会配置自动提交、超时时间、隔离级别,默认是啥就是啥,
* 唯一的作用就是在事务完成时(rollback或者commit) 释放掉这些连接
* 比如:获取连接用的数据源(d1)和事务管理的数据源(d2)不是同一个时,就会将d1创建的连接装饰成 ConnectionSynchronization,
* 并将该对象存到 synchronizations 属性中,然后对应的连接也会存到注册到 resources 中
*
* - TransactionalApplicationListenerSynchronization
* 发布事务事件时会设置 {@link TransactionalApplicationListenerMethodAdapter#onApplicationEvent(ApplicationEvent)}
* 作用就是在事务完成时(rollback或者commit) 回调其 TransactionalApplicationListener、SynchronizationCallback
*<p/>
* 什么时候会设置值:简单来说就是Java虚拟机栈中存在@Transactional的方法(就是有{@link TransactionInterceptor#invoke(MethodInvocation)})
* 具体一点就是执行完 {@link TransactionAspectSupport#createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String)}
* 该属性就会被初始化,在使用过程中会根据 `synchronizations.get() != null ` 来判断是激活了事务同步 {@link TransactionSynchronizationManager#isSynchronizationActive()}
*<p/>
* 什么时候清空该属性值:
* 1. 暂停当前事务
* 2. 完成当前事务(rollback或者commit)
* Tips:{@link TransactionSynchronizationManager#clear()}
*/
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
/**
* 实际激活的事务。
* - 当前线程的事务不是空事务 就是 true
* - 空事务 或者 执行非事务方法就是 false
*/
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
}
TransactionSynchronization
行为的同步
public interface TransactionSynchronization extends Flushable {
// 事务完成的状态
// 0 提交
// 1 回滚
// 2 异常状态,例如在事务执行时出现异常,然后回滚,回滚时又出现异常
// 就会被标记成状态2
int STATUS_COMMITTED = 0;
int STATUS_ROLLED_BACK = 1;
int STATUS_UNKNOWN = 2;
// 我们绑定的这些TransactionSynchronization需要跟事务同步
// 1.如果事务挂起,我们需要将其挂起
// 2.如果事务恢复,我们需要将其恢复
default void suspend() {
}
default void resume() {
}
@Override
default void flush() {
}
// 在事务执行过程中,提供的一些回调方法
default void beforeCommit(boolean readOnly) {
}
default void beforeCompletion() {
}
default void afterCommit() {
}
default void afterCompletion(int status) {
}
}
@EnableTransactionManagement
@EnableTransactionManagement注解用于启用Spring的事务管理功能。
@EnableTransactionManagement:
- 使用
@EnableTransactionManagement
会导入@Import(TransactionManagementConfigurationSelector.class)
。 - 解析配置类时,因为
TransactionManagementConfigurationSelector
的父类AdviceModeImportSelector
实现了ImportSelector
,所以会回调AdviceModeImportSelector#selectImports(AnnotationMetadata)
方法。 - 在这个回调方法中,获取
@EnableTransactionManagement(mode = AdviceMode.PROXY)
注解的mode属性值,回调子类方法TransactionManagementConfigurationSelector#selectImports(AdviceMode)
。
- 使用
TransactionManagementConfigurationSelector的工作流程:
TransactionManagementConfigurationSelector#selectImports(AdviceMode)
方法返回两个类:AutoProxyRegistrar
和ProxyTransactionManagementConfiguration
,将它们添加到BeanDefinitionMap中。
AutoProxyRegistrar:
- 继承
ImportBeanDefinitionRegistrar
,所以解析@Import
时会回调AutoProxyRegistrar#registerBeanDefinitions(AnnotationMetadata, BeanDefinitionRegistry)
方法。 - 该方法会执行
AopConfigUtils#registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry)
,注册InfrastructureAdvisorAutoProxyCreator
到容器中。 InfrastructureAdvisorAutoProxyCreator
是BeanPostProcessor,在实例化前、提前AOP、初始后判断bean是否要进行代理。
- 继承
ProxyTransactionManagementConfiguration:
- 继承
AbstractTransactionManagementConfiguration
。 - 通过@Bean注册
TransactionalEventListenerFactory
,用于处理@TransactionalEventListener
标注的方法,将方法构造成事件监听器,注册到事件广播器中。 - 通过@Bean注册Advisor、Advisor的Advice和AnnotationTransactionAttributeSource。
BeanFactoryTransactionAttributeSourceAdvisor
实现PointcutAdvisor接口,决定是否进行代理的依据是PointcutAdvisor#getPointcut()
,其Pointcut是TransactionAttributeSourcePointcut
。TransactionInterceptor
是Advice,实现了事务增强逻辑。- Advisor和Advice都依赖
AnnotationTransactionAttributeSource
这个bean,用来查找、解析@Transactional。在方法上找、类上找。
- 继承
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
/**
* 指示是否要创建基于子类的 (CGLIB) 代理 ({@code true}) 为
* 与基于标准 Java 接口的代理相反 ({@code false})。默认值为
* {@code false}。<strong>仅当 {@link #mode()} 设置为
* {@link AdviceMode#PROXY}</strong>。
* <p>请注意,将此属性设置为 {@code true} 将影响<em>所有</em>
* Spring 管理的 Bean 需要代理,而不仅仅是那些标有
* {@code @Transactional}。例如,标有 Spring 的其他豆子
* {@code @Async} 注解将同时升级为子类代理
*时间。这种方法在实践中没有负面影响,除非明确
* 期望一种类型的代理与另一种类型的代理,例如在测试中。
*/
boolean proxyTargetClass() default false;
/**
* Indicate how transactional advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Transactional} annotation on such a method within a local call will be
* ignored since Spring's interceptor does not even kick in for such a runtime
* scenario. For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Indicate the ordering of the execution of the transaction advisor
* when multiple advices are applied at a specific joinpoint.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE}.
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
AutoProxyRegistrar
向容器中注册InfrastructureAdvisorAutoProxyCreator,查找容器中特定类型的Advisor。
// AnnotationMetadata,代表的是AutoProxyRegistrar的导入类的元信息
// 既包含了类元信息,也包含了注解元信息
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
// 获取@EnableTransactionManagement所在配置类上的注解元信息
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
// 遍历注解
for (String annType : annTypes) {
// 可以理解为将注解中的属性转换成一个map
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (candidate == null) {
continue;
}
// 直接从map中获取对应的属性
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
// mode,代理模型,一般都是SpringAOP
// proxyTargetClass,是否使用cglib代理
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
// 注解中存在这两个属性,并且属性类型符合要求,表示找到了合适的注解
candidateFound = true;
// 实际上会往容器中注册一个InfrastructureAdvisorAutoProxyCreator
if (mode == AdviceMode.PROXY) {
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
// ......
}
@EnableAspectJAutoProxy注解也向容器中注册了一个能实现自动代理的bd,那么当@EnableAspectJAutoProxy跟@EnableTransactionManagement同时使用,会根据优先级仅生效一个。
private static BeanDefinition registerOrEscalateApcAsRequired(
Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {
if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
// 当前已经注册到容器中的Bean的优先级
int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
// 当前准备注册到容器中的Bean的优先级
int requiredPriority = findPriorityForClass(cls);
// 谁的优先级大就注册谁,AnnotationAwareAspectJAutoProxyCreator是最大的
// 所以AnnotationAwareAspectJAutoProxyCreator会覆盖别的Bean
if (currentPriority < requiredPriority) {
apcDefinition.setBeanClassName(cls.getName());
}
}
return null;
}
// 注册bd
RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
beanDefinition.setSource(source);
beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
return beanDefinition;
}
ProxyTransactionManagementConfiguration
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
/**
* transactionAdvisor 就是 @EnableTransactionManagement 的增强器,而 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) 也是有用的
* 因为 InfrastructureAdvisorAutoProxyCreator 只会使用Role的值是 {@link BeanDefinition.ROLE_INFRASTRUCTURE} 的 Advisor 来判断后置处理的bean是否需要代理
* {@link BeanFactoryAdvisorRetrievalHelper#findAdvisorBeans()}
* */
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
/**
* 实现 PointcutAdvisor 接口,所以是否要进行代理得看 {@link PointcutAdvisor#getPointcut()}
* 而 BeanFactoryTransactionAttributeSourceAdvisor 的 Pointcut是这个 {@link TransactionAttributeSourcePointcut}
* 而 TransactionAttributeSourcePointcut 类匹配和方法匹配是使用 transactionAttributeSource 来解析注解的
* - ClassFilter {@link TransactionAttributeSourcePointcut.TransactionAttributeSourceClassFilter}
* 类不是java包下的 不是 Ordered类 就是匹配
*
* - MethodMatcher {@link TransactionAttributeSourcePointcut#matches(Method, Class)}
* 查找 方法->方法声明的类 有@Transactional 就是匹配
* */
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
// 默认是注册的这个类型 AnnotationTransactionAttributeSource
advisor.setTransactionAttributeSource(transactionAttributeSource);
/**
* Advice, 也就是具体的增强逻辑
* */
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
/**
* 设置排序值
* */
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
/**
* Advisor 和 Advice 都依赖了这个bean。
* TransactionAttributeSource 该对象很简单,就是解析 @Transactional 注解,解析成 RuleBasedTransactionAttribute 对象
* */
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
/**
* 就是 BeanFactoryTransactionAttributeSourceAdvisor 的 advice
* */
TransactionInterceptor interceptor = new TransactionInterceptor();
/**
* 依赖了 transactionAttributeSource,这东西是用来拿到方法、类上的 @Transactional注解,解析成 RuleBasedTransactionAttribute 对象
* */
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
/**
* 设置事务管理器,该属性值是父类依赖注入 TransactionManagementConfigurer 类型的bean设置的
* */
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {
/**
* 就是 @EnableTransactionManagement 的元数据
*/
@Nullable
protected AnnotationAttributes enableTx;
/**
* Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
*/
@Nullable
protected TransactionManager txManager;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableTx = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
if (this.enableTx == null) {
throw new IllegalArgumentException(
"@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
}
}
@Autowired(required = false)
void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
/**
* 配置事务管理器。事务管理器是用于事务的开启、回滚、提交 就是通过这个接口统一调用的,
* 其依赖 TransactionSynchronizationManager 管理事务的状态
* */
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
}
TransactionManagementConfigurer configurer = configurers.iterator().next();
this.txManager = configurer.annotationDrivenTransactionManager();
}
@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
/**
* {@link EventListenerMethodProcessor} 后置处理器会用到 EventListenerFactory,
* 而 TransactionalEventListenerFactory 用于处理 @TransactionalEventListener 标注的方法,将方法构造成事件监听器,注册到事件广播器中
* */
return new TransactionalEventListenerFactory();
}
}
BeanFactoryTransactionAttributeSourceAdvisor
切面,看切点匹配逻辑以及通知增强逻辑。
/**
* 由 {@link TransactionAttributeSource} 驱动的顾问程序,用于包括
* 事务性方法的事务建议 Bean。
*
* @author Juergen Hoeller
* @since 2.5.5
* @see #setAdviceBeanName
* @see TransactionInterceptor
* @see TransactionAttributeSourceAdvisor
*/
@SuppressWarnings("serial")
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
@Nullable
private TransactionAttributeSource transactionAttributeSource;
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};
/**
* Set the transaction attribute source which is used to find transaction
* attributes. This should usually be identical to the source reference
* set on the transaction interceptor itself.
* @see TransactionInterceptor#setTransactionAttributeSource
*/
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionAttributeSource = transactionAttributeSource;
}
/**
* Set the {@link ClassFilter} to use for this pointcut.
* Default is {@link ClassFilter#TRUE}.
*/
public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
}
TransactionAttributeSourcePointcut
切点,类以及方法匹配。
abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
protected TransactionAttributeSourcePointcut() {
/**
* TransactionAttributeSourceClassFilter 会执行抽象方法 `getTransactionAttributeSource` 然后执行 {@link TransactionAttributeSource#isCandidateClass(Class)}
* 判断类是否匹配。
*
* 注:过滤规则很简单,只要类不是java包下的 不是 Ordered接口 就是匹配
* */
setClassFilter(new TransactionAttributeSourceClassFilter());
}
@Override
public boolean matches(Method method, Class<?> targetClass) {
/**
* TransactionAttributeSourcePointcut 实现了 MethodMatcher,所以判断方法匹配的时候会执行当前方法。
* 会执行抽象方法 `getTransactionAttributeSource` 然后执行 {@link TransactionAttributeSource#getTransactionAttribute(Method, Class)}
*
* {@link AbstractFallbackTransactionAttributeSource#getTransactionAttribute(Method, Class)}
* 注:
* 1. 过滤规则很简单,方法 -> 方法声明的类 先找到@Transactional就返回。也就是有注解就是匹配
* 2. 如果方法不是public的,直接返回null 不解析上面的@Transactional注解,也就是不代理
* */
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
/**
* 获取基础 TransactionAttributeSource(可能为 {@code null})。
* 由子类实现。
*/
@Nullable
protected abstract TransactionAttributeSource getTransactionAttributeSource();
/**
* {@link ClassFilter} that delegates to {@link TransactionAttributeSource#isCandidateClass}
* for filtering classes whose methods are not worth searching to begin with.
*/
private class TransactionAttributeSourceClassFilter implements ClassFilter {
@Override
public boolean matches(Class<?> clazz) {
if (TransactionalProxy.class.isAssignableFrom(clazz) ||
TransactionManager.class.isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
return false;
}
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.isCandidateClass(clazz));
}
}
}
public abstract class AbstractFallbackTransactionAttributeSource
implements TransactionAttributeSource, EmbeddedValueResolverAware {
@Nullable
private transient StringValueResolver embeddedValueResolver;
/**
* Cache of TransactionAttributes, keyed by method on a specific target class.
* <p>As this base class is not marked Serializable, the cache will be recreated
* after serialization - provided that the concrete subclass is Serializable.
*/
private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024);
@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
this.embeddedValueResolver = resolver;
}
/**
* Determine a cache key for the given method and target class.
* <p>Must not produce same key for overloaded methods.
* Must produce same key for different instances of the same method.
* @param method the method (never {@code null})
* @param targetClass the target class (may be {@code null})
* @return the cache key (never {@code null})
*/
protected Object getCacheKey(Method method, @Nullable Class<?> targetClass) {
return new MethodClassKey(method, targetClass);
}
/**
* Subclasses need to implement this to return the transaction attribute for the
* given class, if any.
* @param clazz the class to retrieve the attribute for
* @return all transaction attribute associated with this class, or {@code null} if none
*/
@Nullable
protected abstract TransactionAttribute findTransactionAttribute(Class<?> clazz);
/**
* Subclasses need to implement this to return the transaction attribute for the
* given method, if any.
* @param method the method to retrieve the attribute for
* @return all transaction attribute associated with this method, or {@code null} if none
*/
@Nullable
protected abstract TransactionAttribute findTransactionAttribute(Method method);
/**
* Should only public methods be allowed to have transactional semantics?
* <p>The default implementation returns {@code false}.
*/
protected boolean allowPublicMethodsOnly() {
return false;
}
}
TransactionInterceptor
这个nb了,增强逻辑。
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// 拿到被代理类。
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 执行增强逻辑。
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
TransactionAspectSupport:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 用来解析 方法、类上是否有@Transactional
TransactionAttributeSource tas = getTransactionAttributeSource();
// 1.1 拿到@Transactional注解,解析后的属性值
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
/**
* 1.2 推断出要用的事务管理器:@Transactional("tm1")
* 默认的({@link org.springframework.transaction.annotation.AbstractTransactionManagementConfiguration#setConfigurers)}) -> BeanFactory中找TransactionManager
* */
final TransactionManager tm = determineTransactionManager(txAttr);
/**
* 强转,tm 必须是 PlatformTransactionManager 类型的组
* */
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 就是一个method的标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
/**
* 没有@Transactional注解 或者 事务管理器不是CallbackPreferringPlatformTransactionManager类型
* */
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
/**
* 1.3 如果需要就创建事务(主要是根据事务传播行为来判断的)
* 就是使用DataSource创建Connection,然后设置为非自动提交 `Connection.setAutoCommit(false)`
* */
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 放行方法
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
/**
* 1.4 出现异常的处理,看看是回滚还是提交事务。
* 看看异常类型是不是要回滚的类型,是就回滚,否则就提交事务 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
*
* 而具体的回滚类型是根据 @Transactional(rollbackFor = RuntimeException.class, rollbackForClassName = "a",
* noRollbackFor = Throwable.class, noRollbackForClassName = "b") 的值解析的
*
*
* 无论是回滚,还是提交事务 最终都会执行这个,恢复上一个事务的内容到ThreadLocal中 {@link AbstractPlatformTransactionManager#cleanupAfterCompletion(DefaultTransactionStatus)}
* */
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
/**
* 1.5 清除当前事务信息。就是将当前txInfo之前的txInfo恢复到ThreadLocal中
* {@link TransactionAspectSupport#transactionInfoHolder}
* */
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
/**
* 1.6 提交事务。
* {@link TransactionAspectSupport#commitTransactionAfterReturning(TransactionInfo)}
*
* 最终会执行这个,恢复上一个事务的内容到ThreadLocal中 {@link AbstractPlatformTransactionManager#cleanupAfterCompletion(DefaultTransactionStatus)}
* */
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
} finally {
cleanupTransactionInfo(txInfo);
}
});
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
} catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 设置事务状态
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
} else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
/**
* 绑定到这个属性上 {@link TransactionAspectSupport#transactionInfoHolder}
* 并且会记录原来ThreadLocal的值,当事务结束了要恢复回去 {@link TransactionAspectSupport#cleanupTransactionInfo(TransactionInfo)}
* */
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
/**
* 处理Throwable,完成事务。
* 我们可能会提交或回滚,具体取决于配置。
* @param txInfo information about the current transaction
* @param ex throwable encountered
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
/**
* 异常是需要回滚 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
*
* - 使用 @Transactional 配置的rollback进行匹配,匹配就返回结果
* - 没有匹配rollback,就判断异常是不是 RuntimeException 、Error 是就回滚
* */
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
/**
* 回滚
* {@link AbstractPlatformTransactionManager#rollback(TransactionStatus)}
* */
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
/**
* 提交事务
* {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
* */
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
/**
* {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
* */
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
1.1 解析Transactional注解属性
AbstractFallbackTransactionAttributeSource:
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}
// 从缓存取 避免重复解析
Object cacheKey = getCacheKey(method, targetClass);
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
} else {
return cached;
}
} else {
// 计算事务属性 方法必须是public的 依次从方法、类上获取。
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
} else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
DefaultTransactionAttribute dta = (DefaultTransactionAttribute) txAttr;
dta.setDescriptor(methodIdentification);
dta.resolveAttributeStrings(this.embeddedValueResolver);
}
if (logger.isTraceEnabled()) {
logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
// 计算事务属性 方法必须是public的 依次从方法、类上获取。
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 方法不是public 就直接返回null, 也就是说@Transactional 得标注在public方法上才有用
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 就是targetClass是代理对象的情况,返回的是其被代理类的方法。
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// 1.1.1 方法有@Transactional就解析注解值,然后返回
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// 1.1.2 尝试从方法声明类上找,有@Transactional就解析注解值,然后返回
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
/**
* 不相等,说明method是代理对象的方法, 上面解析被代理对象的方法找不到注解信息,尝试从 代理对象的方法、代理对象找 @Transactional,
* 找到就返回
* */
if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
1.1.1 解析方法上事务注解属性
AnnotationTransactionAttributeSource:
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {
/**
* 遍历 annotationParsers,执行 {@link TransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)} 返回注解对应的属性信息,
* 解析的对象是 method,说白了就是看看 method 是否有注解
* 没得就返回null
*
* 注:默认有这个 SpringTransactionAnnotationParser,就是是否有 @Transactional
* */
return determineTransactionAttribute(method);
}
1.1.2 解析类上事务注解
AnnotationTransactionAttributeSource:
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
/**
* 遍历 annotationParsers,执行 {@link TransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)} 返回注解对应的属性信息,
* 解析的对象是 clazz ,说白了就是看看 clazz是否有注解
* 没得就返回null
*
* 注:默认有这个 SpringTransactionAnnotationParser,就是是否有 @Transactional
* */
return determineTransactionAttribute(clazz);
}
遍历注解解析器解析注解属性
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
TransactionAttribute attr = parser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}
解析出事务属性
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
// 从 AnnotatedElement 找到 @Transactional 注解信息
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
// 就是将注解的值 设置到 TransactionAttribute 对象中
return parseTransactionAnnotation(attributes);
} else {
return null;
}
}
// 解析注解属性 事务回滚规则用的是RollbackRuleAttribute
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
String timeoutString = attributes.getString("timeoutString");
Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
"Specify 'timeout' or 'timeoutString', not both");
rbta.setTimeoutString(timeoutString);
rbta.setReadOnly(attributes.getBoolean("readOnly"));
// 这个属性是指定 事务管理的
rbta.setQualifier(attributes.getString("value"));
rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));
/**
* 回滚规则,两种类型:
* - RollbackRuleAttribute 表示要回滚
* - NoRollbackRuleAttribute 表示不回滚
*
* 在这里会用到 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
* */
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
1.2推断事务管理器
TransactionAspectSupport:
@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// 如果未设置 tx 属性,请勿尝试查找 tx 管理器
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}
/**
* 通过限定符查找 @Transactional("tm1") {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
* */
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
/**
* 注解 value值不为空,就根据value从BeanFactory拿到事务管理器。找不到就报错
* */
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
} else if (StringUtils.hasText(this.transactionManagerBeanName)) {
// 设置了 transactionManagerBeanName,就使用这个name找
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
} else {
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
/**
* 通过 TransactionManager 类型从 BeanFactory拿到事务管理器,
* 类型匹配到多个bean,会通过 @Primary、@Priority确定唯一一个,确定不能就报错了
* */
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
TransactionManager txManager = this.transactionManagerCache.get(qualifier);
if (txManager == null) {
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
beanFactory, TransactionManager.class, qualifier);
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}
1.3 createTransactionIfNecessary
AbstractPlatformTransactionManager:
/**
* 如有必要,根据给定的 TransactionAttribute 创建事务。
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
/**
* 使用 @Transactional,默认的parser是无法设置name值的,所以都会装饰一下
* {@link SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)}
* */
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
/**
* 1.3.1 获取事务状态。
*
* 使用 txAttr 通过事务管理器 获取事务状态
*
* {@link AbstractPlatformTransactionManager#getTransaction(TransactionDefinition)}
* - 会根据事务传播行为来决定,是新创建事务、暂停事务、savepoint 等操作
* - 是新建事务,就会使用DataSource获取Connection,然后将Connection设置为非自动提交
*
* DefaultTransactionStatus 事务状态,由这三个东西组成:
* - RuleBasedTransactionAttribute(事务属性):就是描述@Transactional注解的对象
* - DataSourceTransactionObject(事务对象):记录事务的ConnectionHolder
* - SuspendedResourcesHolder(暂停资源持有者):记录上一个事务的ConnectionHolder和TransactionSynchronization
* 注:事务说白了就是一个数据库连接
* */
status = tm.getTransaction(txAttr);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
/**
* 1.3.2 将事务管理器、事务属性、方法标识和事务状态 装饰成 TransactionInfo 返回,
* */
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
1.3.1 获取事务状态
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
/**
* definition 就是描述@Transactional注解的对象,
* definition 是空就给一个默认值
* */
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
/**
* 1.3.1.1 创建事务对象。从 resources 中拿到资源设置给事务对象
* 这个资源并不一定是开启事务时创建的连接,比如空事务情况下,使用 {@link org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection(DataSource)} 获取连接
* 这个连接也会存到 resources 中,目的是在事务方法内能重复使用
*
* 比如:{@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction()}
* 会创建这个对象 DataSourceTransactionObject
* 1. 从ThreadLocal中拿到 Map<DataSource,ConnectionHolder> {@link TransactionSynchronizationManager#resources}
* 2. DataSource 作为key,从 resources 拿到 ConnectionHolder 设置给事务对象
* Tips:创建的是这个对象DataSourceTransactionObject,事务对象说白了就是 一个数据库连接的包装对象
* */
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
/**
* 1.3.1.2 判断是否存在事务。就是出现了事务方法嵌套调用的情况
*
* 比如这个事务管理器:
* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction(Object)}
* `txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()`
* 就是事务对象有连接 且 连接是活动的事务 才是true
*
* 空事务不算存在事务,因为其 isTransactionActive 是false
* */
if (isExistingTransaction(transaction)) {
/**
* 1.3.1.3 存在事务的处理,就是根据传播行为看看是:暂停当前事务,新建事务,设置保存点
* */
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 能往下执行,说明还没有事务
/**
* 校验 @Transactional() 设置的值 是否合法
* */
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
/**
* 校验事务隔离级别参数,不存在事务就抛出异常
* */
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
/**
* 1.3.1.4 暂停事务。入参就是要暂停的事务。其实就是从ThreadLocal中取出这两个属性内容,记录到 SuspendedResourcesHolder 中
* {@link TransactionSynchronizationManager#synchronizations}
* {@link TransactionSynchronizationManager#resources}
* */
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
/**
* 1.3.1.5 开启事务(校验事务隔离级别参数,创建新事务):
* 1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象
* 2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间)
* 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources}
* 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
* 5. 记录信息到 TransactionSynchronizationManager
* */
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
// 1.3.1.6 出现异常,就恢复之前的事务状态到ThreadLocal中
resume(null, suspendedResources);
throw ex;
}
} else {
/**
* 创建空事务,其实就是没有事务
* */
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 1.3.1.7 创建事务状态
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
1.3.1.1 创建事务对象
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/**
* 使用 DataSource 作为key从 ThreadLocal中拿 ConnectionHolder,
* 没有就是 null,第一次开启事务 这个值就是null咯
* */
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
1.3.1.2 判断是否存在事务
DataSourceTransactionManager:
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
/**
* 在开始事务时会将 isTransactionActive 设置为true
* 只有在事务完成时,才会将事务的持有的 getConnectionHolder().isTransactionActive() 设置为false
* 所以可以通过这个判断 是否存在事务
*
* 空事务不算存在事务,因为空事务不会开启事务(不会给事务对象创建数据库连接),也就不会将 isTransactionActive 设置为true
*
* 在空事务下,使用{@link DataSourceUtils#doGetConnection(DataSource)}获取连接,也不会设置 isTransactionActive 为true,
* 只是存到了 {@link TransactionSynchronizationManager#resources}
* */
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
1.3.1.3 存在事务的处理
/**
* 存在的事务创建事务状态(主要是根据事务传播行为来判断)
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
/**
* 传播行为:存在事务就报错
* */
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
/**
* 传播行为:存在事务就挂起
* */
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
/**
* 1.3.1.3.1 暂停事务。返回 SuspendedResourcesHolder,这里面记录了被移除的资源,和所暂停事务的状态信息
*
* 就是从 {@link TransactionSynchronizationManager#resources} 移除资源,移除的资源会存到 SuspendedResourcesHolder 中
* */
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 1.3.1.3.2 处理事务状态
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
/**
* 传播行为:创建新的事物,已经存在事物就挂起
* */
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
/**
* 1.3.1.3.1 暂停事务,返回的是暂停事务的资源
* */
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
/**
* 1.3.1.3.3 开启新事物,会记录暂停事务的信息
* */
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
/**
* 传播行为:如果当前事务存在就创建嵌套事务,否则就像 PROPAGATION_REQUIRED 一样处理
*
* 通过这里能体现,没有事务的时候就和PROPAGATION_REQUIRED一样 创建新的事务 {@link AbstractPlatformTransactionManager#getTransaction(TransactionDefinition) }
* */
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
/**
* 不支持嵌套事务 就报错,{@link org.springframework.jdbc.datasource.DataSourceTransactionManager#DataSourceTransactionManager()} 是支持的
* */
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
/**
* 使用保存点来实现 嵌套事务
* DataSourceTransactionManager 就是true
* */
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 执行sql创建保存点
status.createAndHoldSavepoint();
return status;
} else {
// 1.3.1.3.3 开启事务
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition
+ "] specifies isolation level which is incompatible with existing transaction: "
+
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition
+ "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 1.3.1.3.2 处理事务状态
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
- 暂停事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
/**
* 是活动的事务。
* 可以这里理解,只要Java虚拟机栈中存在@Transactional的方法,这个判断就是true
* 1. 暂停当前事务
* 2. 完成当前事务(rollback或者commit)
* */
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/**
* 1️⃣ 暂停事务的行为同步。暂停了 TransactionSynchronizationManager.isSynchronizationActive() 就是 false了
*
* 就是拿到 {@link TransactionSynchronizationManager#synchronizations},遍历执行 {@link TransactionSynchronization#suspend()}
* */
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
// 存在事务对象
if (transaction != null) {
/**
* 2️⃣ 暂停事务资源同步。
* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend(Object)}
* 1. 取消事务对象绑定的连接信息 txObject.setConnectionHolder(null);
* 2. 从事务资源中移除事务对象对应的资源(就是移除DataSource作为key的资源) {@link TransactionSynchronizationManager#unbindResource(Object)}
*
* Tips:返回值,就是事务资源中移除的资源
* */
suspendedResources = doSuspend(transaction);
}
// 拿到原来的值
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
/**
* 装饰一下,就是记录现在得值,后面恢复事务需要重新设置到 TransactionSynchronizationManager
* */
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
/**
* 2️⃣ 暂停事务资源同步。
* */
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
1️⃣ 暂停行为同步
AbstractPlatformTransactionManager:
private List<TransactionSynchronization> doSuspendSynchronization() {
// 拿到事务同步资源
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
// 遍历,然后挂起
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
/**
* 对于数据库连接:
* {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#suspend()}
* 如果该Connection存在事务资源中,就取消事务资源对该Connection的引用;
* 否则就关闭连接
* */
synchronization.suspend();
}
// 清空线程事务同步资源
TransactionSynchronizationManager.clearSynchronization();
// 返回 事务同步资源
return suspendedSynchronizations;
}
2️⃣ 暂停资源同步
DataSourceTransactionManager:
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
- 处理事务状态
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 初始化行为同步
prepareSynchronization(status, definition);
return status;
}
// 创建事务状态
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
/**
* newSynchronization 且 不是同步激活状态 就是 真的新同步
*
* 同步激活状态:
* - true:简单来说就是Java虚拟机栈中存在@Transactional的方法
* - false:暂停当前事务、完成当前事务(rollback或者commit)
*
* 结论:当前线程在 TransactionSynchronizationManager 没有记录信息, actualNewSynchronization 就是true
* */
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
/**
* 适当地初始化事务同步。
* Initialize transaction synchronization as appropriate.
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
/**
* 是新的同步,才记录这些信息(注:空事务也算的)
* */
if (status.isNewSynchronization()) {
/**
* 真的活动的事务。就是得有事务,且事务是新的才是true
* */
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 记录事务隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 是否只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 事务的name
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 同步属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
TransactionSynchronizationManager.initSynchronization();
}
}
开启事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { /** * 不是从不同步。这个值是看你用的啥事务管理,是事务管理器的属性 * */ boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // new DefaultTransactionStatus 对象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); /** * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin(Object, TransactionDefinition)} * 1. 事务对象没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象 * 2. 将 @Transactional 的信息设置到连接和事务对象中(设置的内容:自动提交、是否只读、隔离级别、超时时间) * 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources} * 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true * */ doBegin(transaction, definition); /** * status.isNewSynchronization() 就设置 TransactionSynchronizationManager: * - 记录事务隔离级别 * - 是否只读 * - 事务的name * - {@link TransactionSynchronizationManager#synchronizations} 属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization * */ prepareSynchronization(status, definition); return status; }
doBegin
/**
* 1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象<br/>
* 2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间) <br/>
* 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources} <br/>
* 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
/**
* 没有连接 或者 连接是事务同步 那就应该创建一个新的连接
*
* 可以这么理解,当前方法叫 doBegin ,也就是要给事务对象绑定连接。连接没有那就创建一个在绑定合情合理,
* 而连接有了,但是连接是事务同步 说明上一个事务还未完成,你还要绑定 那也给你创建一个新的连接,这种属于非法操作了,如果你老老实实用Spring 别搞啥自定义,是不会出现这种情况的
*
* 因为 doBegin 是在 startTransaction 的时候会执行,而 startTransaction 之前会先暂停当前线程的事物资源,在开启事物,
* 而暂停就是
* 1. 移除事物对象的引用 `txObject.setConnectionHolder(null);`
* 2. 从 {@link TransactionSynchronizationManager#resources} 移除 ConnectionHolder 存起来。这个ConnectionHolder 其实就是上面置空的属性
* Tips: 暂停事务的代码 {@link DataSourceTransactionManager#doSuspend(Object)}
* 记住代码是死的,Spring的事物代码都这么写了,所以基本不可能出现 isSynchronizedWithTransaction 的情况,除非开发人员玩花活
*
* 注:在一个Spring事务(单一事务、嵌套事务)
* 事务暂停:`txObject.setConnectionHolder(null)`
* 事务完成:` txObject.getConnectionHolder().setSynchronizedWithTransaction(false);`
* */
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
/**
* 拿到连接 {@link DataSource#getConnection()}
*
* 这里就是动态数据源的原理了 {@link AbstractRoutingDataSource#getConnection()}
* */
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 装饰 Connection 并设置给事务对象
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
/**
* 表示 这个事务对象的连接 是事务同步
*
* 在事物完成时 会将该属性设置为 false
* {@link DataSourceTransactionManager#doCleanupAfterCompletion(Object)}
* {@link ConnectionHolder#clear()}
* */
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
/**
* 就是根据 @Transactional 注解值,给Connection设置 只读属性、隔离级别属性。
* 返回的是 Connection 之前的隔离饥级别信息
* */
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 是否只读
txObject.setReadOnly(definition.isReadOnly());
// 如果连接是自动提交,就设置为非自动提交
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 设置为非自动提交
con.setAutoCommit(false);
}
/**
* 就比如 `@Transactional(readOnly = true)` 就设置事务为只读的
* 如果是只读的就执行sql设置为只读事务 `SET TRANSACTION READ ONLY`
* */
prepareTransactionalConnection(con, definition);
/**
* 活动的事务。
* 在事务完成时才会将该属性设置为false,暂停事务并不会
* */
txObject.getConnectionHolder().setTransactionActive(true);
/**
* `@Transactional(timeout = -1)` 不是-1才拿注解值,没有就返回默认值 -1
* */
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 设置超时时间
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
/**
* 是新创建的connect的,就绑定到ThreadLocal中 {@link TransactionSynchronizationManager#resources}
* 缓存的Map<DataSource,ConnectionHolder>。使用DataSource作为key,是因为一个线程可能有来回切换数据源的情况(动态数据源)
* */
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
/**
* 绑定到事务资源中 {@link TransactionSynchronizationManager#resources}
* */
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
/**
* 从事务资源 {@link TransactionSynchronizationManager#resources} 中移除该连接,或者是关闭连接
*
* */
DataSourceUtils.releaseConnection(con, obtainDataSource());
// 清除 txObject 的连接信息
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
1.3.1.4 暂停事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
/**
* 是活动的事务。
* 可以这里理解,只要Java虚拟机栈中存在@Transactional的方法,这个判断就是true
* 1. 暂停当前事务
* 2. 完成当前事务(rollback或者commit)
* */
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/**
* 1️⃣ 暂停事务的行为同步。暂停了 TransactionSynchronizationManager.isSynchronizationActive() 就是 false了
*
* 就是拿到 {@link TransactionSynchronizationManager#synchronizations},遍历执行 {@link TransactionSynchronization#suspend()}
* */
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
// 存在事务对象
if (transaction != null) {
/**
* 2️⃣ 暂停事务资源同步。
* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend(Object)}
* 1. 取消事务对象绑定的连接信息 txObject.setConnectionHolder(null);
* 2. 从事务资源中移除事务对象对应的资源(就是移除DataSource作为key的资源) {@link TransactionSynchronizationManager#unbindResource(Object)}
*
* Tips:返回值,就是事务资源中移除的资源
* */
suspendedResources = doSuspend(transaction);
}
// 拿到原来的值
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
/**
* 装饰一下,就是记录现在得值,后面恢复事务需要重新设置到 TransactionSynchronizationManager
* */
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
/**
* 2️⃣ 暂停事务资源同步。
* */
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
1️⃣ 暂停行为同步
AbstractPlatformTransactionManager:
private List<TransactionSynchronization> doSuspendSynchronization() {
// 拿到事务同步资源
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
// 遍历,然后挂起
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
/**
* 对于数据库连接:
* {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#suspend()}
* 如果该Connection存在事务资源中,就取消事务资源对该Connection的引用;
* 否则就关闭连接
* */
synchronization.suspend();
}
// 清空线程事务同步资源
TransactionSynchronizationManager.clearSynchronization();
// 返回 事务同步资源
return suspendedSynchronizations;
}
2️⃣ 暂停资源同步
DataSourceTransactionManager:
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
1.3.1.5 开启事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
/**
* 不是从不同步。这个值是看你用的啥事务管理,是事务管理器的属性
* */
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// new DefaultTransactionStatus 对象
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
/**
* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin(Object, TransactionDefinition)}
* 1. 事务对象没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象
* 2. 将 @Transactional 的信息设置到连接和事务对象中(设置的内容:自动提交、是否只读、隔离级别、超时时间)
* 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources}
* 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
* */
doBegin(transaction, definition);
/**
* status.isNewSynchronization() 就设置 TransactionSynchronizationManager:
* - 记录事务隔离级别
* - 是否只读
* - 事务的name
* - {@link TransactionSynchronizationManager#synchronizations} 属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
* */
prepareSynchronization(status, definition);
return status;
}
doBegin
/**
* 1. 事务对象,没有连接或者连接isSynchronizedWithTransaction 就通过数据源创建连接然后设置给事务对象<br/>
* 2. 将 @Transactional 的信息设置到连接和事务对象中(自动提交、是否只读、隔离级别、超时时间) <br/>
* 3. 是新创建的连接,就使用数据源作为key,将连接绑定是事务资源中 {@link TransactionSynchronizationManager#resources} <br/>
* 4. 设置 ConnectionHolder 属性 isSynchronizedWithTransaction 为true
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
/**
* 没有连接 或者 连接是事务同步 那就应该创建一个新的连接
*
* 可以这么理解,当前方法叫 doBegin ,也就是要给事务对象绑定连接。连接没有那就创建一个在绑定合情合理,
* 而连接有了,但是连接是事务同步 说明上一个事务还未完成,你还要绑定 那也给你创建一个新的连接,这种属于非法操作了,如果你老老实实用Spring 别搞啥自定义,是不会出现这种情况的
*
* 因为 doBegin 是在 startTransaction 的时候会执行,而 startTransaction 之前会先暂停当前线程的事物资源,在开启事物,
* 而暂停就是
* 1. 移除事物对象的引用 `txObject.setConnectionHolder(null);`
* 2. 从 {@link TransactionSynchronizationManager#resources} 移除 ConnectionHolder 存起来。这个ConnectionHolder 其实就是上面置空的属性
* Tips: 暂停事务的代码 {@link DataSourceTransactionManager#doSuspend(Object)}
* 记住代码是死的,Spring的事物代码都这么写了,所以基本不可能出现 isSynchronizedWithTransaction 的情况,除非开发人员玩花活
*
* 注:在一个Spring事务(单一事务、嵌套事务)
* 事务暂停:`txObject.setConnectionHolder(null)`
* 事务完成:` txObject.getConnectionHolder().setSynchronizedWithTransaction(false);`
* */
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
/**
* 拿到连接 {@link DataSource#getConnection()}
*
* 这里就是动态数据源的原理了 {@link AbstractRoutingDataSource#getConnection()}
* */
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 装饰 Connection 并设置给事务对象
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
/**
* 表示 这个事务对象的连接 是事务同步
*
* 在事物完成时 会将该属性设置为 false
* {@link DataSourceTransactionManager#doCleanupAfterCompletion(Object)}
* {@link ConnectionHolder#clear()}
* */
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
/**
* 就是根据 @Transactional 注解值,给Connection设置 只读属性、隔离级别属性。
* 返回的是 Connection 之前的隔离饥级别信息
* */
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 是否只读
txObject.setReadOnly(definition.isReadOnly());
// 如果连接是自动提交,就设置为非自动提交
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 设置为非自动提交
con.setAutoCommit(false);
}
/**
* 就比如 `@Transactional(readOnly = true)` 就设置事务为只读的
* 如果是只读的就执行sql设置为只读事务 `SET TRANSACTION READ ONLY`
* */
prepareTransactionalConnection(con, definition);
/**
* 活动的事务。
* 在事务完成时才会将该属性设置为false,暂停事务并不会
* */
txObject.getConnectionHolder().setTransactionActive(true);
/**
* `@Transactional(timeout = -1)` 不是-1才拿注解值,没有就返回默认值 -1
* */
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
// 设置超时时间
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
/**
* 是新创建的connect的,就绑定到ThreadLocal中 {@link TransactionSynchronizationManager#resources}
* 缓存的Map<DataSource,ConnectionHolder>。使用DataSource作为key,是因为一个线程可能有来回切换数据源的情况(动态数据源)
* */
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
/**
* 绑定到事务资源中 {@link TransactionSynchronizationManager#resources}
* */
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
/**
* 从事务资源 {@link TransactionSynchronizationManager#resources} 中移除该连接,或者是关闭连接
*
* */
DataSourceUtils.releaseConnection(con, obtainDataSource());
// 清除 txObject 的连接信息
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
1.3.1.6 恢复事务
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
// 挂起的资源 比如数据库连接
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
/**
* 1️⃣恢复事务资源。
* 就是重新设置回事务资源ThreadLocal中 {@link TransactionSynchronizationManager#resources}
* */
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 2️⃣恢复事务同步资源
doResumeSynchronization(suspendedSynchronizations);
}
}
}
1️⃣恢复资源同步
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
2️⃣恢复行为同步
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
/**
* {@link org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#resume()}
* 就是将其Connection重新设置到 {@link TransactionSynchronizationManager#resources} 中
* */
synchronization.resume();
/**
* 重新注册回 {@link TransactionSynchronizationManager#synchronizations}
* */
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
1.3.1.7 创建事务状态
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 初始化行为同步
prepareSynchronization(status, definition);
return status;
}
// 创建事务状态
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
/**
* newSynchronization 且 不是同步激活状态 就是 真的新同步
*
* 同步激活状态:
* - true:简单来说就是Java虚拟机栈中存在@Transactional的方法
* - false:暂停当前事务、完成当前事务(rollback或者commit)
*
* 结论:当前线程在 TransactionSynchronizationManager 没有记录信息, actualNewSynchronization 就是true
* */
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
/**
* 适当地初始化事务同步。
* Initialize transaction synchronization as appropriate.
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
/**
* 是新的同步,才记录这些信息(注:空事务也算的)
* */
if (status.isNewSynchronization()) {
/**
* 真的活动的事务。就是得有事务,且事务是新的才是true
* */
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 记录事务隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 是否只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 事务的name
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 同步属性初始化,会根据这个属性是否为null判断 是不是 isNewSynchronization
TransactionSynchronizationManager.initSynchronization();
}
}
1.3.2 创建事务信息
TransactionAspectSupport:
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 设置事务状态
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
} else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
/**
* 绑定到这个属性上 {@link TransactionAspectSupport#transactionInfoHolder}
* 并且会记录原来ThreadLocal的值,当事务结束了要恢复回去 {@link TransactionAspectSupport#cleanupTransactionInfo(TransactionInfo)}
* */
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
1.4 出现异常的处理
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
/**
* 异常是需要回滚 {@link RuleBasedTransactionAttribute#rollbackOn(Throwable)}
*
* - 使用 @Transactional 配置的rollback进行匹配,匹配就返回结果
* - 没有匹配rollback,就判断异常是不是 RuntimeException 、Error 是就回滚
* */
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
/**
* 1️⃣回滚
* {@link AbstractPlatformTransactionManager#rollback(TransactionStatus)}
* */
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
/**
* 2️⃣提交事务
* {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
* */
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
1️⃣回滚
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
/**
* 回调 TransactionSynchronization
* 就是遍历 {@link TransactionSynchronizationManager#synchronizations} 属性
* 完成事务前回调
* */
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
/**
* 回滚到最前的保存点
* */
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
/**
* 回滚
* {@link org.springframework.jdbc.datasource.DataSourceTransactionManager#doRollback(DefaultTransactionStatus)}
* */
doRollback(status);
} else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
} else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
// 回调 TransactionSynchronization 完成事务后回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回调 TransactionSynchronization 完成事务后回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// 恢复上个事务的资源
cleanupAfterCompletion(status);
}
}
2️⃣提交
AbstractPlatformTransactionManager:
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 回滚
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 回滚
processRollback(defStatus, true);
return;
}
// commit
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
// 回调 TransactionSynchronization 提交前回调
triggerBeforeCommit(status);
// 回调 TransactionSynchronization 完成前回调
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 回调 TransactionSynchronization 提交后回调
triggerAfterCommit(status);
} finally {
// 回调 TransactionSynchronization
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
// 就是恢复暂停的事务资源
cleanupAfterCompletion(status);
}
}
1.5 清空事务信息
TransactionAspectSupport:
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
TransactionInfo:
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
1.6 提交事务
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
/**
* {@link AbstractPlatformTransactionManager#commit(TransactionStatus)}
* */
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
AbstractPlatformTransactionManager:
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 回滚
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 回滚
processRollback(defStatus, true);
return;
}
// commit
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
// 回调 TransactionSynchronization 提交前回调
triggerBeforeCommit(status);
// 回调 TransactionSynchronization 完成前回调
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 回调 TransactionSynchronization 提交后回调
triggerAfterCommit(status);
} finally {
// 回调 TransactionSynchronization
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
// 就是恢复暂停的事务资源
cleanupAfterCompletion(status);
}
}
TransactionalEventListenerFactory
事务事件监听
@EnableTransactionManagement导入ProxyTransactionManagementConfiguration:
- 首先,
@EnableTransactionManagement
会导入ProxyTransactionManagementConfiguration
。 ProxyTransactionManagementConfiguration
的父类是AbstractTransactionManagementConfiguration
,它使用@Bean
注入TransactionalEventListenerFactory
。
- 首先,
TransactionalEventListenerFactory解析@TransactionalEventListener:
TransactionalEventListenerFactory
用于解析@TransactionalEventListener
,将其转换为TransactionalApplicationListenerMethodAdapter
并注册到事件广播器中。
TransactionalApplicationListenerMethodAdapter的事件接收逻辑:
TransactionalApplicationListenerMethodAdapter#onApplicationEvent(ApplicationEvent)
方法用于接收事件。- 如果是活动的事务且实际激活的事务不为空,会注册事务同步资源,这在事务完成时(rollback或commit)时会调用其listener和callbacks,实现延时事件的发布。
- 要满足这个条件,必须在事务方法内发布事件,否则条件不成立。
- 否则,如果
annotation.fallbackExecution()
为true,则调用processEvent(event)
处理事件,即回调@TransactionalEventListener
标注的方法。 - 如果都不满足,则不执行任何操作。
TransactionalApplicationListenerMethodAdapter:
/**
* 触发条件使用事件广播器发布事件。
* 要想收到事务行为的事件:得在事务方法内使用事件广播器发布事件
* @param event the event to respond to
*/
@Override
public void onApplicationEvent(ApplicationEvent event) {
/**
* 是活动的事务(只要进入事务方法就是true) 且 实际激活的事务(必须不是空事务)
* */
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
// 注册事务同步资源,在事务完成时(rollback或者commit)会调用其 listener、callbacks
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
} else if (this.annotation.fallbackExecution()) {
// 兜底执行
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
} else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
TransactionalApplicationListenerSynchronization:
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
private final E event;
private final TransactionalApplicationListener<E> listener;
private final List<TransactionalApplicationListener.SynchronizationCallback> callbacks;
public TransactionalApplicationListenerSynchronization(E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {
this.event = event;
this.listener = listener;
this.callbacks = callbacks;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}
@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
} else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
} else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
private void processEventWithCallbacks() {
// 回调 SynchronizationCallback 前置方法
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
/**
* 回调监听器
* {@link ApplicationListenerMethodAdapter#processEvent(ApplicationEvent)}
* */
this.listener.processEvent(this.event);
} catch (RuntimeException | Error ex) {
// 回调 SynchronizationCallback 后置方法
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
throw ex;
}
// 回调 SynchronizationCallback 后置方法
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}
}