分布式事务Seata之AT模式,TCC模式,安装教程,原理等

码农在囧途

万千灯火中终于找到属于自己那一盏灯了 若要持续为你亮着 那你得加点油吧。

前言

前面我们说了三篇分布式事务的解决方案,分别是强一致性分布式事务2P,3PC,最终一致性事务TCC,不过它们只是提供了原理,今天我们来说一下具体的 分布式事务框架,对于分布式事务,有很多现成框架,今天我们主要来说一下阿里的Seata,想必Java开发者没有不知道Seata的吧,Seata提供了很多分布式 事务解决方案,比如AT模式,TCC模式,XA模式,Saga模式。

架构

Seata分为三大模块,分别是TC(Transaction Coordinator),TM(Transaction Manager),RM(Resource Manager),

  • TC(事务协调者):是一个独立的中间件,它的作用是协调和管理整个分布式事务过程,在Seata中我们部署在服务器上的seata-server就是TC。

  • TM(事务管理者):TM需要嵌入我们的应用中,全局事务就是由它发起的,全局事务的提交和回滚也是由它发起的。

  • RM(资源管理者):RM其实就是分支事务,它也需要嵌入我们的应用中,它会向TC注册自己,并且接收TC的指令,并决定事务的提交和回滚。

上面说的可能有点抽象,不好理解,那么我们就用一个图来表示,下面是一个下单的过程,我们看出TM位于BusinessService,而BusinessService则通过TRPC调用其他的四个服务, RM其他四个服务上,那么由此可见,TM是一个“领头羊”,那么我们就可以理解,分布式事务的发起方就是TM(BusinessService通过RPC调用其他的服务),而TM和RM都由TC来进行协调。
分布式事务Seata之AT模式,TCC模式,安装教程,原理等

处理过程 分布式事务Seata之AT模式,TCC模式,安装教程,原理等

AT模式

Seata的AT模式对业务是没有入侵的,AT模式其实能满足80%的业务场景,所以AT模式也是用得最多的,Seata的AT模式是2PC的具体实现,但是它和传统的2PC又有着 区别,区别不在于实现的思想上,而是在于处理的方式不一样。

  • 传统的2PC:传统的2PC第一阶段是预提交,它会在本地发起一个事务,但是没有提交,而是将事务日志保存到数据库undo_log和redo_log中,它是数据库级别的, 所以在事务回滚事务的时候,会去读取日志来进行恢复。

  • Seata AT模式在第第一阶段会对事务进行解析,提取出sql语句,并制作出原始快照和新快照,然后保存进undo_log表中,只不过这里的undo_log表是一张数据表, 参与分布式事务的参与者都需要这一张表,是Seata设计的,在进行事务提交和回滚的时候也是基于这张表。

实战

我们来模拟下单过程,下单后我们需要保存订单,扣减库存,增加积分,扣减账户余额,系统分为5个服务,分别为业务微服务(BusinessMicroservice)库存微服务(StockMicroservice)订单微服务(OrderMicroservice)积分微服务(IntegralMicroservice)账户微服务(AccountMicroservice),在业务微服务中通过Feign统一调用其他微服务,其中在订单服务又调用了账户服务。

分布式事务Seata之AT模式,TCC模式,安装教程,原理等

下单总接口

/**
* @author 刘牌
* @date 2022/3/116:22
*/

@Component
@AllArgsConstructor
public class PlaceOrderExecute {

private final IntegralClient integralClient;
private final StockClient stockClient;
private final OrderClient orderClient;

@GlobalTransactional
public Response execute(PlaceOrderDTO placeOrderDTO) {
//保存订单
Order order = new Order()
.setUserId(placeOrderDTO.getUserId())
.setCommodityId(placeOrderDTO.getCommodityId())
.setCount(placeOrderDTO.getCount())
.setMoney(placeOrderDTO.getMoney());
orderClient.saveOrder(order);
//扣减积分
Integral integral = new Integral().setUserId(placeOrderDTO.getUserId()).setIntegral(10);
integralClient.increaseIntegral(integral);
//扣减库存
Stock stock = new Stock().setCommodityId(placeOrderDTO.getCommodityId()).setStockNum(placeOrderDTO.getCount());
stockClient.decreaseStock(stock);
return new Response(200,"placeOrder success",null);
}
}

订单服务调用账户服务扣减余额

@Component
@AllArgsConstructor
public class OrderExecute {

final OrderDao orderDao;
final AccountClient accountClient;

public Response execute(Order order) {
//保存订单信息
orderDao.saveOrder(order);
//扣减账户余额
accountClient.decreaseBalance(order);
return new Response(200,"placeOrder success",null);
}
}

上面就完成了,我们发现,只在下单接口加了一个@GlobalTransactional注解便可,所以AT模式对代码是零入侵的。

TCC模式

对于TCC模式的解释,前面发了一篇文章专门说,这里就不再重复阐述,直接进入实战。

使用TCC模式,它就和数据库没有关联了,前面使用AT模式,它是依赖数据库的,每个业务表中需要一张undo_log表来保存事务的快照等信息,而TCC则 不需要,我们只需要去设计我们的代码,设计Try阶段,Confirm阶段,Cancel阶段,而操作的数据不单单是数据库,还可以是其他的,如Redis,ES等。

Seata的TCC是要使用接口,我们需要在总事务接口上面使用@LocalTCC注解,在方法上使用@TwoPhaseBusinessAction注解,注解参数name 是事务名称,唯一的,commitMethod是事务提交方法,rollbackMethod是回滚方法,两个方法都需要我们编写。


/**
* @author 刘牌
* @date 2022/3/915:03
*/

@LocalTCC
public interface IBusinessService {

@TwoPhaseBusinessAction(name = "placeOrder",
commitMethod = "commitOrder" ,
rollbackMethod = "rollbackOrder")

Response placeOrder(@BusinessActionContextParameter(paramName = "params")Map<String,String> params);

void commitOrder(BusinessActionContext context);

void rollbackOrder(BusinessActionContext context);
}


实现类,里面就是编写Try阶段预留资源,Confirm阶段提交事务,Cancel阶段回滚事务的逻辑。


/**
* @author 刘牌
* @date 2022/3/116:22
*/

@Component
@AllArgsConstructor
public class BusinessServiceImpl implements IBusinessService {
private final IntegralClient integralClient;
private final StockClient stockClient;
private final OrderClient orderClient;
private final AccountClient accountClient;
/**
* Try
* @param
* @return
*/

@Override
@Transactional(rollbackFor = Exception.class)
public Response placeOrder(Map<String,String> params){
PlaceOrderDTO placeOrderDTO = JSON.parseObject(JSON.toJSONString(params), PlaceOrderDTO.class);
FreezeStock freezeStock = stockClient.queryByUserId(placeOrderDTO.getUserId());
if (null != freezeStock){
throw new RuntimeException("abort the transaction");
}
//insert freeze stock record
stockClient.saveFreezeStock(buildFreezeStock(placeOrderDTO));
//update stock
stockClient.decreaseStock(buildStockDTO(placeOrderDTO));
//update integral
integralClient.increaseIntegral(buildIntegralDTO(placeOrderDTO));
//update account
accountClient.decreaseBalance(buildAccountDTO(placeOrderDTO));
return new Response(200,"place order success",null);
}

/**
* Confirm
* @param context
* @return
*/

@Override
public void commitOrder(BusinessActionContext context) {
Map<String,String> params = (Map<String, String>) context.getActionContext("params");
Integer userId = Integer.valueOf(params.get("userId"));
//update order status to 2
String orderId =params.get("orderId");
int confirmStatus = 2;
orderClient.updateOrderStatus(buildUpdateOrderDTO(orderId,confirmStatus));
//delete the freeze table
stockClient.deleteFreezeRecordByUserId(userId);
//update the freeze integral to 0
integralClient.updateFreezeIntegralToZeroByUserId(userId);
//update the account balance to 0
accountClient.updateAccountBalanceToZeroById(userId);
}

/**
* Cancel
* @param context
* @return
*/

@Override
public void rollbackOrder(BusinessActionContext context) {
Map<String,String> params = (Map<String, String>) context.getActionContext("params");
Integer userId = Integer.valueOf(params.get("userId").toString());
//update the order status to 3 or delete the order
String orderId = params.get("orderId").toString();
Integer commodityId = Integer.valueOf(params.get("commodityId").toString());
Integer count = Integer.valueOf(params.get("count").toString());
Integer integral = Integer.valueOf(params.get("integral").toString());
BigDecimal money = BigDecimal.valueOf(Long.parseLong(params.get("money").toString()));
int confirmStatus = 3;
orderClient.updateOrderStatus(buildUpdateOrderDTO(orderId,confirmStatus));
//recovery stock and delete freeze record
stockClient.updateStockNumByUserId(buildUpdateStockDTO(commodityId,count));
stockClient.deleteFreezeRecordByUserId(userId);
//recovery the integral balance
integralClient.updateIntegralBalance(buildUpdateIntegralDTO(integral,userId));
//recovery the account
accountClient.updateAccountBalance(buildUpdateAccountDTO(userId,money));
}

public AccountDTO buildAccountDTO(PlaceOrderDTO placeOrderDTO){
return new AccountDTO()
.setUserId(placeOrderDTO.getUserId())
.setMoney(placeOrderDTO.getMoney());
}

public IntegralDTO buildIntegralDTO(PlaceOrderDTO placeOrderDTO){
return new IntegralDTO()
.setIntegral(placeOrderDTO.getIntegral())
.setUserId(placeOrderDTO.getUserId());
}

public StockDTO buildStockDTO(PlaceOrderDTO placeOrderDTO){
return new StockDTO()
.setCommodityId(placeOrderDTO.getCommodityId())
.setCount(placeOrderDTO.getCount());
}

public FreezeStock buildFreezeStock(PlaceOrderDTO placeOrderDTO){
return new FreezeStock()
.setUserId(placeOrderDTO.getUserId())
.setFreezeStock(placeOrderDTO.getCount())
.setCommodityId(placeOrderDTO.getCommodityId());
}

public Order buildOrder(PlaceOrderDTO placeOrderDTO , Integer status){
return new Order()
.setUserId(placeOrderDTO.getUserId())
.setCommodityId(placeOrderDTO.getCommodityId())
.setCount(placeOrderDTO.getCount())
.setMoney(placeOrderDTO.getMoney())
.setOrderId(placeOrderDTO.getOrderId())
.setStatus(status);
}

public UpdateOrderDTO buildUpdateOrderDTO(String orderId , Integer status){
return new UpdateOrderDTO()
.setStatus(status)
.setOrderId(orderId);
}

public UpdateStockDTO buildUpdateStockDTO(Integer commodityId , Integer count){
return new UpdateStockDTO()
.setCommodityId(commodityId)
.setCount(count);
}

public IntegralDTO buildUpdateIntegralDTO(Integer integral , Integer userId){
return new IntegralDTO()
.setIntegral(integral)
.setUserId(userId);
}

public AccountDTO buildUpdateAccountDTO(Integer userId , BigDecimal money){
return new AccountDTO()
.setUserId(userId)
.setMoney(money);
}
}


然后调用接口,我们发现TCC模式也需要使用@GlobalTransactional注解。


/**
* @author 刘牌
* @date 2022/3/19:50
*/

@RestController
@AllArgsConstructor
public class PlaceOrderApi {

private final IBusinessService businessService;
private final OrderClient orderClient;

@GlobalTransactional
@PostMapping("placeOrder")
public Response placeOrder(@RequestBody PlaceOrderDTO placeOrderDTO) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
int tryStatus = 1;
Order order = buildOrder(placeOrderDTO, tryStatus);
orderClient.saveOrder(order);
Map<String, String> map = BeanUtils.describe(placeOrderDTO.setOrderId(order.getOrderId()));
return businessService.placeOrder(map);
}

public Order buildOrder(PlaceOrderDTO placeOrderDTO , Integer status){
return new Order()
.setUserId(placeOrderDTO.getUserId())
.setCommodityId(placeOrderDTO.getCommodityId())
.setCount(placeOrderDTO.getCount())
.setMoney(placeOrderDTO.getMoney())
.setOrderId(IdWorker.fastId())
.setStatus(status);
}
}

查看事务相关的表

因为seata的分布式事务执行很快,如果需要看分布式事务相关表中存储的数据,那么需要debug调试一下,如下。
分布式事务Seata之AT模式,TCC模式,安装教程,原理等

branch_table表

分布式事务Seata之AT模式,TCC模式,安装教程,原理等

global_table表

分布式事务Seata之AT模式,TCC模式,安装教程,原理等

lock_table表

分布式事务Seata之AT模式,TCC模式,安装教程,原理等

undo_log表

分布式事务Seata之AT模式,TCC模式,安装教程,原理等


undo_log里面保存了事务前记录的快照和事务后的快照,json如下,beforeImage是执行事务前,afterImage是事务后,对事务进行回滚就是使用 快照,其他的参数大家可以自己去了解。


{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "116.198.160.39:8091:2711408664311631873",
"branchId": 2711408664311631885,
"sqlUndoLogs": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "integral",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "integral",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "integral",
"keyType": "NULL",
"type": 4,
"value": 170
}
]
]
}
]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "integral",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "integral",
"keyType": "NULL",
"type": 4,
"value": 180
}
]
]
}
]
]
}
}
]
]
}


关于Seata的架构介绍和它的AT模式和TCC模式,我们就说完了,也进行了实战,不过只是简单说出了使用的大致内容,这一切都是建立在部署好了 seata-server(事务协调者TC)的前提下,需要进行测试可以使用我的代码demo进行测试。

Seata安装

1.下载seata-server,去github下载

2.解压后进入seata-server

3.进入conf目录,修改file.conf

seata目前支持三种模式,分别为db,redis,file,默认为file,file的效率比较高,因为直接内存操作,但是不能高可用,不能集群,只要seata发生 宕机,数据就会丢失,redis效率高,不过单节点的redis存在风险,应该做好主从。db则是基于数据库,效率不高。

可以直接修改store.mode更换模式,使用db模式需要数据库的支持,需要三张表global_table,branch_table,lock_table。


store {
mode = "db"
publicKey = ""
file {
dir = "sessionStoreGuns"
maxBranchSessionSize = 16384
maxGlobalSessionSize = 512
fileWriteBufferCacheSize = 16384
sessionReloadReadSize = 100
flushDiskMode = async
}

## database store property

db {
datasource = "druid"
dbType = "MySQL"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://120.78.171.43:3306/seata?rewriteBatchedStatements=true"
user = "root"
password = "123456"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}

## redis store property

redis {
## redis mode: single、sentinel
mode = "single"
## single mode property
single {
host = "127.0.0.1"
port = "6379"
}
## sentinel mode property
sentinel {
masterName = ""
## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
sentinelHosts = ""
}
password = ""
database = "0"
minConn = 1
maxConn = 10
maxTotal = 100
queryLimit = 100
}
}

4.配置参数导入注册中心

执行seata源码的seata-1.4.2scriptconfig-centernacosnacos-config.sh , 它会导入seata-1.4.2scriptconfig-centerconfig.txt 的配置到nacos注册中心,可对下面配置进行修改。


transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

5.启动seata-server

执行seata-server的bin目录下的seata-server.sh,注意,如果是公网,那么需要指定公网ip,不然客户端注册全局事务和分支事务到seate-server 的时候找不到地址,可以可以指定参数-h , 也可以在seata-server.sh文件中指定ip地址

exec "$JAVACMD" $JAVA_OPTS -server -Xmx2048m -Xms2048m -Xmn1024m -Xss512k -XX:SurvivorRatio=10 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=1024m -XX:-OmitStackTraceInFastThrow -XX:-UseAdaptiveSizePolicy -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="$BASEDIR"/logs/java_heapdump.hprof -XX:+DisableExplicitGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Xloggc:"$BASEDIR"/logs/seata_gc.log -verbose:gc -Dio.netty.leakDetectionLevel=advanced -Dlogback.color.disable-for-bat=true 
-classpath "$CLASSPATH"
-Dapp.name="seata-server"
-Dapp.pid="$$"
-Dapp.repo="$REPO"
-Dapp.home="$BASEDIR"
-Dbasedir="$BASEDIR"
io.seata.server.Server
-h ip地址
"$@"

注意:如果使用db模式,那么在启动的时候需要通过-m参数指定模式,不然就默认为file模式。

./seata-server.sh -m db

项目demo地址

https://gitee.com/steakliu/design-pattern/tree/master/microservices-transaction

今天的分享就到这里,感谢你的观看,下期见


原文始发于微信公众号(刘牌):分布式事务Seata之AT模式,TCC模式,安装教程,原理等

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/150787.html

(1)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!