RabbitMQ的使用

2023-11-19

安装:

Docker 安装 RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management 

4369,25672(Erlang发现&集群端口)

5672,5671(AMQP端口)

15672 (web管理后台端口)

61613,61614(STOMP协议端口)

1883,8883(MQTT协议端口)

RabbitMQ随docker自动启动

docker update rabbitmq --restart=always

配置

设置传送rabbitmq格式化,

@Configuration
public class MyRabbitConfig {

    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

解决消息丢失

1、消息丢失

消息发送出去,由于网络问题没有抵达服务器

做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机 制,可记录到数据库,采用定期扫描重发的方式

做好日志记录,每个消息状态是否都被服务器收到都应该记录 • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进 行重发

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机。

publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重 新入 队

代码方法:

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

手动接受代码basicAck

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

ture为接受后删除,false为接受后不删除

拒收basicReject

 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

ture为拒收后重新入队,false为拒收后丢弃

解决消息重复:

• 2、消息重复 • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者

• 消息消费失败,由于重试机制,自动又将消息发送出去

• 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志

• 使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理 •rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的

解决消息积压:

• 3、消息积压 • 消费者宕机积压 • 消费者消费能力不足积压 • 发送者发送流量太大

• 上线更多的消费者,进行正常消费

• 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处

简单使用

设计队列流程图

只要使用@bean注解交给spring管理即可,使用全量构造方法创建

@Configuration
public class MyRabbitMQConfig {

    /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

    /**
     * 死信队列
     *
     * @return
     */@Bean
    public Queue orderDelayQueue() {
        /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

        return queue;
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }

    /**
     * TopicExchange
     *
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        /*
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         * */
        return new TopicExchange("order-event-exchange", true, false);

    }


    @Bean
    public Binding orderCreateBinding() {
        /*
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    @Bean
    public Binding orderReleaseBinding() {

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

}

监听者

主类上添加注解

@RabbitListener(queues = "order.release.order.queue")

方法上添加注解

@RabbitHandler
public void listener(OrderEntity orderEntity, Channel channel, Message message)

只需要将参数设置号rabbitMQ就会自动传参

发送消息

rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());

死性队列

安装

1. 首先我们将刚下载下来的
rabbitmq_delayed_message_exchange-3.8.0.ez文件上传到RabbitMQ所在服务器
2. 切换到插件所在目录,
执行 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
3. 执行 docker exec -it rabbitmq /bin/bash 
命令进入到容器内部,并 cd plugins 进入plugins目录
4. 执行 ls -l|grep delay  命令查看插件是否copy成功
5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange  命令启用插件
6. exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器

下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

使用版本:

出现x-delayed-message交换机视为安装成功

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ的使用 的相关文章

  • Spring RestTemplate 使用 cookie 遵循重定向

    最近我遇到了一个问题 我需要做一个GET请求远程服务 我假设使用一个简单的 servlet 并且 RestTemplate 返回Too many redirects 经过一番调查 似乎对指定远程服务发出的第一个请求实际上只是一个 302 重
  • 通往楼梯顶部的可能路径

    这是一个非常经典的问题 我听说谷歌在他们的面试中使用过这个问题 问题 制定一个递归方法 打印从楼梯底部到楼梯顶部的所有可能的独特路径 有 n 个楼梯 您一次只能走 1 步或 2 步 示例输出 如果它是一个有 3 级楼梯的楼梯 1 1 1 2
  • 如何根据运行的 jar 的结果让我的 ant 任务通过或失败?

    我正在运行 CrossCheck 无浏览器 js 单元测试 作为 ant 脚本的一部分 如果 CrossCheck 测试失败 我希望 ant 报告失败 这是 build xml 中的相关部分
  • 需要使用 joda 进行灵活的日期时间转换

    我想使用 joda 解析电子邮件中的日期时间字符串 不幸的是我得到了各种不同的格式 例如 Wed 19 Jan 2011 12 52 31 0600 Wed 19 Jan 2011 10 15 34 0800 PST Wed 19 Jan
  • 如何检测 Java 字符串中的 unicode 字符?

    假设我有一个包含 的字符串 我如何找到所有这些 un icode 字符 我应该测试他们的代码吗 我该怎么做呢 例如 给定字符串 A X 我想将其转换为 AYXY 我想对其他 unicode 字符做同样的事情 并且我不想将它们存储在某种翻译映
  • 如何根据 HTTP 请求使用 Python 和 Flask 执行 shell 命令并流输出?

    下列的这个帖子 https stackoverflow com questions 15092961 how to continuously display python output in a webpage 我能够tail f网页的日志
  • Linux TUN/TAP:无法从 TAP 设备读回数据

    问题是关于如何正确配置想要使用 Tun Tap 模块的 Linux 主机 My Goal 利用现有的路由软件 以下为APP1和APP2 但拦截并修改其发送和接收的所有消息 由Mediator完成 我的场景 Ubuntu 10 04 Mach
  • Java Swing For mac 中的 DJ Native Swing 浏览器

    我有一个用 Swing 制作的 Java 应用程序 并且使用了一个 DJ Native Swing 浏览器 当我尝试在 OS X 上使用它时 它抛出了一个NoClassDefFoundError尽管我添加了 swt jar 但始终如此 有人
  • 在另一个模块中使用自定义 gradle 插件模块

    我正在开发一个自定义插件 我希望能够在稍后阶段将其部署到存储库 因此我为其创建了一个独立的模块 在对其进行任何正式的 TDD 之前 我想手动进行某些探索性测试 因此 我创建了一个使用给定插件的演示模块 到目前为止 我发现执行此操作的唯一方法
  • 内部存储的安全性如何?

    我需要的 对于 Android 我需要永久保存数据 但也能够编辑 并且显然是读取 它 用户不应访问此数据 它可以包含诸如高分之类的内容 用户不得对其进行编辑 我的问题 我会 并且已经 使用过Internal Storage 但我不确定它实际
  • 读取电子邮件的文本文件转换为 Javamail MimeMessage

    我有一个电子邮件原始来源的文本文件 直接从 gmail 复制 如果您单击 查看原始文件 您就会看到它 我想读入该文件并将其转换为 MimeMessage 如果您好奇为什么 我设置了 JavaMaildir 并且需要用电子邮件填充它的收件箱以
  • GWT 2.3 开发模式 - 托管模式 JSP 编译似乎不使用 java 1.5 兼容性

    无法编译 JSP 类 生成的 servlet 错误 DefaultMessage 上次更新 0 日期 中 0 时间 HH mm ss z 语法 错误 注释仅在源级别为 1 5 时可用 在尝试以开发模式在 Web 浏览器中打开我的 gwt 模
  • 使用 Mockito 模拟某些方法,但不模拟其他方法

    有没有办法使用 Mockito 模拟类中的某些方法 而不模拟其他方法 例如 在这个 诚然是人为的 Stock我想嘲笑的班级getPrice and getQuantity 返回值 如下面的测试片段所示 但我想要getValue 执行乘法 如
  • Freemarker 和 Struts 2,有时它计算为序列+扩展哈希

    首先我要说的是 使用 Struts2 Freemarker 真是太棒了 然而有些事情让我发疯 因为我不明白为什么会发生这种情况 我在这里问是因为也许其他人有一个想法可以分享 我有一个动作 有一个属性 说 private String myT
  • 流中的非终结符 forEach() ?

    有时 在处理 Java Stream 时 我发现自己需要一个非终端 forEach 来触发副作用但不终止处理 我怀疑我可以用 map item gt f item 之类的方法来做到这一点 其中方法 f 执行副作用并将项目返回到流中 但这似乎
  • java库维护数据库结构

    我的应用程序一直在开发 所以偶尔 当版本升级时 需要创建 更改 删除一些表 修改一些数据等 通常需要执行一些sql代码 是否有一个 Java 库可用于使我的数据库结构保持最新 通过分析类似 db structure version 信息并执
  • Docker 绑定安装卷不会传播由角度“ngserve”执行监视的更改事件

    请按照下列步骤操作 定义 Dockerfile FROM node alpine RUN yarn global add angular cli RUN yarn global add node sass RUN mkdir volumes
  • wildfly-logstash 不将日志发送到logstash

    我正在使用 jboss keycloak 11 0 2 和 wildfly logstash https github com kifj wildfly logstash https github com kifj wildfly logs
  • 连接被拒绝:当uwsgi和nginx在不同容器中时

    我正在尝试设置两个 docker 容器 是的 无需 docker compose 分开 一个带有 nginx 另一个带有带有基本 Flask 应用程序的 uwsgi 我在 docker 内的同一网络中运行容器我的 nginx 配置已添加 链
  • Android:无法发送http post

    我一直在绞尽脑汁试图弄清楚如何在 Android 中发送 post 方法 这就是我的代码的样子 public class HomeActivity extends Activity implements OnClickListener pr

随机推荐

  • 20天学会Java-基础阶段笔记

    视频地址 https www bilibili com video BV1Cv411372m 此笔记是 P1 P85 1 开始 1 1 注释 理解 注释是对代码的解释和说明文字 可以提高程序的可读性 因此在程序中添加必要的注释文字十分重要
  • 【千律】C++基础:通过递归函数计算N的阶乘

    include
  • 修改照片尺寸25mm*35mm

    打开方式 画图 gt 调整图片大小 选择像素 gt 修改为295 413即可
  • OpenCloudOS 8 安装rabbitMQ 和Docker

    文章目录 安装环境 Docker CE 安装rabbitMq 安装步骤 1 引入签名 2 为 RabbitMQ 和 Modern Erlang 添加 Yum 仓库 3 更新yum元数据 缓存rabbitmq相关的仓库数据 4 yum安装依赖
  • vue + moment 实现倒计时

    示例 代码 span countDown endDate span 引入日期插件 import moment from moment export default data return now moment endDate 2019 05
  • Windows和iPad传输

    一 电脑操作 1 新建文件夹 2 设置文件夹的属性 选着共享 3 设置高级共享 权限选择完全控制 4 选择共享 选择Everyone 5 在windows搜索栏中输入cmd 打开命令提示符窗口 6 输入ipconfig回车 7 记住你的ip
  • python爬取豆瓣电影json数据

    由于豆瓣里的电影都有专属的id 获取到id后可以进一步爬取其他页面的内容 首先来到主界面 https movie douban com 观察网页 点击 选电影 进入需要爬取的界面 打开Chrome开发模式 并下拉网页观察新生成的文件 可以观
  • Cause: java.sql.SQLIntegrityConstraintViolationException: Column ‘xxx‘ cannot be null

    1 报错信息 2 定位错误的范围 SQL insert into business businessId password salt businessName businessAddress businessExplain starPric
  • 【基于Cocos Creator实现的赛车游戏】9.实现汽车节点的控制逻辑

    转载知识星球 深度连接铁杆粉丝 运营高品质社群 知识变现的工具 项目地址 赛车小游戏 基于Cocos Creator 3 5版本实现 课程的源码 基于Cocos Creator 3 5版本实现 在上一节的课程中 您已经实现了通过触控给刚体施
  • RCP系列-第一章 环境安装

    RCP系列文章 第一章 Matlab安装 Matlab安装 RCP系列文章 前言 一 Matlab 获取 二 安装 1 解压 2 打开解压后的文件夹中的 R2018b win64 文件夹 3 鼠标右击 setup 选择 以管理员身份运行 4
  • oswatch的安装和使用

    author skate time 2011 08 06 oswatch的安装和使用 1 下载和安装 oswatch的安装与使用也比较简单 和nmon一样 都是下载后直接解压就可以使用的 oswatch是通过调用系统的命令完成信息的收集 命
  • 【Transformer系列(3)】 《Attention Is All You Need》论文超详细解读(翻译+精读)

    前言 哒哒 时隔好久终于继续出论文带读了 这次回归当然要出一手王炸呀 没错 今天我们要一起学习的就是传说中的Transformer 在2021年Transformer一经论文 Attention is All You Need 提出 就如龙
  • java数组学习

    2021 2 2 数组 一维数组的使用 1 一维数组的声明和初始化 2 如何调用数组的指定位置的元素 3 如何获取数组的长度 4 如何遍历数组 5 数组元素的默认初始化值 6 数组的内存解析 package day01 import jav
  • PyTorch-12 GAN、WGAN

    PyTorch 12 生成对抗网络 GAN WGAN 参考 https zhuanlan zhihu com p 34287744 GAN模型的目标函数如下 GAN模型优化训练 在训练过程中 生成网络的目标就是尽量生成真实的图片去欺骗判别网
  • ubuntu18.04 桌面卡死解决方法

    转载 https blog csdn net ATOOHOO article details 88169508 两个月关机 放个周末两天回来 使用系统文件夹很卡 直到因为复制文件卡死 内存和交换机空间都没满 第二次桌面又卡 因为心急 或者因
  • CUDA的下载安装

    大家好 下面将进行CUDA的下载安装 下载安装的详细步骤描述如下 1 CUDA下载 https download csdn net download qq 41104871 87462747 2 CUDA安装 1 首先 需要解压缩下载好的C
  • docker入门实践,制作Dockerfile镜像

    目前我知道的自制镜像有2种方式 根据容器制作镜像和根据Dockerfile制作镜像 根据现成的容器制作镜像 适用于已经有一个现成的容器已经满足需求的情况 docker ps a CONTAINER ID IMAGE COMMAND CREA
  • PCAP流量数据集(网络安全)

    MAWI Working Group Traffic Archive URL http mawi wide ad jp mawi CIC dataset Canadian Institute for Cybersecurity datase
  • Kmalloc

    Kmalloc内存分配和malloc相似 除非被阻塞否则他执行的速度非常快 而且不对获得空间清零 Flags参数 include
  • RabbitMQ的使用

    安装 Docker 安装 RabbitMQ docker run d name rabbitmq p 5671 5671 p 5672 5672 p 4369 4369 p 25672 25672 p 15671 15671 p 15672