RocketMQ发送事务消息的所有方法以及代码示例

2023-05-16

#TOC

一、使用RocketMQTemplate发送事务消息

首先我们要确定发送什么样的消息,使用RocketMQTemplate发事务消息时程序会自动进入事务监听器类中,所以我们确定发什么样的消息才能在事务监听器中决定是否提交事务:

public class TransactionMQProducerTest {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    	Message<String> sendMessage = MessageBuilder.withPayload("这里设置消息体")
                .setHeader("消息的属性的key", "消息的属性的值")
                // 想发送带key的消息,请求头的键必须写成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
    	List<String> exampleObj = Arrays.asList("a", "b", "c");
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topicA:tagA", sendMessage, exampleObj);
        System.out.println(result);
    }
}

然后就是实现事件监听器类,这个类需要实现 RocketMQLocalTransactionListener接口并加上 @Component
@RocketMQTransactionListener 注解:

@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object object) {
            System.out.println(message.getTags());
            List<String> exampleObj = (List<String>) object;
            String str = exampleObj.get(0);
            System.out.println(str);
            // 提交事务,此次发送成功
            return RocketMQLocalTransactionState.COMMIT;

            // 回滚事务,此次发送取消
//          return RocketMQLocalTransactionState.ROLLBACK;

            // 事务回查,调用checkLocalTransaction函数
//          return RocketMQLocalTransactionState.UNKNOW;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
    		System.out.println("事务回查");
       		return RocketMQLocalTransactionState.COMMIT;
    }
}

相对于发送普通消息,发送事务消息要实现 RocketMQLocalTransactionListener接口并加上 @Component
@RocketMQTransactionListener 注解。我们调用public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException; 发送事务消息后会回调到executeLocalTransaction()方法中,sendMessageInTransaction()中的msg和object参数会传入到executeLocalTransaction()方法中的message和object参数中,在executeLocalTransaction()方法中我们根据自己的业务需求来返回RocketMQLocalTransactionState.COMMIT(提交事务,此次发送成功)RocketMQLocalTransactionState.UNKNOW(事务回查,进入到checkLocalTransaction函数中)RocketMQLocalTransactionState.ROLLBACK(回滚事务,此次发送取消)三种状态中的一个,在checkLocalTransaction()方法中根据业务需求决定是否提交。

二、使用TransactionMQProducer发送事务消息

public class TransactionMQProducerTest {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("命名空间", "生产者组",
                new AclClientRPCHook(new SessionCredentials("用户名","密码")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object object) {
                System.out.println(message.getTags());
                List<String> exampleObj = (List<String>) object;
                String str = exampleObj.get(0);
                System.out.println(str);
                // 提交事务,此次发送成功
                return LocalTransactionState.COMMIT_MESSAGE;

                // 回滚事务,此次发送取消
//                return LocalTransactionState.ROLLBACK_MESSAGE;

                // 事务回查,进入到checkLocalTransaction函数中
//                return LocalTransactionState.UNKNOW;
            }
            
            // executeLocalTransaction()返回LocalTransactionState.UNKNOW的时候就会进入到此方法中
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务回查");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体").getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
        sendMessage.setKeys("消息的key");
        List<String> exampleObj = Arrays.asList("a", "b", "c");
		TransactionSendResult result = producer.sendMessageInTransaction(sendMessage, exampleObj);
        System.out.println(result);
        producer.shutdown();
    }
}

相对于发送普通消息,发送事务消息要设置事务监听器(即实现TransactionListener接口,重写executeLocalTransaction()和checkLocalTransaction()方法)。我们调用public TransactionSendResult sendMessageInTransaction(Message msg, Object object) throws MQClientException; 发送事务消息后会回调到executeLocalTransaction()方法中,sendMessageInTransaction()中的msg和object参数会传入到executeLocalTransaction()方法中的message和object参数中,在executeLocalTransaction()方法中我们根据自己的业务需求来返回LocalTransactionState.COMMIT_MESSAGE(提交事务,此次发送成功)LocalTransactionState.UNKNOW(事务回查,进入到checkLocalTransaction函数中)LocalTransactionState.ROLLBACK_MESSAGE(回滚事务,此次发送取消)三种状态中的一个,在checkLocalTransaction()方法中根据业务需求决定是否提交。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ发送事务消息的所有方法以及代码示例 的相关文章

  • jenkins安装和配置(一):ubuntu 20.04 jenkins安装

    参考 Installing Jenkins 参考 How to Install and Configure Jenkins on Ubuntu 20 04 LTS 一 安装前提 Java 8 or Java 11 are required
  • 泛型 知识点 总结

    为什么要有泛型 泛型实质上就是使程序员定义安全的类型 在没有出现泛型之前 java也提供了对Objct的引用 34 任意化 34 操作 这种 34 任意化 34 操作就是对Object引用进行向下转型及向上转型操作 但是某些强制类型转换的错
  • 手把手教你centos8/rhel8使用国内源安装virtualbox If your system is using EFI Secure Boot you may need to

    这两天装virtualbox的时候又遇到问题了 xff0c 它显示这个错误 xff1a If your system is using EFI Secure Boot you may need to sign the kernel modu
  • gitlab配置

    在Ubuntu下安装gitlab ce 首先全程使用root进行配置 更新apt源 apt update 安装依赖项 Postfix Configuration 选择 No configuration 就好 apt span class t
  • vs2019更新后的设置问题for(

    最近不小心把vs2019更新了一下 xff0c 虽然增加了很多新功能 xff0c 但是对于我来说最大的意义在于它的字体颜色好像更丰富了 xff0c 其他的也不太能用得到 但是更新之后 xff0c 写for循环的时候发现for i 61 0
  • 插入最少的字符,构造回文串

    题目 xff1a 题目链接 给一个字符串 xff0c 在任意的位置插入一些字符 xff0c 使得这个字符串成为回文串 xff0c 最少需要插入多少个字符 ps xff1a 有一个和这个题很像的题 xff0c 都是插入最少的字符构造回文串 x
  • Linux常用命令大全

    Linux常用命令大全 一 进程管理 xff08 1 xff09 Linux中Kill进程的方法 二 系统信息三 关机 系统的关机 重启以及登出 四 文件和目录五 文件搜索六 挂载一个文件系统七 磁盘空间八 系统负载 top九 用户和群组十
  • 字符的最少变换次数,并查集

    题意 xff1a 给两个字符串a xff0c b a合和b中的字符都是前20个字母 xff08 a t xff09 xff0c a需要进行一些变换 xff0c 使得a等于b 对于每一次变换 xff1a 选取a中相同的字符 xff0c 然后把
  • 并查集 判断无向图是否有环

    无向图中 xff0c 给定一些边 xff0c 然后判断这些边组成的图是否有环 注意这个方法必须保证没有输入重边 对于一条边用 a b 表示 xff0c 然后把a xff0c b加入到并查集中 如果又加入了一条边 b c xff0c 那么如果
  • 彻底地删除垃圾软件

    当不小心下载了垃圾软件 xff0c 想删除掉 xff0c 但是又删不掉 xff0c 或者是删不干净的一些办法 首先 xff0c 我们应该把软件关掉 xff0c 让它停止运行 然后找到软件所在的文件夹 xff0c 把一些能够删除的文件都删除掉
  • Aizu_GRL_1_c floyd算法,判断负环的题

    这题应该是很简单的 xff0c 但是我wa了好久 看了别人的博客 xff0c 还是想不明白哪有问题 xff0c 我发现别人的ac代码要判断一下 xff0c mp i k 和mp k jj 是不是等于inf xff0c 我觉得判不判断没啥区别
  • codeforces 1165D(求因子的问题)

    题意 xff1a 给出除了1和它本身之外的所有因子 xff0c 找出最小的这个数 xff0c 如果有冲突的数就输出 1 思路 xff1a 我是当时没做出来看了大佬的博客才知道 https www cnblogs com fengxunlin
  • cmd 复制文件语法不正确

    复制文件到另一个文件夹 xff1a 语法格式 xff1a copy 原文件的路径和名称 目标文件夹的路径 说语法不正确的时候可能是路径中有空格 比如C Program Files x86 CodeBlocks share CodeBlock
  • 关于斜率的计算

    codeforces 842A 从L到R中选取一个数作为分子 xff0c 从x到y中选取一个数作为分母 xff0c 给一个数k 问有没有可能这个分数等于k 这题也是有够坑的 xff0c 用二分精确的判断要超时 xff0c 但是只是看在不在范
  • c语言线性表实现电话簿(学生信息)

    c语言线性表实现电话簿 span class token macro property span class token directive keyword include span span class token string lt s
  • vsc code-runner插件运行python文件,解释器更改为ananconda

    code runner刚下载下来的默认解释器是系统自带的 xff0c 并不是anaconda的 选择文件 首选项 找到code runner executor map 找到python的命令 xff0c 改为自己的解释器位置即可
  • NT_STATUS_ACCESS_DENIED listing \*

    在centos8上配了个SAMBA与windows共享文件 访问共享目录出现 NT STATUS ACCESS DENIED listing xff0c smb gt span class token operator span span
  • RocketMQ发送普通消息的所有方法以及代码示例

    RocketMQ发送普通消息的所有方法以及代码示例 一 使用RocketMQTemplate发送消息 xff08 整合Springboot xff09 xff08 1 xff09 void send Message lt gt messag
  • C语言总结day01

    day01 1 C语言标识符 C语言标识符需满足以下条件 只能由英文字母 26个英文字母 xff0c 包括大小写即共52个 数字 0 9 和下划线组成长度为1 32必须以英文字母或下划线开头 2 C语言风格 C语言严格区分英文字母的大小写C
  • C语言总结day02

    day02 1 运算符 算术运算符 单目运算符 xff1a 43 正 xff1b 负 xff1b 双目运算符 xff1a 这三个同级 gt 43 这两个同级 注意 xff1a 前边三个的优先级大于后边的两个 xff1b 双目运算符两边运算数

随机推荐

  • C语言总结day03

    day03 1 数据的输入输出 数据的输出 xff1a 从计算机向输出设备 如显示器 打印机等 输出数据称为输出 数据的输入 xff1a 从输入设备 如键盘 磁盘 光盘 扫描仪等 向计算机输入数据称为输入 C语言函数库中有一批 34 标准输
  • C语言总结day04

    day04 1 int a n 是错误的 C语言中不允许对数组的大小作动态定义 2 字符数组的初始化 char a 61 I am Student char a 10 利用for语句对每个进行初始化 3 字符数组的输入和输出输入 char
  • C语言总结day06

    day06 1 定义和使用结构体变量 C语言允许用户建立由不同类型数据组成的组合型的数据结构 xff0c 它称为结构体 声明格式 struct 结构体名 成员列表 定义结构体类型变量 struct 结构体名 结构体变量 在声明类型的同时定义
  • C语言总结day07

    day07 一些概念理解 1 为什么使用指针 每一个编程语言都使用指针C 43 43 将指针暴露给了 用户 xff08 程序员 xff09 xff0c 而java和C 等语言则将指针隐蔽起来了 2 指针和引用的区别 本质 xff1a 引用是
  • C语言总结day05

    day05 1 函数间可以相互调用 xff0c 但是不能调用main函数 xff0c main函数是被操作系统调用的 2 数组作为函数参数 数组元素可以作函数参数 值传递 注意 xff1a 数组元素可以用作函数实参 xff0c 不能用作形参
  • SSM-Spring入门

    Spring学习 1 Spring简介 Spring是什么 Spring是分层的Java SE EE应用full stack轻量级开源框架 xff0c 是以loC Inverse Of Contorl 反转控制 和AOP Aspect Or
  • SSM-Spring学习(二)

    Spring IoC和DI注解开发 Spring配置数据源 数据源 xff08 连接池 xff09 介绍 xff1a 数据库连接池概念 百度百科 xff1a 数据库连接池负责分配 管理和释放数据库连接 xff0c 它允许应用程序重复使用一个
  • SSM-Spring(三)-AOP

    简介 什么是 AOP AOP 为 span class token class name Aspect span span class token class name Oriented span span class token clas
  • RocetMQ发送顺序消息的所有方法以及代码示例

    RocetMQ发送顺序消息的所有方法以及代码示例 一 使用RocketMQTemplate发送顺序消息 xff08 1 xff09 SendResult syncSendOrderly String destination Message
  • Spring(四)---Spring Jdbc Template基本使用

    1 概述 JdbcTemplate是spring框架中提供的一个对象 xff0c 是对原始繁琐的Jdbc API对象的简单封装 spring框架 为我们提供了很多的操作模板类 例如 xff1a 操作关系型数据的JdbcTemplate和Hi
  • SpringMVC(一)------快速入门

    Spring与Web环境集成 ApplicationContext应用上下文获取方式 应用上下文对象是通过new ClasspathXmlApplicationContext spring配置文件 方式获取的 xff0c 但是每次从容器中获
  • SpringMVC(二)-----SpringMVC的请求和响应

    SpringMVC的数据响应 SpringMVC的数据响应 数据响应方式 理解 页面跳转 直接返回字符串通过ModelAndView对象返回 回写数据 直接返回字符串返回对象或集合 SpringMVC的数据响应 页面跳转 返回字符串形式 x
  • SpringMVC(三)------SpringMVC的文件上传

    SpringMVC的请求 文件上传 客户端表单实现 应用 文件上传客户端表单需要满足 xff1a 表单项type 61 file 表单的提交方式是post表单的enctype属性是多部分表单形式 xff0c 及enctype 61 mult
  • Keil 安装

    Keil v5 C51和MDK共存 准备工具 C51版本 Keil xff08 如C51v959 xff09 MDK版本 xff08 如MDK525 xff09 激活工具 keil keygen 需要的Pack包 Keil STM32F1x
  • Kubernetes安装

    使用kubeadm创建集群 基础环境 一台兼容的 Linux 主机 Kubernetes 项目为基于 Debian 和 Red Hat 的 Linux 发行版以及一些不提供包管理器的发行版提供通用的指令每台机器 2 GB 或更多的 RAM
  • Java集合学习

    Java集合图的详解 Java集合详解 第一部分 Collection xff08 继承了Iteratable接口 xff09 和Map xff0c 是集合框架的根接口集合类存放于java util包中 集合类存放的都是对象的引用 xff0
  • yaml文件格式总结

    yaml文件格式总结 概念基本语法数据类型YAML 对象 YAML 数组复合结构纯量引用 概念 YAML 是 YAML Ain t a Markup Language xff08 YAML 不是一种标记语言 xff09 的递归缩写 在开发的
  • tmux使用教程

    tmux 的安装 Centos系统中使用 yum 来安装 tmuxyum install tmuxUbuntu系统使用apt安装tmuxapt install tmuxMac OS 使用 brew 来安装 tmuxbrew install
  • SSM-Spring快速入门(基于maven)

    Spring IoC amp DI 开发流程 xff08 1 xff09 导入Spring开发的基本包坐标 span class token generics span class token punctuation lt span pro
  • RocketMQ发送事务消息的所有方法以及代码示例

    TOC 一 使用RocketMQTemplate发送事务消息 首先我们要确定发送什么样的消息 xff0c 使用RocketMQTemplate发事务消息时程序会自动进入事务监听器类中 xff0c 所以我们确定发什么样的消息才能在事务监听器中