SpringBoot集成RocketMQ实现事务消息 结合实际业务出发 多业务多场景分析 附带示例代码和解决方案
- 前言
- 前期准备
- SpringBoot集成RocketMQ实现事务消息示例代码
- POM
- 添加application.yml配置信息
- 生产者
- 生产者事务消息监听器
- 消费者
- 前置说明
- 事务消息处理流程图
- 业务场景分析-以及问题解决方案代码实现
- 一、用户签到时发送MQ消息增加用户积分
- 场景1(在事务内部发送增加用户积分消息)
-
- 场景2(在事务提交之后发送增加用户积分消息)
-
- 问题解决方案
- 代码实现方案1
- 消息对象
- 业务逻辑代码+生产者
- 全局事务消息监听
- 消费者
前言
之前看了几篇博客写的RocketMQ事务消息使用场景分析,个人感觉分析的业务不太贴切,这里会对多个业务场景进行事务问题分析,本文不过多的探讨RocketMQ事务消息的实现原理,网上有很多资料,这里只对真实的业务场景重点分析,集成代码包会使用rocketmq-spring-boot-starter 2.2.3。
前期准备
SpringBoot集成RocketMQ实现事务消息示例代码
POM
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--要使用RocketMQ5.x的自定义时间延时消息必须要使用2.2.3及以上的版本-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.5</version>
</dependency>
</dependencies>
添加application.yml配置信息
server:
port: 8888
rocketmq:
# name-server服务地址多个用;隔开 例如127.0.0.1:9876;127.0.0.1:9877
name-server: 192.168.10.220:9876
producer: # 生产者配置
group: group1 # 生产者分组
send-message-timeout: 3000 # 消费者发送消息超时时间单位毫秒
生产者
@Slf4j
@Component
public class SimpleTransationMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void send() {
String msg = "发送事务消息"+RandomUtil.randomInt(100);
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("simple-transation-topic-01", MessageBuilder.withPayload(msg).build(), null);
String transactionId = result.getTransactionId();
String status = result.getSendStatus().name();
log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
}
}
生产者事务消息监听器
发送事务消息必须要有该监听器不然发送时会抛出异常。
@Slf4j
@RocketMQTransactionListener
public class SimpleTransactionMsgListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
int index=2;
switch (index){
case 1:
String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
log.info("本地事务回滚,回滚消息,"+jsonStr);
return RocketMQLocalTransactionState.ROLLBACK;
case 2:
log.info("需要等待Broker进行事务状态回查");
return RocketMQLocalTransactionState.UNKNOWN;
default:
log.info("事务提交,消息正常处理");
return RocketMQLocalTransactionState.COMMIT;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
return RocketMQLocalTransactionState.COMMIT;
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "simple-transation-group-01", topic = "simple-transation-topic-01")
public class SimpleTransationMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("收到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), body);
}
}
前置说明
在这里我们会使用SpringBoot项目作为基础来衍生各个功能进行说明,在项目中我们使用最多的事务处理方式有两种自动事务@Transactional和手动事务,对于不同的事务方式,发送Rocket事务消息的逻辑也有不同,这里会根据用户签到获取积分业务展开。
事务消息处理流程图
业务场景分析-以及问题解决方案代码实现
一、用户签到时发送MQ消息增加用户积分
场景1(在事务内部发送增加用户积分消息)
在事务内部发送普通消息,这种方式也是我们用的最多的,调用的service方法会加上@Transactional注解整个方法中执行的操作都在一个事务中,我见过大部分的业务都是这样发送的MQ消息,如果不出意外情况这样是没有什么问题,但是出问题了那就是致命的。
问题分析
- 增加积分消息发送成功之后如果事务commit失败,创建签到记录会回滚,但是消息已经成功发出,积分系统会将消息消费。
- 出现原因:数据库服务出现异常,网络问题导致提交超时事物自动回滚、应用服务断电宕机等…
场景2(在事务提交之后发送增加用户积分消息)
问题分析
- 事物提交成功,但是在发送消息时出现异常,没有给用户添加上对应积分。
- 出现原因:RocketMQ服务端出现异常、网络问题导致发送超时、应用服务断电宕机等…
问题解决方案
上面两种场景无非都是要么事物没有提交成功,要么事物提交成功消息没有发送出去,如果我们可以解决这两种问题那系统就可以闭环了,针对这两种场景可以直接使用RocketMQ的事物消息实现。
代码实现方案1
消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SignInMsgDTO {
private String customerNo;
private String signInDate;
private Long point;
}
业务逻辑代码+生产者
这里会使用到函数对象Function作为事务消息的第三个参数传递到全局事务消息监听器中的executeLocalTransaction(Message message, Object o)方法的Object o,在executeLocalTransaction方法中将Object o转换成Function函数调用apply方法传入事务ID,作为业务的回查标记,执行业务本地事务也会在function.apply中执行。
@Slf4j
@Service
public class SignInService {
@Autowired
private SignInCmdExe signInCmdExe;
@Autowired
RocketMQTemplate rocketMQTemplate;
public boolean doSignIn(String customerNo){
long point=100;
SignInMsgDTO signInMsgDTO = new SignInMsgDTO(customerNo,"20230401",point);
Message<SignInMsgDTO> message = MessageBuilder.withPayload(signInMsgDTO).build();
Function function = new Function<String,Boolean>(){
@Override
public Boolean apply(String transactionId) {
boolean execute = signInCmdExe.execute(customerNo, transactionId);
return execute;
}
} ;
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("sign-in-transation-topic-01", message,function);
String localTransactionState = result.getLocalTransactionState().name();
if (LocalTransactionState.ROLLBACK_MESSAGE.name().equals(localTransactionState)) {
log.info("发送事务消息失败 transactionId={} localTransactionState={} ", result.getTransactionId(), localTransactionState);
return false;
}
log.info("发送事务消息成功 transactionId={} localTransactionState={} ", result.getTransactionId(), localTransactionState);
return true;
}
}
SignInCmdExe
@Slf4j
@Service
public class SignInCmdExe {
@Transactional(rollbackFor = Exception.class)
public boolean execute(String customerNo,String transactionId){
int status = 0;
if(status == 1){
log.info("用户已签到,不能重复签到 customerNo={}",customerNo);
return false;
}
log.info("用户签到成功 customerNo={} 签到记录transactionId={}",customerNo,transactionId);
return true;
}
}
全局事务消息监听
@Slf4j
@RocketMQTransactionListener
public class SignInTransactionMsgListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transactionId = null;
try{
transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("执行本地事务 transactionId={}",transactionId);
if (o == null) {
log.info("事务消息回滚,没有需要处理的业务 transactionId={}",transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
Function<String, Boolean> function = (Function<String, Boolean>) o;
Boolean apply = function.apply(transactionId);
if (apply) {
log.info("事务提交,消息正常处理 transactionId={}",transactionId);
return RocketMQLocalTransactionState.COMMIT;
}
log.info("事务消息回滚,业务本地事务执行失败回滚事务消息 transactionId={}",transactionId);
}catch (Exception e){
log.info("出现异常 返回ROLLBACK transactionId={}",transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
if (isSuccess(transactionId)) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
private boolean isSuccess(String transactionId) {
return true;
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sign-in-transation-group-01", topic = "sign-in-transation-topic-01")
public class SignInTransationMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("收到用户签到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), body);
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)