#TOC
一、使用RocketMQTemplate发送事务消息
首先我们要确定发送什么样的消息,使用RocketMQTemplate发事务消息时程序会自动进入事务监听器类中,所以我们确定发什么样的消息才能在事务监听器中决定是否提交事务:
public class TransactionMQProducerTest {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
Message<String> sendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
List<String> exampleObj = Arrays.asList("a", "b", "c");
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topicA:tagA", sendMessage, exampleObj);
System.out.println(result);
}
}
然后就是实现事件监听器类,这个类需要实现 RocketMQLocalTransactionListener
接口并加上 @Component
和
@RocketMQTransactionListener
注解:
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object object) {
System.out.println(message.getTags());
List<String> exampleObj = (List<String>) object;
String str = exampleObj.get(0);
System.out.println(str);
return RocketMQLocalTransactionState.COMMIT;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
System.out.println("事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
相对于发送普通消息,发送事务消息要实现 RocketMQLocalTransactionListener
接口并加上 @Component
和
@RocketMQTransactionListener
注解。我们调用public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException;
发送事务消息后会回调到executeLocalTransaction()方法中,sendMessageInTransaction()中的msg和object参数会传入到executeLocalTransaction()方法中的message和object参数中,在executeLocalTransaction()方法中我们根据自己的业务需求来返回RocketMQLocalTransactionState.COMMIT(提交事务,此次发送成功)
、RocketMQLocalTransactionState.UNKNOW(事务回查,进入到checkLocalTransaction函数中)
、RocketMQLocalTransactionState.ROLLBACK(回滚事务,此次发送取消)
三种状态中的一个,在checkLocalTransaction()方法中根据业务需求决定是否提交。
二、使用TransactionMQProducer发送事务消息
public class TransactionMQProducerTest {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("命名空间", "生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object object) {
System.out.println(message.getTags());
List<String> exampleObj = (List<String>) object;
String str = exampleObj.get(0);
System.out.println(str);
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事务回查");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体").getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
List<String> exampleObj = Arrays.asList("a", "b", "c");
TransactionSendResult result = producer.sendMessageInTransaction(sendMessage, exampleObj);
System.out.println(result);
producer.shutdown();
}
}
相对于发送普通消息,发送事务消息要设置事务监听器(即实现TransactionListener接口,重写executeLocalTransaction()和checkLocalTransaction()方法
)。我们调用public TransactionSendResult sendMessageInTransaction(Message msg, Object object) throws MQClientException; 发送事务消息后会回调到executeLocalTransaction()方法中,sendMessageInTransaction()中的msg和object参数会传入到executeLocalTransaction()方法中的message和object参数中,在executeLocalTransaction()方法中我们根据自己的业务需求来返回LocalTransactionState.COMMIT_MESSAGE(提交事务,此次发送成功)
、LocalTransactionState.UNKNOW(事务回查,进入到checkLocalTransaction函数中)
、LocalTransactionState.ROLLBACK_MESSAGE(回滚事务,此次发送取消)
三种状态中的一个,在checkLocalTransaction()方法中根据业务需求决定是否提交。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)