@Transactional源码分析一:事务的大致流程

导读:本篇文章讲解 @Transactional源码分析一:事务的大致流程,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

spring中开启声明式事务
@EnableTransactionManagement
在这里插入图片描述
在TransactionManagementConfigurationSelector中,实现了
ImportSelector接口,重写了selectImports方法,做了两件事情:1、注册事务的入口类;2、配置事务的切面信息
在这里插入图片描述
在这里插入图片描述TransactionAttributeSource是用于解析@Tranactional注解中的属性并封装成TransactionAttribute对象。

事务切面在定义poincut的时候,设置了classFilter,但是classFilter只是做了一些类型的匹配校验工作,如图所示的这种。但大多数最终还是返回true,并没有进行对类的拦截,真正做匹配校验的是在校验方法的methodMatcher中。碍于篇幅,这里不做赘述,有兴趣的可以看一下源码的classFilter的校验。
在这里插入图片描述
在校验方法的时候,会走到以下代码:

代码片段一:

@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
	// 非public方法直接返回null,也就是methodMatcher的校验为false,则不走代理
	if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
		return null;
	}
	//获取原始方法,因为第一次methodMatcher的时候为原始方法,第二次methodMatcher的时候不为原始方法(jdk代理中为接口中的方法)
	Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

	// 拿到原始方法中的@Transactional注解的属性,如果方法中存在,直接返回
	TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
	if (txAttr != null) {
		return txAttr;
	}

	// 如果方法中不存在@Transactional注解,则去类上找
	txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
	if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
		return txAttr;
	}

	if (specificMethod != method) {
		// Fallback is to look at the original method.
		txAttr = findTransactionAttribute(method);
		if (txAttr != null) {
			return txAttr;
		}
		// Last fallback is the class of the original method.
		txAttr = findTransactionAttribute(method.getDeclaringClass());
		if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
			return txAttr;
		}
	}

	return null;
}

以上代码中需要关注的点是为什么通过API拿到原始方法。

如果方法或者类上匹配到@Transactional注解,方法被调用时,会进入advice(TransactionInterceptor)中的invoke方法:

代码片段二:

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {

	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}


@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
		final InvocationCallback invocation) throws Throwable {

	//获取事务属性解析类 AnnotationTransactionAttributeSource
	TransactionAttributeSource tas = getTransactionAttributeSource();
	//获取事务属性
	final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
	//获取事务管理器
	final TransactionManager tm = determineTransactionManager(txAttr);

	.....
	PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
	//获取joinpoint
	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
	if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
		//开启事务
		TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
		Object retVal;
		try {
			//链式调用
			retVal = invocation.proceedWithInvocation();
		}
		catch (Throwable ex) {
			//事务回滚
			completeTransactionAfterThrowing(txInfo, ex);
			throw ex;
		}
		finally {
			cleanupTransactionInfo(txInfo);
		}

		if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
			TransactionStatus status = txInfo.getTransactionStatus();
			if (status != null && txAttr != null) {
				retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
			}
		}

		//事务提交
		commitTransactionAfterReturning(txInfo);
		return retVal;
	}
	.....
}

看代码片段二中开启事务的代码:
在这里插入图片描述
代码片段三:tm.getTransaction代码:

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
		throws TransactionException {

	// Use defaults if no transaction definition given.
	TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

	//DataSourceTransactionObject拿到对象
	Object transaction = doGetTransaction();
	boolean debugEnabled = logger.isDebugEnabled();

	//第一次进来connectionHolder为空的,所以不存在事务
	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		//如果不是第一次进来,则会走这个逻辑
		return handleExistingTransaction(def, transaction, debugEnabled);
	}

	// Check definition settings for new transaction.
	if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
		throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
	}

	// No existing transaction found -> check propagation behavior to find out how to proceed.
	if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
		throw new IllegalTransactionStateException(
				"No existing transaction found for transaction marked with propagation 'mandatory'");
	}
	//第一次进来大部分会走这里
	else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		//先挂起
		SuspendedResourcesHolder suspendedResources = suspend(null);
		if (debugEnabled) {
			logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
		}
		try {
			return startTransaction(def, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error ex) {
			resume(null, suspendedResources);
			throw ex;
		}
	}
	else {
		// Create "empty" transaction: no actual transaction, but potentially synchronization.
		if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
					"isolation level will effectively be ignored: " + def);
		}
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
	}
}

看代码片段三中的doGetTransaction()方法:
在这里插入图片描述
第一次从ThreadLocal中拿肯定是拿不到ConnectionHolder的,不存在事务,又因为spring默认的传播属性为required,所以一般会走代码片段三中的else if逻辑,如图所示:
在这里插入图片描述
看上图中的startTransaction方法:

代码片段四:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	//创建一个新的事务状态,这里的 newTransaction 属性为 true
	DefaultTransactionStatus status = newTransactionStatus(
			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
	//开启事务
	doBegin(transaction, definition);
	//开启事务后,改变事务状态,事务状态实时在变,给业务人员使用的api,用来查看事务的状态
	prepareSynchronization(status, definition);
	return status;
}

说明:spring中用一个boolean类型的属性控制是否是一个新的事务状态,上述代码中为true,表示开启一个新的事务状态,事务提交也是只提交事务状态为true的事务。

看一下代码片段四中的开启事务方法doBegin方法,因为spring中jdbc的事务管理器为DataSourceTransactionManager,所以看它的doBegin方法:

代码片段五:

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
	//事务对象
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;

	try {
		//如果事务对象中没连接
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			//从dataSource对象中拿到连接对象
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			//把连接对象设置到事务对象中
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}

		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		//设置是否只读连接和设置事务隔离级别
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		//设置该连接的隔离级别
		txObject.setPreviousIsolationLevel(previousIsolationLevel);
		txObject.setReadOnly(definition.isReadOnly());

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		//如果是自动提交
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			//把自动提交关闭,因为提交要交给spring来做
			con.setAutoCommit(false);
		}

		//执行只读事务命令
		prepareTransactionalConnection(con, definition);
		//把事务状态设置为true,是否是活跃的
		txObject.getConnectionHolder().setTransactionActive(true);

		//获取事务超时时间
		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}

		// Bind the connection holder to the thread.
		//如果是一个新事务,建立数据源对象和连接对象的绑定关系.且把该绑定关系的map设置到ThreadLocal中
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}

	catch (Throwable ex) {
		if (txObject.isNewConnectionHolder()) {
			DataSourceUtils.releaseConnection(con, obtainDataSource());
			txObject.setConnectionHolder(null, false);
		}
		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
	}
}

代码片段五的总结:如果事务对象中没有连接对象,则从数据源对象中获取,接下来配置隔离级别,关闭自动提交,设置事务超时时间,设置事务状态为true(表示活跃的),然后将数据源对象和连接对象做绑定, 放入ThreadLocal当中。

这样代码片段二中的开启事务就完成了,接下来进入链式调用的过程,如果出现异常,则进入catch,进行事务回滚,没有异常则进行事务提交。

看回滚的代码:

代码片段六:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
	if (txInfo != null && txInfo.getTransactionStatus() != null) {
		if (logger.isTraceEnabled()) {
			logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
					"] after exception: " + ex);
		}
		//判断事务回滚类型
		if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
			try {
				//执行事务回滚
				txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
			}
			catch (TransactionSystemException ex2) {
				logger.error("Application exception overridden by rollback exception", ex);
				ex2.initApplicationException(ex);
				throw ex2;
			}
			catch (RuntimeException | Error ex2) {
				logger.error("Application exception overridden by rollback exception", ex);
				throw ex2;
			}
		}
		else {
			// We don't roll back on this exception.
			// Will still roll back if TransactionStatus.isRollbackOnly() is true.
			try {
				txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
			}
			catch (TransactionSystemException ex2) {
				logger.error("Application exception overridden by commit exception", ex);
				ex2.initApplicationException(ex);
				throw ex2;
			}
			catch (RuntimeException | Error ex2) {
				logger.error("Application exception overridden by commit exception", ex);
				throw ex2;
			}
		}
	}
}

代码片段六说明:
如果事务属性不为空,且异常回滚类型满足,则进行事务的回滚,否则,进行事务的提交。

先看代码片段六中的回滚操作:

代码片段七:

@Override
public final void rollback(TransactionStatus status) throws TransactionException {
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}

	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	//执行事务回滚
	processRollback(defStatus, false);
}


private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
	try {
		boolean unexpectedRollback = unexpected;

		try {
			triggerBeforeCompletion(status);

			//如果有回滚点,如果是嵌套事务
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Rolling back transaction to savepoint");
				}
				status.rollbackToHeldSavepoint();
			}
			//如果是新事务,newTransaction 为true的情况下才回滚
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction rollback");
				}
				doRollback(status);
			}
			else {
				// Participating in larger transaction
				if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
				// Unexpected rollback only matters here if we're asked to fail early
				if (!isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = false;
				}
			}
		}
		catch (RuntimeException | Error ex) {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			throw ex;
		}

		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

		// Raise UnexpectedRollbackException if we had a global rollback-only marker
		if (unexpectedRollback) {
			throw new UnexpectedRollbackException(
					"Transaction rolled back because it has been marked as rollback-only");
		}
	}
	finally {
		cleanupAfterCompletion(status);
	}
}

代码片段七说明:事务状态中newTransaction必须为true的情况下,才进行回滚和提交,为true,表示是最外层的事务, 为true的时候,获取到连接对象进行rollback。

看代码片段六中的提交操作:

代码片段八:

@Override
public final void commit(TransactionStatus status) throws TransactionException {
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}

	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	if (defStatus.isLocalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Transactional code has requested rollback");
		}
		processRollback(defStatus, false);
		return;
	}

	if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
		}
		processRollback(defStatus, true);
		return;
	}
	//执行事务提交
	processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;

		try {
			boolean unexpectedRollback = false;
			prepareForCommit(status);
			//调用beforeCommit方法,事务提交之前做业务处理
			triggerBeforeCommit(status);
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;

			//如果有回滚点
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Releasing transaction savepoint");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				status.releaseHeldSavepoint();
			}
			//如果是新事务,则提交事务
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction commit");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				doCommit(status);
			}
			else if (isFailEarlyOnGlobalRollbackOnly()) {
				unexpectedRollback = status.isGlobalRollbackOnly();
			}

			// Throw UnexpectedRollbackException if we have a global rollback-only
			// marker but still didn't get a corresponding exception from commit.
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction silently rolled back because it has been marked as rollback-only");
			}
		}
		catch (UnexpectedRollbackException ex) {
			// can only be caused by doCommit
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			throw ex;
		}
		catch (TransactionException ex) {
			// can only be caused by doCommit
			if (isRollbackOnCommitFailure()) {
				doRollbackOnCommitException(status, ex);
			}
			else {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			}
			throw ex;
		}
		catch (RuntimeException | Error ex) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, ex);
			throw ex;
		}

		// Trigger afterCommit callbacks, with an exception thrown there
		// propagated to callers but the transaction still considered as committed.
		try {
			//触发afterCommit方法,事务提交后做业务处理
			triggerAfterCommit(status);
		}
		finally {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		}
	}
	finally {
		//处理挂起事务,在这里恢复绑定关系
		cleanupAfterCompletion(status);
	}
}

补充:
代码片段二中获取事务属性的代码:

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
	RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

	Propagation propagation = attributes.getEnum("propagation");
	rbta.setPropagationBehavior(propagation.value());
	Isolation isolation = attributes.getEnum("isolation");
	rbta.setIsolationLevel(isolation.value());
	rbta.setTimeout(attributes.getNumber("timeout").intValue());
	rbta.setReadOnly(attributes.getBoolean("readOnly"));
	rbta.setQualifier(attributes.getString("value"));

	List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
	for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	rbta.setRollbackRules(rollbackRules);

	return rbta;
}

这里会将属性封装到RuleBasedTransactionAttribute 对象当中。当再进行事务回滚的时候,会判断定义的回滚的异常类型是否匹配。

判断是否回滚:

@Override
public boolean rollbackOn(Throwable ex) {
	if (logger.isTraceEnabled()) {
		logger.trace("Applying rules to determine whether transaction should rollback on " + ex);
	}

	RollbackRuleAttribute winner = null;
	int deepest = Integer.MAX_VALUE;

	if (this.rollbackRules != null) {
		for (RollbackRuleAttribute rule : this.rollbackRules) {
			int depth = rule.getDepth(ex);
			if (depth >= 0 && depth < deepest) {
				deepest = depth;
				winner = rule;
			}
		}
	}

	if (logger.isTraceEnabled()) {
		logger.trace("Winning rollback rule is: " + winner);
	}

	// User superclass behavior (rollback on unchecked) if no rule matches.
	if (winner == null) {
		logger.trace("No relevant rollback rule found: applying default rules");
		return super.rollbackOn(ex);
	}

	return !(winner instanceof NoRollbackRuleAttribute);
}

如果rollbackRules 不为空,也就是自定义了回滚的异常类型,则循环判断,如果能匹配到,则进行回滚,如果匹配不到或者rollbackRules 为空,则走默认的,就是去调用父类DefaultTransactionAttribute的rollbackon方法,如果还是匹配不到,则进行提交。
在这里插入图片描述
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13821.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!