spring 事务原理
传播机制 事务挂起
spring自带的JdbcTemplate使用示例
查询
public <T> T queryForObject(String sql, RowMapper<T> rowMapper) throws DataAccessException {
List<T> results = query(sql, rowMapper);
return DataAccessUtils.nullableSingleResult(results);
}
public <T> T query(final String sql, final ResultSetExtractor<T> rse) throws DataAccessException {
Assert.notNull(sql, "SQL must not be null");
Assert.notNull(rse, "ResultSetExtractor must not be null");
if (logger.isDebugEnabled()) {
logger.debug("Executing SQL query [" + sql + "]");
}
/**
* Callback to execute the query. 回调函数
*/
class QueryStatementCallback implements StatementCallback<T>, SqlProvider {
@Override
@Nullable
public T doInStatement(Statement stmt) throws SQLException {
ResultSet rs = null;
try {
// 回调
rs = stmt.executeQuery(sql);
return rse.extractData(rs);
}
finally {
JdbcUtils.closeResultSet(rs);
}
}
@Override
public String getSql() {
return sql;
}
}
return execute(new QueryStatementCallback());
}
剩下就是按照jdbc流程来执行
Connection con = DataSourceUtils.getConnection(obtainDataSource());
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
private static Object doGetResource(Object actualKey) {
// resources是一个ThreadLocal,表示每个线程中缓存的某个DataSource对应的数据库连接
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
事务
事务(Transaction)是访问和更新数据库的程序执行单元;事务中可能包含一个或多个sql语句,这些语句要么都执行成功,要么全部执行失败
事务的四大特性(ACID)
-
原子性(Atomicity,或称不可分割性)
-
一致性(Consistency)
-
隔离性(Isolation)
-
持久性(Durability)
事务的隔离级别
未提交读(READ UNCOMMITTED)
提交读(REDA COMMITED)
可重复读(REPEATABLE READ)
可串行化(SERIALIZABLE)
保存点 SAVEPOINT 保存点名称; ROLLBACK [WORK] TO [SAVEPOINT] 保存点名称
spring的事务管理
编程式事务
- 使用
TransactionTemplate
或者 TransactionalOperator
.
- 直接实现
TransactionManager
接口
TransactionManager
实际上TransactionTemplate内部也是使用TransactionManager来完成事务管理的,我们之前也看过它的execute方法的实现了,其实内部就是调用了TransactionManager的方法,实际上就是分为这么几步
开启事务
执行业务逻辑
出现异常进行回滚
正常执行则提交事务
private PlatformTransactionManager txManager;
public void test(){
// 定义事务
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setName("SomeTxName");
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
// txManager,事务管理器
// 通过事务管理器开启一个事务
TransactionStatus status = txManager.getTransaction(def);
try {
// 完成自己的业务逻辑
}
catch (Exception ex) {
// 出现异常,进行回滚
txManager.rollback(status);
throw ex;
}
// 正常执行完成,提交事务
txManager.commit(status);
}
PlatformTransactionManager
PlatformTransactionManager接口定义
public interface PlatformTransactionManager extends TransactionManager {
//开启事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
// 提交事务
void commit(TransactionStatus status) throws TransactionException;
//回滚事务
void rollback(TransactionStatus status) throws TransactionException;
}
TransactionDefinition 对事务定义
public interface TransactionDefinition {
// 如果当前存在事务,则加入当前事务,如果不存在当前事务,则创建一个新事务
int PROPAGATION_REQUIRED = 0;
// 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。
int PROPAGATION_SUPPORTS = 1;
// 加入当前事务,如果当前没有事务则抛异常
int PROPAGATION_MANDATORY = 2;
// 创建一个新事务,如果存在当前事务,则把当前事务挂起,等新事务执行结束后,继续运行被挂起的事务
int PROPAGATION_REQUIRES_NEW = 3;
// 如果当前存在事务,则把当前事务挂起,然后以非事务的方式运行
int PROPAGATION_NOT_SUPPORTED = 4;
// 不开启事务,如果存在当前事务,则抛异常
int PROPAGATION_NEVER = 5;
// 和PROPAGATION_REQUIRED类似,如果当前存在事务,则加入当前事务,如果不存在当前事务,则创建一个新事务
// 只不过它利用的是savepoint来达到在当前事务中运行的目的
int PROPAGATION_NESTED = 6;
int ISOLATION_DEFAULT = -1;
int ISOLATION_READ_UNCOMMITTED = 1; // same as java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;
int ISOLATION_READ_COMMITTED = 2; // same as java.sql.Connection.TRANSACTION_READ_COMMITTED;
SOLATION_REPEATABLE_READ = 4; // same as java.sql.Connection.TRANSACTION_REPEATABLE_READ;
int ISOLATION_SERIALIZABLE = 8; // same as java.sql.Connection.TRANSACTION_SERIALIZABLE;
/**
* Use the default timeout of the underlying transaction system,
* or none if timeouts are not supported.
*/
int TIMEOUT_DEFAULT = -1;
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}
default int getTimeout() {
return TIMEOUT_DEFAULT;
}
default boolean isReadOnly() {
return false;
}
@Nullable
default String getName() {
return null;
}
static TransactionDefinition withDefaults() {
return StaticTransactionDefinition.INSTANCE;
}
我们在使用申明式事务的时候,会通过@Transactional这个注解去申明某个方法需要进行事务管理,在@Transactional中可以定义事务的属性,这些属性实际上就会被封装到一个TransactionDefinition中,当然封装的时候肯定不是直接使用的接口,而是这个接口的一个实现类RuleBasedTransactionAttribute。RuleBasedTransactionAttribute,该类的继承关系如下
默认的传播机制为required,没有事务新建一个事务// 有事务的话加入当前事务private int propagationBehavior = PROPAGATION_REQUIRED;// 隔离级别跟数据库默认的隔离级别一直private int isolationLevel = ISOLATION_DEFAULT;// 默认为-1,不设置超时时间private int timeout = TIMEOUT_DEFAULT;// 非只读private boolean readOnly = false;
TransactionAttribute
@Nullable// 用于指定事务使用的事务管理器的名称String getQualifier();/** * Should we roll back on the given exception? * @param ex the exception to evaluate * @return whether to perform a rollback or not */// 指定在出现哪种异常时才进行回滚boolean rollbackOn(Throwable ex);
DefaultTransactionAttribute
,继承了DefaultTransactionDefinition
,同时实现了TransactionAttribute
接口,定义了默认的回滚异常
public boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error);}
-
RuleBasedTransactionAttribute
,@Transactional
注解的rollbackFor
等属性就会被封装到这个类中,允许程序员自己定义回滚的异常,如果没有指定回滚的异常,默认抛出RuntimeException/Error才进行回滚
TransactionStatus 这个接口主要用于描述Spring事务的状态
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable { /* // 判断当前事务是否是一个新的事务 // 不是一个新事务的话,那么需要加入到已经存在的事务中 @see #isNewTransaction() * @see #createSavepoint() //创建保存点 * @see #rollbackToSavepoint(Object) //回滚到指定保存点 * @see #releaseSavepoint(Object) //移除回滚点 */ boolean hasSavepoint(); @Override void flush();}
TransactionDefinition的主要作用是给出一份事务属性的定义,然后事务管理器根据给出的定义来创建事务,TransactionStatus主要是用来描述创建后的事务的状态
在对TransactionDefinition跟TransactionStatus有一定了解后,我们再回到PlatformTransactionManager接口本身,PlatformTransactionManager作为事务管理器的基础接口只是定义管理一个事务必须的三个方法:开启事务,提交事务,回滚事务,接口仅仅是定义了规范而已,真正做事的还是要依赖它的实现类,所以我们来看看它的继承关系
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mQvglAmg-1620307016260)(C:/Users/huxinyu/AppData/Roaming/Typora/typora-user-images/image-20210421114459221.png)]
AbstractPlatformTransactionManager,Spring提供的一个事务管理的基类,提供了事务管理的模板,实现了Spring事务管理的一个标准流程
判断当前是否已经存在一个事务
应用合适的事务传播行为
在必要的时候挂起/恢复事务
提交时检查事务是否被标记成为rollback-only
在回滚时做适当的修改(是执行真实的回滚/还是将事务标记成rollback-only)
触发注册的同步回调
Spring中事务的同步机制
那么Spring是如何来管理同步的呢?同样的,Spring也提供了一个同步管理器TransactionSynchronizationManager
,这是一个抽象类,其中所有的方法都是静态的,并且所有的方法都是围绕它所申明的几个静态常量字段,如下:
// 这就是同步的资源,Spring就是使用这个完成了连接的同步private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");// TransactionSynchronization完成了行为的同步// 关于TransactionSynchronization在后文进行分析private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");// 事务的名称private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");// 事务是否被标记成只读private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");// 事物的隔离级别private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");// 是否真实开启了事务 private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
/** Completion status in case of proper commit. */int STATUS_COMMITTED = 0;/** Completion status in case of proper rollback. */int STATUS_ROLLED_BACK = 1;/** Completion status in case of heuristic mixed completion or system errors. */int STATUS_UNKNOWN = 2;default void suspend() {}default void resume() {}@Overridedefault void flush() {}default void beforeCommit(boolean readOnly) {}default void beforeCompletion() {}default void afterCommit() {}default void afterCompletion(int status) {}
模拟spring事务
@Import({TransactionManagementConfigurationSelector.class})public @interface EnableTransactionManagement { boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default 2147483647;}
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> { public TransactionManagementConfigurationSelector() { } protected String[] selectImports(AdviceMode adviceMode) { switch(adviceMode) { case PROXY: return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; case ASPECTJ: return new String[]{this.determineTransactionAspectClass()}; default: return null; } }
AutoProxyRegistrar`跟一个`ProxyTransactionManagementConfiguration
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { boolean candidateFound = false; Set<String> annTypes = importingClassMetadata.getAnnotationTypes(); for (String annType : annTypes) { // 获取EnableTransactionManagement注解的属性值 AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (candidate == null) { continue; } Object mode = candidate.get("mode"); Object proxyTargetClass = candidate.get("proxyTargetClass"); if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() && Boolean.class == proxyTargetClass.getClass()) { candidateFound = true; if (mode == AdviceMode.PROXY) { // 注册一个InfrastructureAdvisorAutoProxyCreator类型的Bean AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); if ((Boolean) proxyTargetClass) { AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry); return; } } } } if (!candidateFound && logger.isInfoEnabled()) { String name = getClass().getSimpleName(); logger.info(String.format("%s was imported but no annotations were found " + "having both 'mode' and 'proxyTargetClass' attributes of type " + "AdviceMode and boolean respectively. This means that auto proxy " + "creator registration and configuration may not have occurred as " + "intended, and components may not be proxied as expected. Check to " + "ensure that %s has been @Import'ed on the same class where these " + "annotations are declared; otherwise remove the import of %s " + "altogether.", name, name, name)); }}
ProxyTransactionManagementConfiguration
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uS3jJ4rA-1620307016262)(C:/Users/huxinyu/AppData/Roaming/Typora/typora-user-images/image-20210422161430589.png)]
@Configurationpublic abstract class AbstractTransactionManagementConfiguration implements ImportAware {@Nullableprotected AnnotationAttributes enableTx;@Nullableprotected TransactionManager txManager;// 这个方法就是获取@EnableTransactionManagement的属性// importMetadata:就是@EnableTransactionManagement这个注解所在类的元信息@Overridepublic void setImportMetadata(AnnotationMetadata importMetadata) { // 将EnableTransactionManagement注解中的属性对存入到map中 // AnnotationAttributes实际上就是个map this.enableTx = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false)); // 这里可以看到,限定了导入的注解必须使用@EnableTransactionManagement if (this.enableTx == null) { throw new IllegalArgumentException( "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName()); }}// 我们可以配置TransactionManagementConfigurer// 通过TransactionManagementConfigurer向容器中注册一个事务管理器// 一般不会这么使用,更多的是通过@Bean的方式直接注册@Autowired(required = false)void setConfigurers(Collection<TransactionManagementConfigurer> configurers) { // ..... TransactionManagementConfigurer configurer = configurers.iterator().next(); this.txManager = configurer.annotationDrivenTransactionManager();}// 向容器中注册一个TransactionalEventListenerFactory// 这个类用于处理@TransactionalEventListener注解// 可以实现对事件的监听,并且在事务的特定阶段对事件进行处理@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public static TransactionalEventListenerFactory transactionalEventListenerFactory() { return new TransactionalEventListenerFactory();}}
@TransactionalEventListener 是监听事务
spring 监听器原理 是如何执行?
TransactionalEventListenerFactory ApplicationListenerMethodTransactionalAdapter
public void onApplicationEvent(ApplicationEvent event) { //激活了同步,并且真实存在事务 if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { //实际上是依赖事务的同步机制实现的事件监听 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } // 沒有开启事务 else if (this.annotation.fallbackExecution()) { if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase"); } // 如果注解中的fallbackExecution为true,意味着没有事务开启的话 processEvent(event); } else { // No transactional event execution at all if (logger.isDebugEnabled()) { logger.debug("No transaction is active - skipping " + event); } }}
public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); Set<TransactionSynchronization> synchs = synchronizations.get(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } // 同步机制 行为上的同步 synchs.add(synchronization);}
ProxyTransactionManagementConfiguration