SpringBoot分布式事务之可靠消息最终一致性

导读:本篇文章讲解 SpringBoot分布式事务之可靠消息最终一致性,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

环境:springboot2.3.9 + RocketMQ4.8.0


可靠消息最终一致性原理

SpringBoot分布式事务之可靠消息最终一致性

 

  • 执行流程
  1. Producer发送Prepare message到broker。
  2. Prepare Message发送成功后开始执行本地事务。
  3. 如果本地事务执行成功的话则返回commit,如果执行失败则返回rollback。(这个是在事务消息的回调方法里由开发者自己决定commit or rollback)
  4. Producer发送上一步的commit还是rollback到broker,这里有以下两种情况:

1、如果broker收到了commit/rollback消息 :

如果收到了commit,则broker认为整个事务是没问题的,执行成功的。那么会下发消息给Consumer端消费。

如果收到了rollback,则broker认为本地事务执行失败了,broker将会删除Half Message,不下发给Consumer端。

2、如果broker未收到消息(如果执行本地事务突然宕机了,相当执行本地事务(executeLocalTransaction)执行结果返回unknow,则和broker未收到确认消息的情况一样处理。):

broker会定时回查本地事务的执行结果:如果回查结果是本地事务已经执行则返回commit,若未执行,则返回unknow。

Producer端回查的结果发送给Broker。Broker接收到的如果是commit,则broker视为整个事务执行成功,如果是rollback,则broker视为本地事务执行失败,broker删除Half Message,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。重复回查的时间间隔和次数都可配。

工程结构

SpringBoot分布式事务之可靠消息最终一致性

 

建立父子工程,两个子项目account-manager,integral-manager。


依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
	 <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
	 <groupId>org.springframework.boot</groupId>
	 <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	 <groupId>org.apache.rocketmq</groupId>
	 <artifactId>rocketmq-spring-boot-starter</artifactId>
	 <version>2.2.0</version>
</dependency>

Account子模块

  • 配置文件
server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
  • 业务实体类
// 用户表
@Entity
@Table(name = "t_account")
public class Account {
	@Id
	private Long id;
	private String name ;
}
// 业务记录表(用来查询去重)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
	@Id
	private Long txid;
	private Date createTime ;
}  
  • DAO相关类
public interface AccountRepository extends JpaRepository<Account, Long> {
}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {
}
  • Service相关类
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
	
// 该方法保存业务数据,同时保存操作记录;操作记录用来回查。
@Transactional
public boolean register(Account account) {
	accountRepository.save(account) ;
	AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
	accountLogRepository.save(accountLog) ;
	return true ;
}
	
public AccountLog existsTxId(Long txid) {
	return accountLogRepository.findById(txid).orElse(null) ;
}
  • 发送消息方法
@Resource
private RocketMQTemplate rocketMQTemplate ;
	
public String sendTx(String topic, String tags, Account account) {
	String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
	TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
			setHeader("tx_id", uuid).build(), uuid) ;
	return result.getSendStatus().name() ;
}
  • 消息监听(生产者监听)
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
	
	@Resource
	private AccountService accountService ;

	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
			accountService.register(account) ;
		} catch (Exception e) {
			e.printStackTrace() ;
			return RocketMQLocalTransactionState.ROLLBACK ;
		}
		return RocketMQLocalTransactionState.COMMIT ;
	}

	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		// 这里检查本地事务是否执行成功
		try {
			Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
			System.out.println("执行查询ID为:" + account.getId() + " 的数据是否存在") ;
			AccountLog accountLog = accountService.existsTxId(account.getId()) ;
			if (accountLog == null) {
				return RocketMQLocalTransactionState.UNKNOWN ;
			}
		} catch (Exception e) {
			e.printStackTrace() ;
			return RocketMQLocalTransactionState.UNKNOWN ;
		}
		return RocketMQLocalTransactionState.COMMIT ;
	}

}
  • Controller接口
@RestController
@RequestMapping("/accounts")
public class AccountController {
	@Resource
	private ProducerMessageService messageService ;
	@PostMapping("/send")
	public Object sendMessage(@RequestBody Account account) {
		return messageService.sendTx("tx-topic", "mks", account) ;
	}
}

Integral子模块

  • 业务实体类
@Entity
@Table(name = "t_integral")
public class Integral {
	@Id
	private Long id;
	private Integer score ;
	private Long acccountId ;
}  
  • DAO相关类
public interface IntegralRepository extends JpaRepository<Integral, Long> {
}
  • Service相关类
@Resource
private IntegralRepository integralRepository ;
	
@Transactional
public Integral saveIntegral(Integral integral) {
	return integralRepository.save(integral) ;
}
  • 消息监听
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {

	@Resource
	private IntegralService integralService ;
	
	@SuppressWarnings("unchecked")
	@Override
	public void onMessage(String message) {
		System.out.println("Integral接收到消息:" + message) ;
		try {
			Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class) ;
			Integer id = (Integer) jsonMap.get("id") ;
			integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
		} catch (Exception e) {
			throw new RuntimeException(e) ;
		}
	}

}

测试

分别启动两个子模块

  • 初始数据表

SpringBoot分布式事务之可靠消息最终一致性

 

  • Postman测试

SpringBoot分布式事务之可靠消息最终一致性

Account模块

SpringBoot分布式事务之可靠消息最终一致性

Integral模块

SpringBoot分布式事务之可靠消息最终一致性

 

当子模块Account执行本地事务发生错误时,事务会回滚并且删除消息。子模块Integral并不会收到消息。

完毕!!!

给个关注+转发谢谢

SpringBoot分布式事务之可靠消息最终一致性

 

SpringBoot分布式事务之可靠消息最终一致性

 

SpringBoot分布式事务之可靠消息最终一致性

 

SpringBoot分布式事务之可靠消息最终一致性

SpringBoot分布式事务之可靠消息最终一致性

SpringBoot整合RocketMQ入门示例

RocketMQ修改消息日志存储路径

SpringBoot整合RocketMQ事务/广播/顺序消息

RabbitMQ消息确认机制confirm

SpringBoot RabbitMQ消息可靠发送与接收

SpringBoot使用WebSocket实现即时消息

Springboot Security 基础应用 (1)

SpringBoot中使用Cache及JSR107的使用

Kafka(zookeeper)环境配置超级详细

zookeeper实现分布式缓存

zookeeper实现服务注册与发现

zookeeper实现分布式ID

Springboot之Actuator详解

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/80048.html

(0)

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!