集成 Atomikos 实现分布式事务
# 集成 Atomikos 实现分布式事务
在分布式系统中,多个数据源或其他分布式资源可能需要参与到同一个事务中。为了解决这个问题,Spring Boot 推荐使用 Atomikos 来实现分布式事务管理。Atomikos 是一个轻量级的、基于 Java 的事务管理器,支持 XA 规范,用于管理分布式事务。
若依框架已经实现了多数据源的切换功能,但在分布式事务场景下,我们需要使用 Atomikos 来确保多个数据源之间的事务一致性。
# 1. 添加 Atomikos 依赖
在 ruoyi-framework/pom.xml
中添加 Atomikos 的依赖项,以便支持分布式事务管理。
<!-- Atomikos 分布式事务 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
2
3
4
5
# 2. 配置多数据源与 Atomikos 事务管理器
为了使用 Atomikos 进行分布式事务管理,我们需要配置多个数据源,并确保它们在 Atomikos 的管理下。
配置主数据源与从数据源:需要为每个数据源配置独立的 Atomikos 连接池与事务管理器。
配置示例: 假设我们有两个数据源,分别为
master
和slave
,以下是配置示例。
spring:
jta:
enabled: true
atomikos:
datasource:
master:
unique-resource-name: masterDataSource
xa-data-source-class-name: com.mysql.cj.jdbc.MysqlXADataSource
xa-properties:
user: root
password: password
url: jdbc:mysql://localhost:3306/ry?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
min-pool-size: 5
max-pool-size: 20
borrow-connection-timeout: 30
max-idle-time: 60
maintenance-interval: 60
test-query: SELECT 1 FROM DUAL
slave:
unique-resource-name: slaveDataSource
xa-data-source-class-name: com.mysql.cj.jdbc.MysqlXADataSource
xa-properties:
user: root
password: password
url: jdbc:mysql://localhost:3306/ry-vue?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
min-pool-size: 5
max-pool-size: 20
borrow-connection-timeout: 30
max-idle-time: 60
maintenance-interval: 60
test-query: SELECT 1 FROM DUAL
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 3. 创建数据源 Bean 配置
在 ruoyi-framework
项目中,创建 Atomikos 相关的数据源配置类,分别配置两个数据源 master
和 slave
。
package com.ruoyi.framework.config;
import javax.sql.DataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import com.mysql.cj.jdbc.MysqlXADataSource;
@Configuration
public class DataSourceConfig {
@Bean(name = "masterDataSource")
public DataSource masterDataSource(Environment env) {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(env.getProperty("spring.jta.atomikos.datasource.master.xa-properties.url"));
mysqlXaDataSource.setUser(env.getProperty("spring.jta.atomikos.datasource.master.xa-properties.user"));
mysqlXaDataSource.setPassword(env.getProperty("spring.jta.atomikos.datasource.master.xa-properties.password"));
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("masterDataSource");
xaDataSource.setMinPoolSize(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.master.min-pool-size")));
xaDataSource.setMaxPoolSize(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.master.max-pool-size")));
xaDataSource.setBorrowConnectionTimeout(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.master.borrow-connection-timeout")));
xaDataSource.setMaxIdleTime(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.master.max-idle-time")));
xaDataSource.setMaintenanceInterval(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.master.maintenance-interval")));
xaDataSource.setTestQuery(env.getProperty("spring.jta.atomikos.datasource.master.test-query"));
return xaDataSource;
}
@Bean(name = "slaveDataSource")
public DataSource slaveDataSource(Environment env) {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(env.getProperty("spring.jta.atomikos.datasource.slave.xa-properties.url"));
mysqlXaDataSource.setUser(env.getProperty("spring.jta.atomikos.datasource.slave.xa-properties.user"));
mysqlXaDataSource.setPassword(env.getProperty("spring.jta.atomikos.datasource.slave.xa-properties.password"));
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("slaveDataSource");
xaDataSource.setMinPoolSize(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.slave.min-pool-size")));
xaDataSource.setMaxPoolSize(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.slave.max-pool-size")));
xaDataSource.setBorrowConnectionTimeout(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.slave.borrow-connection-timeout")));
xaDataSource.setMaxIdleTime(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.slave.max-idle-time")));
xaDataSource.setMaintenanceInterval(Integer.parseInt(env.getProperty("spring.jta.atomikos.datasource.slave.maintenance-interval")));
xaDataSource.setTestQuery(env.getProperty("spring.jta.atomikos.datasource.slave.test-query"));
return xaDataSource;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 4. 使用 @Transactional
注解管理事务
在需要操作多个数据源的方法上使用 @Transactional
注解,确保事务的一致性。Spring 会通过 Atomikos 来管理这个分布式事务。
import org.springframework.transaction.annotation.Transactional;
import org.springframework.beans.factory.annotation.Autowired;
public class SomeService {
@Autowired
private MasterMapper masterMapper;
@Autowired
private SlaveMapper slaveMapper;
/**
* 示例方法:在主库和从库中执行操作,确保分布式事务的一致性
*/
@Transactional
public void insert() {
// 操作主库
masterMapper.insertMasterData();
// 操作从库
slaveMapper.insertSlaveData();
// 通过事务管理器保证以上两个操作要么同时成功,要么同时回滚
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 5. 测试与验证
在项目中,可以通过实际的数据操作来测试分布式事务是否成功。通过 @Transactional
注解的方法,分别对多个数据源进行操作,确保在出现异常时,所有数据源的操作都能回滚。
# 6. 重要 API 和参数说明
@Transactional
:Spring 的事务管理注解,确保方法中的数据库操作在一个事务中执行。AtomikosDataSourceBean
:Atomikos 提供的 XA 数据源实现,用于配置和管理数据源。unique-resource-name
:每个数据源的唯一标识,用于区分不同的数据源。
结论
通过上述步骤,可以在 RuoYi 项目中集成 Atomikos 以实现分布式事务管理。确保每个数据源配置正确,并在业务方法中使用 @Transactional
注解来管理事务。这样能够确保在分布式环境下的数据一致性,避免因事务问题导致的数据不一致。