Spring多线程事务一致性

原文链接

业务场景

  • 希望将步骤 1 和步骤 2 并行执行,

  • 然后确保步骤 1 和步骤 2 执行成功后,再执行步骤 3,

  • 等到步骤 3 执行完毕后,再提交全部事务

1
2
3
4
5
6
7
8
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
//1.查询出当前资源模块下所有资源,查询出来后进行删除
deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
//2.查询出当前资源模块下所有子模块,递归查询,当删除完所有子模块下的资源后,再删除所有子模块,最终删除当前资源模块
deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
//3.删除当前资源模块
removeById(authorityModuleId);
}

解决异步执行

@Async

Spring 提供的异步执行任务能力并不足以解决当前的需求。

@Async 注解原理简单来说,就是扫描 IOC 中的 bean,给方法上标注有 @Async 注解的 bean 进行代理,代理的核心是添加一个 MethodInterceptor,即AsyncExecutionInterceptor。该方法拦截器负责将方法真正的执行包装为任务,放入线程池中执行。

CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
CompletableFuture.runAsync(() - > {
//两个并行执行的任务
CompletableFuture < Void > future1 = CompletableFuture.runAsync(() - >
deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
CompletableFuture < Void > future2 = CompletableFuture.runAsync(() - >
deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
//等待两个并行任务执行完后,再执行最后一个步骤
CompletableFuture.allOf(future1, future2).thenRun(() - > removeById(authorityModuleId));
}, executor);
}

多线程事务一致性

事务管理大体分为三个流程: 事务创建、事务执行、事务结束。

事务创建涉及到一些属性的配置,例如:

  • 事务的隔离级别

  • 事务的传播行为

  • 事务的超时时间

  • 是否为只读事务

由于涉及属性颇多,并且后期还有可能进行扩展,因此必须通过一个类来封装这些属性,在 Spring 中对应 TransactionDefinition。

有了事务相关属性定义后,我们就可以利用 TransactionDefinition 来创建一个事务了。

在 Spring 中局部事务由 PlatformTransactionManager 负责管理,创建事务也是由 PlatformTransactionManager 负责提供。

如果我们希望追踪事务的状态,例如事务已完成,事务回滚等,那么就需要一个事务状态类贯穿当前事务的执行流程,在 Spring 中由 TransactionStatus 负责完成。

1
2
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;

对于常见的数据源而言,通常需要记录的事务状态有如下几点:

  • 当前事务是否是新事务

  • 当前事务是否结束

  • 当前事务是否需要回滚(通过标记来判断,因此我也可以在业务流程中手动设置标记为 true 来让事务在没有发生异常的情况下进行回滚)

  • 当前事务是否设置了回滚点(savePoint)

事务的执行过程就是具体业务代码的执行流程。

事务的结束分为两种情况:需要进行事务回滚或者事务正常提交,如果是事务回滚,还需要判断 TransactionStatus 中的 savePoint 是否被设置了。

声明式事务

声明式事务就是使用我们常见的 @Transactional 注解完成的。声明式事务优点就在于让事务代码与业务代码解耦,通过 Spring 中提供的声明式事务使用,我们也可以发觉我们只需要编写业务代码即可。而事务的管理基本不需要我们操心,Spring 帮我们自动完成了。

之所以那么神奇,本质还是依靠 Spring 框架提供的 Bean 生命周期相关回调接口和 AOP 结合完成的,简述如下:

  • 通过自动代理创建器依次尝试为每个放入容器中的 bean 尝试进行代理;

  • 尝试进行代理的过程对于事务管理来说,就是利用事务管理涉及到的增强器 advisor,即 TransactionAttributeSourceAdvisor;

  • 判断当前增强器是否能够应用与当前 bean 上,怎么判断呢? 当然是 advisor 内部的 pointCut;

  • 如果能够应用,那么好,为当前 bean 创建代理对象返回,并且往代理对象内部添加一个 TransactionInterceptor 拦截器。

  • 此时我们再从容器中获取,拿到的就是代理对象了,当我们调用代理对象的方法时,首先要经过代理对象内部拦截器链的处理,处理完后,最终才会调用被代理对象的方法(这里其实就是责任链模式的应用)。

对于被事务增强器 TransactionAttributeSourceAdvisor 代理的 bean 而言,代理对象内部会存在一个 TransactionInterceptor,该拦截器内部构造了一个事务执行的模板流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected Object invokeWithinTransaction(Method method, @Nullable Class << ? > targetClass,
final InvocationCallback invocation) throws Throwable {
//TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源
//可以看做是一个存放TransactionAttribute与method方法映射的池子
TransactionAttributeSource tas = getTransactionAttributeSource();
//获取当前事务方法对应的TransactionAttribute
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//定位TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
.....
//类型转换为局部事务管理器
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
//TransactionManager根据TransactionAttribute创建事务后返回
//TransactionInfo封装了当前事务的信息--包括TransactionStatus
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
//继续执行过滤器链---过滤链最终会调用目标方法
//因此可以理解为这里是调用目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
//目标方法抛出异常则进行判断是否需要回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
//清除当前事务信息
cleanupTransactionInfo(txInfo);
}
...
//正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)
commitTransactionAfterReturning(txInfo);
return retVal;
}
...

编程式事务

在前面,我们已经解决了任务异步并行执行的难题,下面我们要解决如何确保 Spring 在多线程环境下也能保持事务一致性。

声明式事务并不能解决我们当前的问题,那就只能求助于编程式事务了。

那么编程式事务是什么样子呢?

其实上面 TransactionInterceptor 给出的那套模板流程,就是编程式事务使用的模范案例,简化上面的模板流程,使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TransactionMain {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
test();
}

private static void test() {
DataSource dataSource = getDS();
JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
TransactionStatus ts = jtm.getTransaction(transactionDef);
//进行业务逻辑操作
try {
update(dataSource);
jtm.commit(ts);
} catch (Exception e) {
jtm.rollback(ts);
System.out.println("发生异常,我已回滚");
}
}

private static void update(DataSource dataSource) throws Exception {
JdbcTemplate jt = new JdbcTemplate();
jt.setDataSource(dataSource);
jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
throw new Exception("我是来捣乱的");
}
}

编程式事务解决问题

下面我给出一份看似正确的解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
/**
* 如果是多数据源的情况下,需要指定具体是哪一个数据源
*/
private final DataSource dataSource;

/**
* 执行的是无返回值的任务
* @param tasks 异步执行的任务列表
* @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
*/
public void runAsyncButWaitUntilAllDown(List < Runnable > tasks, Executor executor) {
if (executor == null) {
throw new IllegalArgumentException("线程池不能为空");
}
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否发生了异常
AtomicBoolean ex = new AtomicBoolean();

List < CompletableFuture > taskFutureList = new ArrayList < > (tasks.size());
List < TransactionStatus > transactionStatusList = new ArrayList < > (tasks.size());

tasks.forEach(task - > {
taskFutureList.add(CompletableFuture.runAsync(
() - > {
try {
//1.开启新事务
transactionStatusList.add(openNewTransaction(transactionManager));
//2.异步任务执行
task.run();
} catch (Throwable throwable) {
//打印异常
throwable.printStackTrace();
//其中某个异步任务执行出现了异常,进行标记
ex.set(Boolean.TRUE);
//其他任务还没执行的不需要执行了
taskFutureList.forEach(completableFuture - > completableFuture.cancel(true));
}
}, executor));
});

try {
//阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[] {})).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

//发生了异常则进行回滚操作,否则提交
if (ex.get()) {
System.out.println("发生异常,全部事务回滚");
transactionStatusList.forEach(transactionManager::rollback);
} else {
System.out.println("全部事务正常提交");
transactionStatusList.forEach(transactionManager::commit);
}
}

private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
return transactionManager.getTransaction(transactionDef);
}

private DataSourceTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public void test() {
List <Runnable> tasks = new ArrayList<>();

tasks.add(() -> {
userMapper.deleteById(26);
});

tasks.add(() -> {
signMapper.deleteById(10);
});

multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}

任务正常都执行完毕,事务进行提交,但是会抛出异常,导致事务回滚:

1
2
No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解释: 无法在当前线程绑定的threadLocal中寻找到HikariDataSource作为key,对应关联的资源对象ConnectionHolder

一次事务的完成通常都是默认在当前线程内完成的,又因为一次事务的执行过程中,涉及到对当前数据库连接 Connection 的操作,因此为了避免将 Connection 在事务执行过程中来回传递,我们可以将 Connextion 绑定到当前事务执行线程对应的 ThreadLocalMap 内部,顺便还可以将一些其他属性也放入其中进行保存,在 Spring 中,负责保存这些 ThreadLocal 属性的实现类由 TransactionSynchronizationManager 承担。

TransactionSynchronizationManager 类内部默认提供了下面六个 ThreadLocal 属性,分别保存当前线程对应的不同事务资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//保存当前事务关联的资源--默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系--当然这里Connection被包装为了ConnectionHolder
private static final ThreadLocal < Map < Object, Object >> resources =
new NamedThreadLocal < > ("Transactional resources");
//事务监听者--在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用)---默认为空集合
private static final ThreadLocal < Set < TransactionSynchronization >> synchronizations =
new NamedThreadLocal < > ("Transaction synchronizations");
//见名知意: 存放当前事务名字
private static final ThreadLocal < String > currentTransactionName =
new NamedThreadLocal < > ("Current transaction name");
//见名知意: 存放当前事务是否是只读事务
private static final ThreadLocal < Boolean > currentTransactionReadOnly =
new NamedThreadLocal < > ("Current transaction read-only status");
//见名知意: 存放当前事务的隔离级别
private static final ThreadLocal < Integer > currentTransactionIsolationLevel =
new NamedThreadLocal < > ("Current transaction isolation level");
//见名知意: 存放当前事务是否处于激活状态
private static final ThreadLocal < Boolean > actualTransactionActive =
new NamedThreadLocal < > ("Actual transaction active");

那么上面抛出的异常的原因也就很清楚了,无法在 main 线程找到当前事务对应的资源,原因如下:

开启新事务时,事务相关资源都被绑定到了 thread-cache-pool-1 线程对应的 threadLocalMap 内部,而当执行事务提交代码时,commit 内部需要从 TransactionSynchronizationManager 中获取当前事务的资源,显然我们无法从 main 线程对应的 threadLocalMap 中获取到对应的事务资源,这也就是异常抛出的原因。

解决方法:用 CopyTransactionResource 将事务资源在两个线程间来回复制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
/**
* 如果是多数据源的情况下,需要指定具体是哪一个数据源
*/
private final DataSource dataSource;

/**
* 执行的是无返回值的任务
* @param tasks 异步执行的任务列表
* @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
*/
public void runAsyncButWaitUntilAllDown(List < Runnable > tasks, Executor executor) {
if (executor == null) {
throw new IllegalArgumentException("线程池不能为空");
}
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否发生了异常
AtomicBoolean ex = new AtomicBoolean();

List < CompletableFuture > taskFutureList = new ArrayList < > (tasks.size());
List < TransactionStatus > transactionStatusList = new ArrayList < > (tasks.size());
List < TransactionResource > transactionResources = new ArrayList < > (tasks.size());

tasks.forEach(task - > {
taskFutureList.add(CompletableFuture.runAsync(
() - > {
try {
//1.开启新事务
transactionStatusList.add(openNewTransaction(transactionManager));
//2.copy事务资源
transactionResources.add(TransactionResource.copyTransactionResource());
//3.异步任务执行
task.run();
} catch (Throwable throwable) {
//打印异常
throwable.printStackTrace();
//其中某个异步任务执行出现了异常,进行标记
ex.set(Boolean.TRUE);
//其他任务还没执行的不需要执行了
taskFutureList.forEach(completableFuture - > completableFuture.cancel(true));
}
}, executor));
});

try {
//阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[] {})).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

//发生了异常则进行回滚操作,否则提交
if (ex.get()) {
System.out.println("发生异常,全部事务回滚");
for (int i = 0; i < tasks.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
transactionManager.rollback(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
} else {
System.out.println("全部事务正常提交");
for (int i = 0; i < tasks.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
transactionManager.commit(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
}
}

private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
return transactionManager.getTransaction(transactionDef);
}

private DataSourceTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}

/**
* 保存当前事务资源,用于线程间的事务资源COPY操作
*/
@Builder
private static class TransactionResource {
//事务结束后默认会移除集合中的DataSource作为key关联的资源记录
private Map < Object, Object > resources = new HashMap < > ();

//下面五个属性会在事务结束后被自动清理,无需我们手动清理
private Set < TransactionSynchronization > synchronizations = new HashSet < > ();

private String currentTransactionName;

private Boolean currentTransactionReadOnly;

private Integer currentTransactionIsolationLevel;

private Boolean actualTransactionActive;

public static TransactionResource copyTransactionResource() {
return TransactionResource.builder()
//返回的是不可变集合
.resources(TransactionSynchronizationManager.getResourceMap())
//如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
.synchronizations(new LinkedHashSet < > ())
.currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
.currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
.currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
.actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
.build();
}

public void autoWiredTransactionResource() {
resources.forEach(TransactionSynchronizationManager::bindResource);
//如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
TransactionSynchronizationManager.initSynchronization();
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
}

public void removeTransactionResource() {
//事务结束后默认会移除集合中的DataSource作为key关联的资源记录
//DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
resources.keySet().forEach(key - > {
if (!(key instanceof DataSource)) {
TransactionSynchronizationManager.unbindResource(key);
}
});
}
}
}

增加异常抛出,测试是否能够保证多线程间的事务一致性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest(classes = UserMain.class)
public class Test {
@Resource
private UserMapper userMapper;
@Resource
private SignMapper signMapper;
@Resource
private MultiplyThreadTransactionManager multiplyThreadTransactionManager;

@SneakyThrows
@org.junit.jupiter.api.Test
public void test() {
List <Runnable> tasks = new ArrayList<>();

tasks.add(() -> {
userMapper.deleteById(26);
throw new RuntimeException("我就要抛出异常!");
});

tasks.add(() -> {
signMapper.deleteById(10);
});

multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}

}

事务都进行了回滚,数据库数据没变。

0%