使用Java进行操作RabbitMQ

2023-11-01

使用Java操作消息队列

现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:


 
 
  1. <dependency>
  2. <groupId>com.rabbitmq </groupId>
  3. <artifactId>amqp-client </artifactId>
  4. <version>5.14.2 </version>
  5. </dependency>

依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:


 
 
  1. public static void main (String[] args) {
  2. //使用ConnectionFactory来创建连接
  3. ConnectionFactory factory = new ConnectionFactory();
  4. //设定连接信息,基操
  5. factory.setHost( "192.168.0.12");
  6. factory.setPort( 5672); //注意这里写5672,是amqp协议端口
  7. factory.setUsername( "admin");
  8. factory.setPassword( "admin");
  9. factory.setVirtualHost( "/test");
  10. //创建连接
  11. try( Connection connection = factory.newConnection()){
  12. } catch (Exception e){
  13. e.printStackTrace();
  14. }
  15. }

这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。


 
 
  1. try( Connection connection = factory.newConnection();
  2. Channel channel = connection.createChannel()){ //通过Connection创建新的Channel
  3. //声明队列,如果此队列不存在,会自动创建
  4. channel.queueDeclare( "yyds", false, false, false, null);
  5. //将队列绑定到交换机
  6. channel.queueBind( "yyds", "amq.direct", "my-yyds");
  7. //发布新的消息,注意消息需要转换为byte[]
  8. channel.basicPublish( "amq.direct", "my-yyds", null, "Hello World!".getBytes());
  9. } catch (Exception e){
  10. e.printStackTrace();
  11. }

其中queueDeclare方法的参数如下:

  • queue:队列的名称(默认创建后routingKey和队列名称一致)
  • durable:是否持久化。
  • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
  • autoDelete:是否自动删除。
  • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

其中queueBind方法参数如下:

  • queue:需要绑定的队列名称。
  • exchange:需要绑定的交换机名称。
  • routingKey:不用多说了吧。

其中basicPublish方法的参数如下:

  • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
  • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
  • props:其他的配置。
  • body:消息本体。

执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

并且此消息队列已经成功与amq.direct交换机进行绑定:

那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:


 
 
  1. public static void main (String[] args) throws IOException, TimeoutException {
  2. ConnectionFactory factory = new ConnectionFactory();
  3. factory.setHost( "10.37.129.4");
  4. factory.setPort( 5672);
  5. factory.setUsername( "admin");
  6. factory.setPassword( "admin");
  7. factory.setVirtualHost( "/test");
  8. //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
  9. //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. //创建一个基本的消费者
  13. channel.basicConsume( "yyds", false, (s, delivery) -> {
  14. System.out.println( new String(delivery.getBody()));
  15. //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
  16. //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
  17. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  18. //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
  19. //为false,那么消息就会被丢弃
  20. //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
  21. //跟上面一样,最后一个参数为false,只不过这里省了
  22. //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
  23. }, s -> {});
  24. }

其中basicConsume方法参数如下:

●queue  -  消息队列名称,直接指定。
●autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
●deliver  -  消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
●cancel  -  当消费者取消订阅时进行的函数回调,这里暂时用不到。

现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:


我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

现在我们把刚刚创建好的消息队列删除。

官方文档:Spring AMQP

前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:
 


 
 
  1. <dependency>
  2. <groupId>org.springframework.boot </groupId>
  3. <artifactId>spring-boot-starter-amqp </artifactId>
  4. </dependency>

接着我们需要配置RabbitMQ的地址等信息:


 
 
  1. spring:
  2. rabbitmq:
  3. addresses: 192.168.0.4
  4. username: admin
  5. password: admin
  6. virtual-host: / test

这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:


 
 
  1. @Configuration
  2. public class RabbitConfiguration {
  3. @Bean("directExchange") //定义交换机Bean,可以很多个
  4. public Exchange exchange (){
  5. return ExchangeBuilder.directExchange( "amq.direct").build();
  6. }
  7. @Bean("yydsQueue") //定义消息队列
  8. public Queue queue (){
  9. return QueueBuilder
  10. .nonDurable( "yyds") //非持久化类型
  11. .build();
  12. }
  13. @Bean("binding")
  14. public Binding binding (@Qualifier("directExchange") Exchange exchange,
  15. @Qualifier("yydsQueue") Queue queue){
  16. //将我们刚刚定义的交换机和队列进行绑定
  17. return BindingBuilder
  18. .bind(queue) //绑定队列
  19. .to(exchange) //到交换机
  20. .with( "my-yyds") //使用自定义的routingKey
  21. .noargs();
  22. }
  23. }

接着我们来创建一个生产者,这里我们直接编写在测试用例中:


 
 
  1. @SpringBootTest
  2. class SpringCloudMqApplicationTests {
  3. //RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
  4. @Resource
  5. RabbitTemplate template;
  6. @Test
  7. void publisher () {
  8. //使用convertAndSend方法一步到位,参数基本和之前是一样的
  9. //最后一个消息本体可以是Object类型,真是大大的方便
  10. template.convertAndSend( "amq.direct", "my-yyds", "Hello World!");
  11. }
  12. }

现在我们来运行一下这个测试用例:

可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:


 
 
  1. @Component //注册为Bean
  2. public class TestListener {
  3. @RabbitListener(queues = "yyds") //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
  4. public void test (Message message){
  5. System.out.println( new String(message.getBody()));
  6. }
  7. }

接着我们启动服务器:

可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:


 
 
  1. @Test
  2. void publisher () {
  3. //会等待消费者消费然后返回响应结果
  4. Object res = template.convertSendAndReceive( "amq.direct", "my-yyds", "Hello World!");
  5. System.out.println( "收到消费者响应:"+res);
  6. }

消费者这边只需要返回一个对应的结果即可:


 
 
  1. @RabbitListener(queues = "yyds")
  2. public String receiver (String data){
  3. System.out.println( "一号消息队列监听器 "+data);
  4. return "收到!";
  5. }

测试没有问题:

那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?


 
 
  1. @Data
  2. public class User {
  3. int id;
  4. String name;
  5. }


 
 
  1. @Configuration
  2. public class RabbitConfiguration {
  3. ...
  4. @Bean("jacksonConverter") //直接创建一个用于JSON转换的Bean
  5. public Jackson2JsonMessageConverter converter (){
  6. return new Jackson2JsonMessageConverter();
  7. }
  8. }

接着我们只需要指定转换器就可以了:


 
 
  1. @Component
  2. public class TestListener {
  3. //指定messageConverter为我们刚刚创建的Bean名称
  4. @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
  5. public void receiver (User user){ //直接接收User类型
  6. System.out.println(user);
  7. }
  8. }

现在我们直接在管理页面发送:

{"id":1,"name":"LB"}
 
 

可以看到成功完成了转换,并输出了用户信息:

同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:


 
 
  1. @Test
  2. void publisher () {
  3. template.convertAndSend( "amq.direct", "yyds", new User());
  4. }

可以看到后台的数据类型为:

这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Java进行操作RabbitMQ 的相关文章

  • Spring中如何映射Request参数?

    我正在使用 Spring MVC 当用户注册后 一封电子邮件将发送到用户帐户 这工作正常 我还将加密的用户ID发送到用户电子邮件 为此我编写了一个模板
  • eclipse mylyn 与 redmine

    是否可以将mylyn连接到redmine而不需要redmine中的rest支持 有一个链接http download eclipse org mylyn incubator 3 8 http download eclipse org myl
  • 导入java spring项目后如何构建maven

    你好 我是 java spring 概念的新手 所以我下载了一个示例spring应用程序并将其导入到eclipse中 我已经阅读了spring教程 要么我必须将maven安装到eclipse中才能运行spring项目 所以我已经安装了mav
  • 我的应用程序中的 google+ 集成

    我是 Android 开发新手 正在创建一个带有 Google 集成的 Android 应用程序 我看到了一个教程并开始遵循它 但自教程以来 Google 已经更新了 Google 集成 例如 在本教程中您曾经单击 获取配置文件 按钮htt
  • Eclipse + Spring Boot 中“抛出 new SilentExitException()”处的断点

    每次我在 Eclipse IDE Spring Tool Suite 中以调试模式运行 Spring Boot 项目时 线程都会停止在throw new SilentExitException 即使没有断点也行 有什么解决方案可以避免这种行
  • Java无法读取字体

    好的 我在使用自定义字体时遇到问题 基本上我得到了从互联网上下载的自定义字体并在我的程序中使用它 当我在 Eclipse 我使用的编辑器 中运行该程序时 一切正常 没有问题 但是 每当我将它从 eclipse 导出到 jar 时 或者尝试从
  • 限制执行第三方软件的线程的权限

    我正在开发一个基于 Eclipse 的应用程序 能够执行第三方组件 不是 eclipse plugin 每个组件都有一个列出权限 以及相应动机 的自定义描述符 这样最终用户可以决定是否执行它 组件在单独的线程中执行 如何根据描述符限制这些线
  • 单击按钮时更改照片

    import javax swing Icon import javax swing ImageIcon public class Stage1 extends javax swing JFrame int score 0 int iter
  • 在java中将HTML转换为RTF?

    我需要将 HTML 转换为 RTF 我正在使用以下代码 private static String convertToRTF String htmlStr OutputStream os new ByteArrayOutputStream
  • 自动装箱是否调用 valueOf()?

    我试图确定以下陈述是否保证为真 Boolean true Boolean TRUE Boolean true Boolean valueOf true Integer 1 Integer valueOf 1 我一直认为自动装箱相当于调用va
  • 使用 Hashmap 理解两个或多个键

    我的哈希图有问题 在我的哈希映射方法中 我希望有两个或多个关键字作为键 而不是只有一个 例如 我希望用户输入一些包含两个或多个关键字的句子 假设 教授姓名 是关键字 例如 String temp3 instructor teacher me
  • 面临 process.start(); 的问题在 Android 棒棒糖中

    面临一个问题process start 在 Android 棒棒糖中 我在服务中遇到了 android lollipop 后台进程的问题 我的代码在 KitKat 之前工作正常 我有一个ProcessBuilder pBuilder并向其中
  • Selenium - WebDriver.findElement() 和 WebElement.findElement() 之间的区别

    我正在使用WebElement findElement By cssSelector click 在页面上查找某个元素 但它返回了 Unable to locate element 但是当我使用WebDriver findElement B
  • Java - 动态创建子类

    我想以编程方式创建一个子类 我想我的选择很少 Javassist CGLib BCEL 或 ASM 用例是一个应用程序的内部是面向类的 而扩展是基于类的 因此 我不能将单个类作为由外部化脚本驱动的多个扩展的基础 现在 我该怎么做呢 我找到了
  • SwingWorker 和 Executor 的区别

    我正在使用 SwingWorker 在我正在制作的应用程序上执行一些重负载任务 虽然今天我遇到了 Executor 类和这个例子 Executors newCachedThreadPool execute new Runnable publ
  • JPA2+Hibernate 3.6.0 中的 JTA 还是 LOCAL 事务?

    我们正在重新思考我们的技术堆栈 以下是我们的选择 由于应用程序的复杂性等 我们不能没有 Spring 和 Hibernate 我们还从 J2EE 1 4 迁移到 Java EE 5 技术栈 Java EE 5 JPA 2 0 我知道Java
  • GridLayout 中的 JLabel

    如何添加JLabel出于GridLayout 我有一个 8x8 网格布局 Container content getContentPane content setLayout new GridLayout 8 8 2 2 for int f
  • Java中ThreadFactory的使用

    有人可以简要解释一下如何以及何时使用 ThreadFactory 吗 使用和不使用 ThreadFactory 的示例可能确实有助于理解差异 Thanks 这是一种可能的用法 假设您有一个ExecutorService它执行你的Runnab
  • JFrame.repaint() 和 JPanel.repaint() 之间的区别

    谁能解释一下两者之间的区别JPanel repaint 方法和JFrame repaint 方法 我想两者都调用paintComponent JPanel 中的方法 请澄清 谢谢 Calling repaint 在任何组件上都会向重绘管理器
  • java中的“main”可以返回字符串吗?

    java中的public static void main String args 是否有可能返回String代替void 如果是 怎么办 public static String main String args 代替 public st

随机推荐

  • 高性能Mysql——分区表详解

    文章目录 分表和分区 分表和分区的区别 分表和分区联系呢 分区表分区方式 分区管理 分区表注意事项 分表和分区 分表 MySQL 的分表是真正的分表 一张表分成很多表后 每一个小表都是完整的一张表 都对应三个文件 一个 MYD数据文件 MY
  • html做群聊通讯方法,websocket学习和群聊实现

    WebSocket协议可以实现前后端全双工通信 从而取代浪费资源的长轮询 在此协议的基础上 可以实现前后端数据 多端数据 真正的实时响应 在学习WebSocket的过程中 实现了一个简化版群聊 过程和代码详细记录在这篇文章中 1 概述 1
  • 【转】JWT 登录认证及 token 自动续期方案解读

    转自 https mp weixin qq com s X6Xsxgbfvbf3JCa0i7q4 A 要实现认证功能 很容易就会想到 JWT 或者 Session 但是两者有啥区别 各自有什么优缺点 应该选择谁 JWT 和 Session
  • Java线程随笔

    目录 守护线程 线程可见性 线程时序性 线程的中断机制 守护线程 基本概念 守护线程可以简单理解为后台运行线程 守护线程不需要关心他的结束问题 java垃圾回收就是一个守护线程 例如你的应用程序运行时需要播放音乐 如果将播放音乐这个线程设置
  • iOS下XMPP开发之xmppFramework框架的导入步骤和介绍

    一个将要开发xmpp的项目 建议在项目刚创建就导入框架 这样可以避免一些自己操作失误造成不必要的损失 xmpp中最常用的框架就是 xmppFrameWork 第一种方法直接拖 1 gt 拖入文件夹 在网盘链接的xmppFramework文件
  • 【python手写算法】numpy实现简易神经网络和反向传播算法【1】

    import numpy as np def dense A W Z np matmul A W 矩阵乘法 return 1 1 np exp Z if name main leanring rate 100 A np array 200
  • 你真的懂JavaScript吗

    放在前面 本文原文的标题是 So you think you know JavaScript 在下感觉有些标题党了 不过看了下文章的链接还是很不错的 原文作者是由几个问题展开了说明 问题 1 浏览器的console里会打印出什么 var a
  • 黑客是如何获取足够的流量以支撑其发动DDOS攻击?

    对计算有一些了解的朋友可能都会知道DDoS是一种互联网最普及的攻击方式 也是一些黑客的初级入门的技巧 每一次进行大规模的DDoS的攻击 那打出来的流量都让人咂舌 动静大而且波及极为广阔 DDoS要的就是流量 大多数黑客基本上为获取流量而不择
  • 数字电路设计之verilog的门级描述

    使用verilog的数字电路设计 一般会有晶体管级的描述 门级描述 RTL 行为描述 我们接触得比较多的就是后面两种 前两种更少涉及 现在就说一下门级描述吧 门级描述就是使用各种逻辑门对组合逻辑进行描述 举个栗子 与或非门 这里的and o
  • Unity iPhoneX适配方案【NGUI&UGUI】

    本文作者旨在通过改锚点的方式 分别实现在NGUI和UGUI上的iPhone X适配技术方案 并结合自身项目经验 阐述了主要的实现细节 希望能对广大游戏开发团队有借鉴意义 适配来源 按照苹果官方人机界面指南 Apple 开发者中心 在iPho
  • python+django乡村居民数据的可视化平台

    本论文主要论述了如何使用Django框架开发一个乡村振兴数据的可视化平台 本系统将严格按照软件开发流程进行各个阶段的工作 采用B S架构 面向对象编程思想进行项目开发 在引言中 作者将论述乡村振兴数据的可视化平台的当前背景以及系统开发的目的
  • jsp验证码实现代码

    1 后台java代码 package com bobo base servlet import java awt Color import java awt Font import java awt Graphics import java
  • 哪个虚拟服务器免费,免费虚拟主机空间选择哪家会更好?

    虚拟主机在使用过程中需要提供比较大的空间 因为如果内存不足的话 会出现网站访问速度比较慢 或者直接卡顿和卡死的情况 当然现在市场中也有免费虚拟主机空间 大家在选择的时候就可以看看选择哪一家的虚拟主机空间效果会更好 或者在整体的使用性价比上更
  • ckeditor粘贴的图片自动上传

    环境 java springmvc freemaker ckeditor 在做项目的时候发现本地图片粘贴到ckeditor中 img标签的src中的值是 data image png base64 开头的 后面会跟一串字符串 图片越大字符串
  • 数字三角形1

    Description 7 3 8 8 1 0 2 7 4 4 4 5 2 6 5 Figure 1 Figure 1 shows a number triangle Write a program that calculates the
  • 腾讯云服务器地域和可用区选择攻略及分布表更新

    腾讯云服务器地域是指物理数据中心的地理位置 不同地域之间网络完全隔离不能内网通信 可用区是同一个地域下电力和网络互相独立的区域 可用区之间可以做到故障隔离 同地域下的不同可用区之间可以内网通信 腾讯云服务器地域遍布全球 国内地域包括华南地区
  • ClickHouse+DBeaver安装总结(踩坑记录)

    原计划是在win上安装clickhouse 并打算用DBeaver对其进行操作 但后来问题较多 无法解决 使用云服务器安装ClickHouse 主机远程访问的方式代替 记录这个过程存在的问题 使用Docker安装clickhouse 参考链
  • SSM毕业设计分享 病人跟踪治疗信息管理系统(含源码+论文)

    文章目录 1 项目简介 2 实现效果 2 1 界面展示 3 设计方案 3 1 概述 3 2 开发环境 3 3 系统流程 3 4 系统结构设计 4 项目获取 1 项目简介 Hi 各位同学好呀 这里是M学姐 今天向大家分享一个今年 2022 最
  • Jmeter入门基础之线程组(Thread Group)

    大家好 我是Billie 很高兴能和大家一起学习Jmeter 目录 摘要 一 概述 参数配置 1 概述 2 配置参数 三 使用案例 新建线程组 四 补充内容 摘要 本篇文章主要介绍了线程组的参数配置和部分使用方法 提示 以下是本篇文章正文内
  • 使用Java进行操作RabbitMQ

    使用Java操作消息队列 现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送 这里一起讲解 包括Java基础版本和SpringBoot版本 首先我们使用最基本的Java客户端连接方式