Spring Dao编程原理-声明式事务的事务传播行为原理

从 0 开始深入学习 Spring 专栏目录总览

上一章的最后,小册留了一个 createTransactionIfNecessary 方法没有研究,很明显它的作用是创建事务。而前面我们在学习事务传播行为时,了解到嵌套 Service 的方法运行,对于内层方法来说,事务传播行为的配置不同,它对于事务的动作 / 态度也是不一样的。这一章,咱着重来研究事务的传播行为,在 SpringFramework 的底层是如何实现的。

7 种传播行为咱并不都逐个研究,而是针对几种比较常见的、以及比较复杂的传播行为来研究,其余比较简单的传播行为,咱在分析整体处理逻辑时顺道解释就好。

0. 测试代码准备

测试代码,我们可以直接拿 com.linkedbear.spring.transaction.e_spread 中的例子来测试,直接在 PointService 的 addPoint 方法,上面的 @Transactional 注解调整传播行为即可。

当然,只改这些还不够,之前在讲解事务传播行为的时候用的是 xml 的方式,这里要换用注解声明式事务,所以我们要额外编写一个配置类:

@Configuration
@ComponentScan("com.linkedbear.spring.transaction.e_spread.service")
@EnableTransactionManagement
public class TransactionSpreadConfiguration {
    
    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/spring-dao?characterEncoding=utf8");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        return dataSource;
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate() {
        return new JdbcTemplate(dataSource());
    }
    
    @Bean
    public TransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }
}

以及对应的测试启动类:

public class TransactionSpreadSourceApplication {
    
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(TransactionSpreadConfiguration.class);
        UserService userService = ctx.getBean(UserService.class);
        userService.register();
    }
}

这样测试代码就算全部准备好了。

1. REQUIRED研究

首先咱来测试默认的 REQUIRED 传播,前面咱已经学过了传播行为的效果,当外层已经有事务时,REQUIRED 行为会直接加入到当前事务中。

1.1 Debug至invokeWithinTransaction

上一章我们已经把整个事务控制的逻辑都走了一遍,留下的那个 createTransactionIfNecessary 方法的坑,而这个方法的调用就在 TransactionInterceptor 的父类 TransactionAspectSupport 的 invokeWithinTransaction 方法中。

    // ......
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 此处会创建/获取事务
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            retVal = invocation.proceedWithInvocation();
        }
        // ......

接下来 createTransactionIfNecessary 方法的逻辑就是研究的重点了,然而这个方法又是类似于前置处理的预操作。。。

1.2 createTransactionIfNecessary

不过讲道理,这段源码还蛮容易理解的:

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    // 如果事务定义中没有name,则将方法名作为事务定义标识名
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 获取事务定义信息对应的事务状态
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

注意这里面的一个重要的动作,是 tm.getTransaction(txAttr); ,它会根据事务定义信息,从事务管理器中获取事务的状态信息。咱前面也都学过事务控制的模型了,想必小伙伴们也记得这个动作的重要性。

1.3 tm.getTransaction

来到 DataSourceTransactionManager 的父类 AbstractPlatformTransactionManager 中,咱来研究 getTransaction 方法。不过这个方法稍微有点长,咱还是拆解开阅读。

1.3.1 doGetTransaction

好家伙,一上来就是 get → doGet 的动作,这也太直接了吧:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    Object transaction = doGetTransaction();
    
    // ......

OK 那咱继续往里走,由于 doGetTransaction 方法是模板方法,接下来就要到真正的落地实现 DataSourceTransactionManager 中了:

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

虽然源码中都是没见过的 API ,但整体思路咱都能看明白:它会创建并返回一个 DataSourceTransactionObject ,这里面包含一个 Connection 。这里面 ConnectionHolder 的设计,跟之前在 IOC 里学过的 BeanDefinitionHolder 很相似,本质都是内部组合了一个对象而已。

那 doGetTransaction 方法执行完毕后,我们就可以简单的理解为,它返回了一个 Connection 的对象罢了。

1.3.2 【传播行为处理】1

    // ......
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // ......

别看这一段只有一个 if 结构和返回,这个 handleExistingTransaction 方法可大有作为,它里面的处理逻辑很复杂,都是当外部事务已经存在时的处理逻辑。这里面的逻辑,过会咱研究到 PointService 的 addPoint 方法时再说。

1.3.3 超时检测

    // ......
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // ......

这部分就更简单了,自己配置的超时时间最小最小是 -1 (代表永不超时),再小那就是搞事情了。

1.3.4 【传播行为处理】2

    // ......
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        // logger ......
        try {
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } // catch ......
    }
    else {
        // logger ......
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

接下来的这部分,我们看到了一些传播行为的熟悉的身影。这部分的逻辑咱大概先理一下:

首先,如果方法的事务定义信息配置的传播行为是 MANDATORY ,则直接抛出异常(因为上面检测过了,当前没有事务才会走下面的分支,那既然又要强制要求需要事务,就只好抛出异常咯)。

接着,如果传播行为配置了 REQUIRED 、REQUIRES_NEW 、NESTED ,那就很简单了,直接开启一个新的事务即可(当前没事务嘛,那当然要自己造咯)。

最后的情况,虽然没有显式的说明还有哪些传播行为,但我们也能推理出来。去除上面的 4 种事务传播行为,还剩下 3 种:SUPPORTS 、NOT_SUPPORTED 、NEVER ,它们都可以在没有事务的情况下运行,那目前确实都没有事务,那也就不需要做任何处理了,所以这个 else 块中也就啥事不用干了。

实际 Debug 的时候,它落到了 else-if 块中,很容易理解嘛,当前没有事务,而 UserService 的 register 方法中事务传播行为是默认的 REQUIRED ,那当然要开启一个新的事务咯。

Dao编程原理-声明式事务的事务传播行为原理

那就继续往里走吧。

1.4 startTransaction:开启事务

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 开启事务
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

在这段源码中,有一个方法格外的扎眼:doBegin ,咱一看就能感觉出来,这个方法就是开启事务的吧!

进入 doBegin 方法中,这个方法很长,小册只截取最吸引你的一小部分:

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // ......
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // 获取到真正的Connection对象
        con = txObject.getConnectionHolder().getConnection();
        // ......

        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            // logger ......
            // 开启事务
            con.setAutoCommit(false);
        }
        // ......
}

看,它这里面的主要工作,就是从 DataSourceTransactionObject 中的 ConnectionHolder 中,提取出真正的 Connection 对象,随后执行 setAutoCommit 方法!这个方法不就是原生 jdbc 事务中的开启事务嘛!

1.5 prepareTransactionInfo

到此为止,TransactionStatus 就已经获取到了,不要忘记此时此刻执行的方法是 UserService 的 register 方法哈。

    // ......
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

接下来要执行的是 prepareTransactionInfo 方法,而这个方法相对就简单多了:

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // logger ......
        txInfo.newTransactionStatus(status);
    } // else logger ......

    txInfo.bindToThread();
    return txInfo;
}

很简单,它只是创建了一个 TransactionInfo ,把事务状态信息放入其中,并将其绑定到当前线程中,仅此而已。

当 TransactionInfo 创建并返回之后,整个 createTransactionIfNecessary 方法也就执行完毕了,事务也就已经开启了。

1.6 PointService.addPoint

上面开启事务之后,接下来就可以执行 UserService 的 register 方法了,这里面又要调用 PointService 的 addPoint 方法了。

调用逻辑还是上一章的那一套,这里咱直接快进到 tm.getTransaction -> doGetTransacation 方法步骤了。

Spring Dao编程原理-声明式事务的事务传播行为原理

由于此时线程中已存在事务,所以 doGetTransacation 方法中的这个 if 分支会触发进入:

    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

1.7 handleExistingTransaction

这个方法又是好长呀,照例咱拆解来看。

1.7.1 NEVER的处理

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // ......

对于 NEVER 的传播行为,要求线程中不能有事务,所以这里检测到就会抛出异常,非常容易理解。

1.7.2 NOT_SUPPORTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // logger ......
        // 挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // ......

对于 NOT_SUPPORTED 的处理,要求方法执行不在事务中,所以这里会把当前的事务挂起,并执行自己的方法,也是很好理解吧。

1.7.3 REQUIRES_NEW的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

对于 REQUIRES_NEW 的处理,当然是开一个新的事务啦,不过在此之前它还要挂起已有的外部事务。除此之外,从源码中可以发现,如果新事务抛出异常后,会恢复之前的外部事务。

1.7.4 NESTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }
    // ......

NESTED 嵌套事务的处理,首先需要允许嵌套事务在程序的使用,然后还需要连接的数据库支持基于保存点的嵌套事务。如果这些条件都成立,则会创建保存点,否则会开启一个新的事务。

1.7.5 SUPPORTS&REQUIRED

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // logger ......
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

最后的这一部分,是处理 REQUIRED 和 SUPPORTS 的逻辑,这里面的处理逻辑相对就比较不痛不痒了。由于 isValidateExistingTransaction() 方法在默认情况下返回 false ,所以中间这一部分的逻辑并不会执行,那最后的处理,其实就是把现有的事务信息,包装了一下返回出去而已,并没有任何多余的动作处理。

方法返回后,接下来的处理就是我们熟悉的上一章的处理逻辑了,小册这里就不多赘述了。

2. REQUIRES_NEW研究

将 PointService 的 addPoint 方法的事务传播行为改为 REQUIRES_NEW ,重新 Debug 测试。

根据上面分析的内容,可知 REQUIRES_NEW 会在 handleExistingTransaction 方法中停下并检查:

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

这里面有几个要注意的地方,咱一起来注意一下。

2.1 新事务的创建细节

新事务在创建之前,会将外层的原事务挂起,挂起的逻辑咱来瞅一眼:(注意标有注释的源码)

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // 注意此处:doSuspend
                suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        // 注意此处:doSuspend
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

仔细观察源码中的两行注释,小册标注的都是 doSuspend 方法,这就意味着真正挂起的动作是在 doSuspend 方法中。

来到 DataSourceTransactionManager 中,可以找到 doSuspend 方法的实现:

@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

可以发现,这个操作是把 DataSourceTransactionObject 中的 Connection 拿走了,并且也把 TransactionSynchronizationManager 中的数据源也解除了。

Debug 可以发现,解绑之后返回的,其实就是那个组合了 Connection 的 ConnectionHolder :

Spring Dao编程原理-声明式事务的事务传播行为原理

随后回到上面的 suspend 方法中,下面的一堆 TransactionSynchronizationManager 的方法我们就不看了,里面都是 ThreadLocal 的操作,注意看最后 return 的时候,它将 ConnectionHolder 包装为一个 SuspendedResourcesHolder 之后返回了出去。

2.2 开启事务的细节

注意看接下来的 startTransaction 方法的参数列表:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

最后一个参数,它把 SuspendedResourcesHolder 传进来了!并且它还存入到了 TransactionStatus 中!这样做的目的,可能不用我说,聪明的你也能想到原因:当内层新事务执行完成后,清理相关的线程同步等信息时,是不是可以拿到事务状态信息,从中取出这个被挂起的事务,然后继续恢复呀

如果没有反应过来,那也没关系,下面咱立马来说这个设计的原因。

2.3 内层新事务执行完成后的细节

当事务执行完成之后,不是会执行一个 cleanupAfterCompletion 方法,清除其中的线程同步信息等等的嘛:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        // logger ......
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

注意看最下面的 if 部分,这个动作又是恢复挂起的事务了吧!上一章这里留了一个坑,这里咱展开来看看。

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

整个方法的逻辑,越看越像前面 suspend 方法的逆动作!其实还真就这么回事,上面的 doResume 方法对应 suspend 中的 doSuspend 方法,下面的 set 动作对应 suspend 中的 set null 。

至于 doResume 方法的逻辑,那就更简单了:

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

刚好又是跟上面一样,一个 bindResource ,一个 unbindResource 。

resume 方法执行完毕后,原来外层的事务又重新被绑定到线程上,相当于恢复了被挂起之前的状态,这也体现了 REQUIRES_NEW 的处理逻辑了。

3. NESTED研究

将 PointService 的 addPoint 方法的事务传播行为改为 NESTED ,重新 Debug 测试。

上面咱也看到了,NESTED 的处理逻辑还挺复杂的:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

很明显,处理保存点的逻辑在 status.createAndHoldSavepoint(); 中,而 createAndHoldSavepoint 方法的逻辑是相当的简单:

public void createAndHoldSavepoint() throws TransactionException {
    setSavepoint(getSavepointManager().createSavepoint());
}

就是一个创建、一个设置,完事了。想必创建和设置的动作,底层不用看小伙伴也能想到是怎么操作的吧!这里小册就展开了,感兴趣的小伙伴们可以自行深入看一看,这个逻辑还是很简单的。

【OK 到此为止,Dao 编程部分的所有内容也就都讲解完毕了。最后,咱依然是以总结和面试题收尾,帮助小伙伴快速回顾整个 Dao 章节,以及辅助面试备战。】

上一章的最后,小册留了一个 createTransactionIfNecessary 方法没有研究,很明显它的作用是创建事务。而前面我们在学习事务传播行为时,了解到嵌套 Service 的方法运行,对于内层方法来说,事务传播行为的配置不同,它对于事务的动作 / 态度也是不一样的。这一章,咱着重来研究事务的传播行为,在 SpringFramework 的底层是如何实现的。

7 种传播行为咱并不都逐个研究,而是针对几种比较常见的、以及比较复杂的传播行为来研究,其余比较简单的传播行为,咱在分析整体处理逻辑时顺道解释就好。

0. 测试代码准备

测试代码,我们可以直接拿 com.linkedbear.spring.transaction.e_spread 中的例子来测试,直接在 PointService 的 addPoint 方法,上面的 @Transactional 注解调整传播行为即可。

当然,只改这些还不够,之前在讲解事务传播行为的时候用的是 xml 的方式,这里要换用注解声明式事务,所以我们要额外编写一个配置类:

@Configuration
@ComponentScan("com.linkedbear.spring.transaction.e_spread.service")
@EnableTransactionManagement
public class TransactionSpreadConfiguration {
    
    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/spring-dao?characterEncoding=utf8");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        return dataSource;
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate() {
        return new JdbcTemplate(dataSource());
    }
    
    @Bean
    public TransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }
}

以及对应的测试启动类:

public class TransactionSpreadSourceApplication {
    
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(TransactionSpreadConfiguration.class);
        UserService userService = ctx.getBean(UserService.class);
        userService.register();
    }
}

这样测试代码就算全部准备好了。

1. REQUIRED研究

首先咱来测试默认的 REQUIRED 传播,前面咱已经学过了传播行为的效果,当外层已经有事务时,REQUIRED 行为会直接加入到当前事务中。

1.1 Debug至invokeWithinTransaction

上一章我们已经把整个事务控制的逻辑都走了一遍,留下的那个 createTransactionIfNecessary 方法的坑,而这个方法的调用就在 TransactionInterceptor 的父类 TransactionAspectSupport 的 invokeWithinTransaction 方法中。

    // ......
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 此处会创建/获取事务
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            retVal = invocation.proceedWithInvocation();
        }
        // ......

接下来 createTransactionIfNecessary 方法的逻辑就是研究的重点了,然而这个方法又是类似于前置处理的预操作。。。

1.2 createTransactionIfNecessary

不过讲道理,这段源码还蛮容易理解的:

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    // 如果事务定义中没有name,则将方法名作为事务定义标识名
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 获取事务定义信息对应的事务状态
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

注意这里面的一个重要的动作,是 tm.getTransaction(txAttr); ,它会根据事务定义信息,从事务管理器中获取事务的状态信息。咱前面也都学过事务控制的模型了,想必小伙伴们也记得这个动作的重要性。

1.3 tm.getTransaction

来到 DataSourceTransactionManager 的父类 AbstractPlatformTransactionManager 中,咱来研究 getTransaction 方法。不过这个方法稍微有点长,咱还是拆解开阅读。

1.3.1 doGetTransaction

好家伙,一上来就是 get → doGet 的动作,这也太直接了吧:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    Object transaction = doGetTransaction();
    
    // ......

OK 那咱继续往里走,由于 doGetTransaction 方法是模板方法,接下来就要到真正的落地实现 DataSourceTransactionManager 中了:

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

虽然源码中都是没见过的 API ,但整体思路咱都能看明白:它会创建并返回一个 DataSourceTransactionObject ,这里面包含一个 Connection 。这里面 ConnectionHolder 的设计,跟之前在 IOC 里学过的 BeanDefinitionHolder 很相似,本质都是内部组合了一个对象而已。

那 doGetTransaction 方法执行完毕后,我们就可以简单的理解为,它返回了一个 Connection 的对象罢了。

1.3.2 【传播行为处理】1

    // ......
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // ......

别看这一段只有一个 if 结构和返回,这个 handleExistingTransaction 方法可大有作为,它里面的处理逻辑很复杂,都是当外部事务已经存在时的处理逻辑。这里面的逻辑,过会咱研究到 PointService 的 addPoint 方法时再说。

1.3.3 超时检测

    // ......
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // ......

这部分就更简单了,自己配置的超时时间最小最小是 -1 (代表永不超时),再小那就是搞事情了。

1.3.4 【传播行为处理】2

    // ......
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        // logger ......
        try {
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } // catch ......
    }
    else {
        // logger ......
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

接下来的这部分,我们看到了一些传播行为的熟悉的身影。这部分的逻辑咱大概先理一下:

首先,如果方法的事务定义信息配置的传播行为是 MANDATORY ,则直接抛出异常(因为上面检测过了,当前没有事务才会走下面的分支,那既然又要强制要求需要事务,就只好抛出异常咯)。

接着,如果传播行为配置了 REQUIRED 、REQUIRES_NEW 、NESTED ,那就很简单了,直接开启一个新的事务即可(当前没事务嘛,那当然要自己造咯)。

最后的情况,虽然没有显式的说明还有哪些传播行为,但我们也能推理出来。去除上面的 4 种事务传播行为,还剩下 3 种:SUPPORTS 、NOT_SUPPORTED 、NEVER ,它们都可以在没有事务的情况下运行,那目前确实都没有事务,那也就不需要做任何处理了,所以这个 else 块中也就啥事不用干了。

实际 Debug 的时候,它落到了 else-if 块中,很容易理解嘛,当前没有事务,而 UserService 的 register 方法中事务传播行为是默认的 REQUIRED ,那当然要开启一个新的事务咯。

Dao编程原理-声明式事务的事务传播行为原理

那就继续往里走吧。

1.4 startTransaction:开启事务

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 开启事务
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

在这段源码中,有一个方法格外的扎眼:doBegin ,咱一看就能感觉出来,这个方法就是开启事务的吧!

进入 doBegin 方法中,这个方法很长,小册只截取最吸引你的一小部分:

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // ......
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // 获取到真正的Connection对象
        con = txObject.getConnectionHolder().getConnection();
        // ......

        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            // logger ......
            // 开启事务
            con.setAutoCommit(false);
        }
        // ......
}

看,它这里面的主要工作,就是从 DataSourceTransactionObject 中的 ConnectionHolder 中,提取出真正的 Connection 对象,随后执行 setAutoCommit 方法!这个方法不就是原生 jdbc 事务中的开启事务嘛!

1.5 prepareTransactionInfo

到此为止,TransactionStatus 就已经获取到了,不要忘记此时此刻执行的方法是 UserService 的 register 方法哈。

    // ......
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

接下来要执行的是 prepareTransactionInfo 方法,而这个方法相对就简单多了:

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // logger ......
        txInfo.newTransactionStatus(status);
    } // else logger ......

    txInfo.bindToThread();
    return txInfo;
}

很简单,它只是创建了一个 TransactionInfo ,把事务状态信息放入其中,并将其绑定到当前线程中,仅此而已。

当 TransactionInfo 创建并返回之后,整个 createTransactionIfNecessary 方法也就执行完毕了,事务也就已经开启了。

1.6 PointService.addPoint

上面开启事务之后,接下来就可以执行 UserService 的 register 方法了,这里面又要调用 PointService 的 addPoint 方法了。

调用逻辑还是上一章的那一套,这里咱直接快进到 tm.getTransaction -> doGetTransacation 方法步骤了。

Dao编程原理-声明式事务的事务传播行为原理

由于此时线程中已存在事务,所以 doGetTransacation 方法中的这个 if 分支会触发进入:

    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

1.7 handleExistingTransaction

这个方法又是好长呀,照例咱拆解来看。

1.7.1 NEVER的处理

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // ......

对于 NEVER 的传播行为,要求线程中不能有事务,所以这里检测到就会抛出异常,非常容易理解。

1.7.2 NOT_SUPPORTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // logger ......
        // 挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // ......

对于 NOT_SUPPORTED 的处理,要求方法执行不在事务中,所以这里会把当前的事务挂起,并执行自己的方法,也是很好理解吧。

1.7.3 REQUIRES_NEW的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

对于 REQUIRES_NEW 的处理,当然是开一个新的事务啦,不过在此之前它还要挂起已有的外部事务。除此之外,从源码中可以发现,如果新事务抛出异常后,会恢复之前的外部事务。

1.7.4 NESTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }
    // ......

NESTED 嵌套事务的处理,首先需要允许嵌套事务在程序的使用,然后还需要连接的数据库支持基于保存点的嵌套事务。如果这些条件都成立,则会创建保存点,否则会开启一个新的事务。

1.7.5 SUPPORTS&REQUIRED

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // logger ......
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

最后的这一部分,是处理 REQUIRED 和 SUPPORTS 的逻辑,这里面的处理逻辑相对就比较不痛不痒了。由于 isValidateExistingTransaction() 方法在默认情况下返回 false ,所以中间这一部分的逻辑并不会执行,那最后的处理,其实就是把现有的事务信息,包装了一下返回出去而已,并没有任何多余的动作处理。

方法返回后,接下来的处理就是我们熟悉的上一章的处理逻辑了,小册这里就不多赘述了。

2. REQUIRES_NEW研究

将 PointService 的 addPoint 方法的事务传播行为改为 REQUIRES_NEW ,重新 Debug 测试。

根据上面分析的内容,可知 REQUIRES_NEW 会在 handleExistingTransaction 方法中停下并检查:

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

这里面有几个要注意的地方,咱一起来注意一下。

2.1 新事务的创建细节

新事务在创建之前,会将外层的原事务挂起,挂起的逻辑咱来瞅一眼:(注意标有注释的源码)

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // 注意此处:doSuspend
                suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        // 注意此处:doSuspend
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

仔细观察源码中的两行注释,小册标注的都是 doSuspend 方法,这就意味着真正挂起的动作是在 doSuspend 方法中。

来到 DataSourceTransactionManager 中,可以找到 doSuspend 方法的实现:

@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

可以发现,这个操作是把 DataSourceTransactionObject 中的 Connection 拿走了,并且也把 TransactionSynchronizationManager 中的数据源也解除了。

Debug 可以发现,解绑之后返回的,其实就是那个组合了 Connection 的 ConnectionHolder :

Spring Dao编程原理-声明式事务的事务传播行为原理

随后回到上面的 suspend 方法中,下面的一堆 TransactionSynchronizationManager 的方法我们就不看了,里面都是 ThreadLocal 的操作,注意看最后 return 的时候,它将 ConnectionHolder 包装为一个 SuspendedResourcesHolder 之后返回了出去。

2.2 开启事务的细节

注意看接下来的 startTransaction 方法的参数列表:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

最后一个参数,它把 SuspendedResourcesHolder 传进来了!并且它还存入到了 TransactionStatus 中!这样做的目的,可能不用我说,聪明的你也能想到原因:当内层新事务执行完成后,清理相关的线程同步等信息时,是不是可以拿到事务状态信息,从中取出这个被挂起的事务,然后继续恢复呀

如果没有反应过来,那也没关系,下面咱立马来说这个设计的原因。

2.3 内层新事务执行完成后的细节

当事务执行完成之后,不是会执行一个 cleanupAfterCompletion 方法,清除其中的线程同步信息等等的嘛:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        // logger ......
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

注意看最下面的 if 部分,这个动作又是恢复挂起的事务了吧!上一章这里留了一个坑,这里咱展开来看看。

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

整个方法的逻辑,越看越像前面 suspend 方法的逆动作!其实还真就这么回事,上面的 doResume 方法对应 suspend 中的 doSuspend 方法,下面的 set 动作对应 suspend 中的 set null 。

至于 doResume 方法的逻辑,那就更简单了:

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

刚好又是跟上面一样,一个 bindResource ,一个 unbindResource 。

resume 方法执行完毕后,原来外层的事务又重新被绑定到线程上,相当于恢复了被挂起之前的状态,这也体现了 REQUIRES_NEW 的处理逻辑了。

3. NESTED研究

将 PointService 的 addPoint 方法的事务传播行为改为 NESTED ,重新 Debug 测试。

上面咱也看到了,NESTED 的处理逻辑还挺复杂的:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

很明显,处理保存点的逻辑在 status.createAndHoldSavepoint(); 中,而 createAndHoldSavepoint 方法的逻辑是相当的简单:

public void createAndHoldSavepoint() throws TransactionException {
    setSavepoint(getSavepointManager().createSavepoint());
}

就是一个创建、一个设置,完事了。想必创建和设置的动作,底层不用看小伙伴也能想到是怎么操作的吧!这里小册就展开了,感兴趣的小伙伴们可以自行深入看一看,这个逻辑还是很简单的。

【OK 到此为止,Dao 编程部分的所有内容也就都讲解完毕了。最后,咱依然是以总结和面试题收尾,帮助小伙伴快速回顾整个 Dao 章节,以及辅助面试备战。】

上一章的最后,小册留了一个 createTransactionIfNecessary 方法没有研究,很明显它的作用是创建事务。而前面我们在学习事务传播行为时,了解到嵌套 Service 的方法运行,对于内层方法来说,事务传播行为的配置不同,它对于事务的动作 / 态度也是不一样的。这一章,咱着重来研究事务的传播行为,在 SpringFramework 的底层是如何实现的。

7 种传播行为咱并不都逐个研究,而是针对几种比较常见的、以及比较复杂的传播行为来研究,其余比较简单的传播行为,咱在分析整体处理逻辑时顺道解释就好。

0. 测试代码准备

测试代码,我们可以直接拿 com.linkedbear.spring.transaction.e_spread 中的例子来测试,直接在 PointService 的 addPoint 方法,上面的 @Transactional 注解调整传播行为即可。

当然,只改这些还不够,之前在讲解事务传播行为的时候用的是 xml 的方式,这里要换用注解声明式事务,所以我们要额外编写一个配置类:

@Configuration
@ComponentScan("com.linkedbear.spring.transaction.e_spread.service")
@EnableTransactionManagement
public class TransactionSpreadConfiguration {
    
    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/spring-dao?characterEncoding=utf8");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        return dataSource;
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate() {
        return new JdbcTemplate(dataSource());
    }
    
    @Bean
    public TransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource());
    }
}

以及对应的测试启动类:

public class TransactionSpreadSourceApplication {
    
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(TransactionSpreadConfiguration.class);
        UserService userService = ctx.getBean(UserService.class);
        userService.register();
    }
}

这样测试代码就算全部准备好了。

1. REQUIRED研究

首先咱来测试默认的 REQUIRED 传播,前面咱已经学过了传播行为的效果,当外层已经有事务时,REQUIRED 行为会直接加入到当前事务中。

1.1 Debug至invokeWithinTransaction

上一章我们已经把整个事务控制的逻辑都走了一遍,留下的那个 createTransactionIfNecessary 方法的坑,而这个方法的调用就在 TransactionInterceptor 的父类 TransactionAspectSupport 的 invokeWithinTransaction 方法中。

    // ......
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 此处会创建/获取事务
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            retVal = invocation.proceedWithInvocation();
        }
        // ......

接下来 createTransactionIfNecessary 方法的逻辑就是研究的重点了,然而这个方法又是类似于前置处理的预操作。。。

1.2 createTransactionIfNecessary

不过讲道理,这段源码还蛮容易理解的:

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    // 如果事务定义中没有name,则将方法名作为事务定义标识名
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 获取事务定义信息对应的事务状态
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

注意这里面的一个重要的动作,是 tm.getTransaction(txAttr); ,它会根据事务定义信息,从事务管理器中获取事务的状态信息。咱前面也都学过事务控制的模型了,想必小伙伴们也记得这个动作的重要性。

1.3 tm.getTransaction

来到 DataSourceTransactionManager 的父类 AbstractPlatformTransactionManager 中,咱来研究 getTransaction 方法。不过这个方法稍微有点长,咱还是拆解开阅读。

1.3.1 doGetTransaction

好家伙,一上来就是 get → doGet 的动作,这也太直接了吧:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    Object transaction = doGetTransaction();
    
    // ......

OK 那咱继续往里走,由于 doGetTransaction 方法是模板方法,接下来就要到真正的落地实现 DataSourceTransactionManager 中了:

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

虽然源码中都是没见过的 API ,但整体思路咱都能看明白:它会创建并返回一个 DataSourceTransactionObject ,这里面包含一个 Connection 。这里面 ConnectionHolder 的设计,跟之前在 IOC 里学过的 BeanDefinitionHolder 很相似,本质都是内部组合了一个对象而已。

那 doGetTransaction 方法执行完毕后,我们就可以简单的理解为,它返回了一个 Connection 的对象罢了。

1.3.2 【传播行为处理】1

    // ......
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // ......

别看这一段只有一个 if 结构和返回,这个 handleExistingTransaction 方法可大有作为,它里面的处理逻辑很复杂,都是当外部事务已经存在时的处理逻辑。这里面的逻辑,过会咱研究到 PointService 的 addPoint 方法时再说。

1.3.3 超时检测

    // ......
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // ......

这部分就更简单了,自己配置的超时时间最小最小是 -1 (代表永不超时),再小那就是搞事情了。

1.3.4 【传播行为处理】2

    // ......
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        // logger ......
        try {
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } // catch ......
    }
    else {
        // logger ......
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

接下来的这部分,我们看到了一些传播行为的熟悉的身影。这部分的逻辑咱大概先理一下:

首先,如果方法的事务定义信息配置的传播行为是 MANDATORY ,则直接抛出异常(因为上面检测过了,当前没有事务才会走下面的分支,那既然又要强制要求需要事务,就只好抛出异常咯)。

接着,如果传播行为配置了 REQUIRED 、REQUIRES_NEW 、NESTED ,那就很简单了,直接开启一个新的事务即可(当前没事务嘛,那当然要自己造咯)。

最后的情况,虽然没有显式的说明还有哪些传播行为,但我们也能推理出来。去除上面的 4 种事务传播行为,还剩下 3 种:SUPPORTS 、NOT_SUPPORTED 、NEVER ,它们都可以在没有事务的情况下运行,那目前确实都没有事务,那也就不需要做任何处理了,所以这个 else 块中也就啥事不用干了。

实际 Debug 的时候,它落到了 else-if 块中,很容易理解嘛,当前没有事务,而 UserService 的 register 方法中事务传播行为是默认的 REQUIRED ,那当然要开启一个新的事务咯。

Dao编程原理-声明式事务的事务传播行为原理

那就继续往里走吧。

1.4 startTransaction:开启事务

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 开启事务
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

在这段源码中,有一个方法格外的扎眼:doBegin ,咱一看就能感觉出来,这个方法就是开启事务的吧!

进入 doBegin 方法中,这个方法很长,小册只截取最吸引你的一小部分:

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // ......
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // 获取到真正的Connection对象
        con = txObject.getConnectionHolder().getConnection();
        // ......

        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            // logger ......
            // 开启事务
            con.setAutoCommit(false);
        }
        // ......
}

看,它这里面的主要工作,就是从 DataSourceTransactionObject 中的 ConnectionHolder 中,提取出真正的 Connection 对象,随后执行 setAutoCommit 方法!这个方法不就是原生 jdbc 事务中的开启事务嘛!

1.5 prepareTransactionInfo

到此为止,TransactionStatus 就已经获取到了,不要忘记此时此刻执行的方法是 UserService 的 register 方法哈。

    // ......
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        } // else logger ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

接下来要执行的是 prepareTransactionInfo 方法,而这个方法相对就简单多了:

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // logger ......
        txInfo.newTransactionStatus(status);
    } // else logger ......

    txInfo.bindToThread();
    return txInfo;
}

很简单,它只是创建了一个 TransactionInfo ,把事务状态信息放入其中,并将其绑定到当前线程中,仅此而已。

当 TransactionInfo 创建并返回之后,整个 createTransactionIfNecessary 方法也就执行完毕了,事务也就已经开启了。

1.6 PointService.addPoint

上面开启事务之后,接下来就可以执行 UserService 的 register 方法了,这里面又要调用 PointService 的 addPoint 方法了。

调用逻辑还是上一章的那一套,这里咱直接快进到 tm.getTransaction -> doGetTransacation 方法步骤了。

Dao编程原理-声明式事务的事务传播行为原理

由于此时线程中已存在事务,所以 doGetTransacation 方法中的这个 if 分支会触发进入:

    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

1.7 handleExistingTransaction

这个方法又是好长呀,照例咱拆解来看。

1.7.1 NEVER的处理

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // ......

对于 NEVER 的传播行为,要求线程中不能有事务,所以这里检测到就会抛出异常,非常容易理解。

1.7.2 NOT_SUPPORTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // logger ......
        // 挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // ......

对于 NOT_SUPPORTED 的处理,要求方法执行不在事务中,所以这里会把当前的事务挂起,并执行自己的方法,也是很好理解吧。

1.7.3 REQUIRES_NEW的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

对于 REQUIRES_NEW 的处理,当然是开一个新的事务啦,不过在此之前它还要挂起已有的外部事务。除此之外,从源码中可以发现,如果新事务抛出异常后,会恢复之前的外部事务。

1.7.4 NESTED的处理

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }
    // ......

NESTED 嵌套事务的处理,首先需要允许嵌套事务在程序的使用,然后还需要连接的数据库支持基于保存点的嵌套事务。如果这些条件都成立,则会创建保存点,否则会开启一个新的事务。

1.7.5 SUPPORTS&REQUIRED

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // logger ......
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

最后的这一部分,是处理 REQUIRED 和 SUPPORTS 的逻辑,这里面的处理逻辑相对就比较不痛不痒了。由于 isValidateExistingTransaction() 方法在默认情况下返回 false ,所以中间这一部分的逻辑并不会执行,那最后的处理,其实就是把现有的事务信息,包装了一下返回出去而已,并没有任何多余的动作处理。

方法返回后,接下来的处理就是我们熟悉的上一章的处理逻辑了,小册这里就不多赘述了。

2. REQUIRES_NEW研究

将 PointService 的 addPoint 方法的事务传播行为改为 REQUIRES_NEW ,重新 Debug 测试。

根据上面分析的内容,可知 REQUIRES_NEW 会在 handleExistingTransaction 方法中停下并检查:

    // ......
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // logger ......
        // 挂起当前外层事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 开启一个全新的事务
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 还原外层事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // ......

这里面有几个要注意的地方,咱一起来注意一下。

2.1 新事务的创建细节

新事务在创建之前,会将外层的原事务挂起,挂起的逻辑咱来瞅一眼:(注意标有注释的源码)

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // 注意此处:doSuspend
                suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        // 注意此处:doSuspend
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

仔细观察源码中的两行注释,小册标注的都是 doSuspend 方法,这就意味着真正挂起的动作是在 doSuspend 方法中。

来到 DataSourceTransactionManager 中,可以找到 doSuspend 方法的实现:

@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

可以发现,这个操作是把 DataSourceTransactionObject 中的 Connection 拿走了,并且也把 TransactionSynchronizationManager 中的数据源也解除了。

Debug 可以发现,解绑之后返回的,其实就是那个组合了 Connection 的 ConnectionHolder :

Dao编程原理-声明式事务的事务传播行为原理

随后回到上面的 suspend 方法中,下面的一堆 TransactionSynchronizationManager 的方法我们就不看了,里面都是 ThreadLocal 的操作,注意看最后 return 的时候,它将 ConnectionHolder 包装为一个 SuspendedResourcesHolder 之后返回了出去。

2.2 开启事务的细节

注意看接下来的 startTransaction 方法的参数列表:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

最后一个参数,它把 SuspendedResourcesHolder 传进来了!并且它还存入到了 TransactionStatus 中!这样做的目的,可能不用我说,聪明的你也能想到原因:当内层新事务执行完成后,清理相关的线程同步等信息时,是不是可以拿到事务状态信息,从中取出这个被挂起的事务,然后继续恢复呀

如果没有反应过来,那也没关系,下面咱立马来说这个设计的原因。

2.3 内层新事务执行完成后的细节

当事务执行完成之后,不是会执行一个 cleanupAfterCompletion 方法,清除其中的线程同步信息等等的嘛:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        // logger ......
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

注意看最下面的 if 部分,这个动作又是恢复挂起的事务了吧!上一章这里留了一个坑,这里咱展开来看看。

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

整个方法的逻辑,越看越像前面 suspend 方法的逆动作!其实还真就这么回事,上面的 doResume 方法对应 suspend 中的 doSuspend 方法,下面的 set 动作对应 suspend 中的 set null 。

至于 doResume 方法的逻辑,那就更简单了:

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

刚好又是跟上面一样,一个 bindResource ,一个 unbindResource 。

resume 方法执行完毕后,原来外层的事务又重新被绑定到线程上,相当于恢复了被挂起之前的状态,这也体现了 REQUIRES_NEW 的处理逻辑了。

3. NESTED研究

将 PointService 的 addPoint 方法的事务传播行为改为 NESTED ,重新 Debug 测试。

上面咱也看到了,NESTED 的处理逻辑还挺复杂的:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 检查是否允许嵌套事务
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // logger ......
        // 判断是否支持保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

很明显,处理保存点的逻辑在 status.createAndHoldSavepoint(); 中,而 createAndHoldSavepoint 方法的逻辑是相当的简单:

public void createAndHoldSavepoint() throws TransactionException {
    setSavepoint(getSavepointManager().createSavepoint());
}

就是一个创建、一个设置,完事了。想必创建和设置的动作,底层不用看小伙伴也能想到是怎么操作的吧!这里小册就展开了,感兴趣的小伙伴们可以自行深入看一看,这个逻辑还是很简单的。

【OK 到此为止,Dao 编程部分的所有内容也就都讲解完毕了。最后,咱依然是以总结和面试题收尾,帮助小伙伴快速回顾整个 Dao 章节,以及辅助面试备战。】

 

免责声明:
1.本站所有内容由本站原创、网络转载、消息撰写、网友投稿等几部分组成。
2.本站原创文字内容若未经特别声明,则遵循协议CC3.0共享协议,转载请务必注明原文链接。
3.本站部分来源于网络转载的文章信息是出于传递更多信息之目的,不意味着赞同其观点。
4.本站所有源码与软件均为原作者提供,仅供学习和研究使用。
5.如您对本网站的相关版权有任何异议,或者认为侵犯了您的合法权益,请及时通知我们处理。
火焰兔 » Spring Dao编程原理-声明式事务的事务传播行为原理