RabbitMQ:使用Java进行操作

2023-11-05

使用Java操作消息队列

现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:

public static void main(String[] args) {
    //使用ConnectionFactory来创建连接
    ConnectionFactory factory = new ConnectionFactory();

    //设定连接信息,基操
    factory.setHost("192.168.0.12");
    factory.setPort(5672);  //注意这里写5672,是amqp协议端口
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");
  
 		//创建连接
    try(Connection connection = factory.newConnection()){
        
    }catch (Exception e){
        e.printStackTrace();
    }
}

这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。

try(Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()){   //通过Connection创建新的Channel
  	//声明队列,如果此队列不存在,会自动创建
    channel.queueDeclare("yyds", false, false, false, null);
  	//将队列绑定到交换机
    channel.queueBind("yyds", "amq.direct", "my-yyds");
  	//发布新的消息,注意消息需要转换为byte[]
    channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
}catch (Exception e){
    e.printStackTrace();
}

其中queueDeclare方法的参数如下:

  • queue:队列的名称(默认创建后routingKey和队列名称一致)
  • durable:是否持久化。
  • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
  • autoDelete:是否自动删除。
  • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

其中queueBind方法参数如下:

  • queue:需要绑定的队列名称。
  • exchange:需要绑定的交换机名称。
  • routingKey:不用多说了吧。

其中basicPublish方法的参数如下:

  • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
  • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
  • props:其他的配置。
  • body:消息本体。

执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

并且此消息队列已经成功与amq.direct交换机进行绑定:

那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:

public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("10.37.129.4");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");

    //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
    //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    //创建一个基本的消费者
    channel.basicConsume("yyds", false, (s, delivery) -> {
        System.out.println(new String(delivery.getBody()));
        //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
        //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
        //为false,那么消息就会被丢弃
        //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        //跟上面一样,最后一个参数为false,只不过这里省了
        //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    }, s -> {});
}

其中basicConsume方法参数如下:

●queue  -  消息队列名称,直接指定。
●autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
●deliver  -  消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
●cancel  -  当消费者取消订阅时进行的函数回调,这里暂时用不到。

现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:


我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

现在我们把刚刚创建好的消息队列删除。

官方文档:Spring AMQP

前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:
 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

接着我们需要配置RabbitMQ的地址等信息:

spring:
  rabbitmq:
    addresses: 192.168.0.4
    username: admin
    password: admin
    virtual-host: /test

这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:

@Configuration
public class RabbitConfiguration {
    @Bean("directExchange")  //定义交换机Bean,可以很多个
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("amq.direct").build();
    }

    @Bean("yydsQueue")     //定义消息队列
    public Queue queue(){
        return QueueBuilder
          				.nonDurable("yyds")   //非持久化类型
          				.build();
    }

    @Bean("binding")
    public Binding binding(@Qualifier("directExchange") Exchange exchange,
                           @Qualifier("yydsQueue") Queue queue){
      	//将我们刚刚定义的交换机和队列进行绑定
        return BindingBuilder
                .bind(queue)   //绑定队列
                .to(exchange)  //到交换机
                .with("my-yyds")   //使用自定义的routingKey
                .noargs();
    }
}

接着我们来创建一个生产者,这里我们直接编写在测试用例中:

@SpringBootTest
class SpringCloudMqApplicationTests {

  	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
    @Resource
    RabbitTemplate template;

    @Test
    void publisher() {
      	//使用convertAndSend方法一步到位,参数基本和之前是一样的
      	//最后一个消息本体可以是Object类型,真是大大的方便
        template.convertAndSend("amq.direct", "my-yyds", "Hello World!");
    }

}

现在我们来运行一下这个测试用例:

可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:

@Component  //注册为Bean
public class TestListener {

    @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    public void test(Message message){
        System.out.println(new String(message.getBody()));
    }
}

接着我们启动服务器:

可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:

@Test
void publisher() {
  	//会等待消费者消费然后返回响应结果
    Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
    System.out.println("收到消费者响应:"+res);
}

消费者这边只需要返回一个对应的结果即可:

@RabbitListener(queues = "yyds")
public String receiver(String data){
    System.out.println("一号消息队列监听器 "+data);
    return "收到!";
}

测试没有问题:

那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?

@Data
public class User {
    int id;
    String name;
}

@Configuration
public class RabbitConfiguration {
  	...

    @Bean("jacksonConverter")   //直接创建一个用于JSON转换的Bean
    public Jackson2JsonMessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }
}

接着我们只需要指定转换器就可以了:

@Component
public class TestListener {

  	//指定messageConverter为我们刚刚创建的Bean名称
    @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
    public void receiver(User user){  //直接接收User类型
        System.out.println(user);
    }
}

现在我们直接在管理页面发送:

{"id":1,"name":"LB"}

可以看到成功完成了转换,并输出了用户信息:

同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:

@Test
void publisher() {
    template.convertAndSend("amq.direct", "yyds", new User());
}

可以看到后台的数据类型为:

这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ:使用Java进行操作 的相关文章

随机推荐

  • python 安装whl文件

    python 安装whl文件 使用场景 在terminal中 通过 pip install 命令进行第三方模块安装时 由于网络获其他原因会使得第三方模块下载失败 导致安装失败 此时 我们可以先通过下载网址将第三方模块包手动下载到本地 再手动
  • Linux系统下 查找已安装软件的命令

    1 find 使用find查找文件的所在路径 find 查找路径 查找参数 在根目录下查找以 conf结尾的文件 find name conf 2 ps 通过查找进程的方法找到对应的包的路径 ps ef grep mongo 也可以简写成
  • mysql5.7.13+VS3013 源代码阅读调试

    之前写Java 对C make cmake都不是很熟 所以参考了以下这些前辈写的博客 最后成功搭建了mysql5 7 13 VS3013调试环境 自己总结了需要需要注意的几点 Windows VS2012环境下编译调试MySQL源码 一 W
  • SQLServer如果指定列列值相同则用逗号拼接其他指定列数据 stuff函数+for xml path

    for xml path 就是将 sql 查询出来的内容以XML的格式显示出来 Stuff 查询字符串 开始位置 数字 长度 数字 需插入的字符串 示例 55替换abcd123字符串中的a 示例 55替换abcd123字符串中的abcd 示
  • vue+Echarts绘制k线图(二)--分时图和交易量图

    目录 1 前言 2 分时图 2 1 vue引入Echarts 2 2 分时图介绍 2 3 分时折线图配置 2 4 组合交易量图 2 5 鼠标指示数据设置 2 6 项目完整代码 3 总结 1 前言 近来发现Echarts API越发的强大 对
  • 二分查找的各种应用详解(C++)

    基本概念 Binary Search 二分查找也称折半查找 它是一种效率较高的查找方法 使用二分查找要求线性表必须采用顺序存储结构 而且表中元素按关键字有序排列 基本原理 查找 因为序列已经单调且有序排列 从中间位置开始比较 一次可以排除一
  • 只考一门数据结构!安徽工程大学计算机考研

    安徽工程大学 考研难度 内容 23考情概况 拟录取和复试分析 院校概况 23专业目录 23复试详情 各专业考情分析 各科目考情分析 正文992字 预计阅读 3分钟 2023考情概况 安徽工程大学计算机相关各专业复试和拟录取分析 083500
  • 分布式开放消息系统(RocketMQ)的原理与实践

    分布式开放消息系统 RocketMQ 的原理与实践 作者 CHEN川 关注 2016 02 25 15 43 字数 6784 阅读 135462 评论 49 喜欢 351 赞赏 7 一年前为了一次内部分享而写的这篇文章 没想到会有这么多人阅
  • (ANC)前三章思维导图总结

    最近发现对于一本书 如果一点点事无巨细的做笔记 效率会比较低 于是改变了一下之前的读书方式 用思维导图的做读书笔记 这样便于了解整本书的框架和每章的大致内容 也仅限于自己做笔记用
  • 数据结构和算法(栈的模拟、前中后缀表达式、表达式求值步骤和思路)

    1 栈的介绍 栈的英文为 stack 栈是一个先入后出 FILO First In Last Out 的有序列表 栈 stack 是限制线性表中元素的插入和删除只能在线性表的同一端进行的一种特殊线性表 允许插入和删除的一端 为变化的一端 称
  • qt中qwt的安装的方式

    参考大神博客 https blog csdn net imkelt article details 51234230 utm medium distribute pc relevant none task blog BlogCommendF
  • Error in nextTick “TypeError Cannot read property ‘xxx‘ of undefined“

    报这个错主要是因为子组件还没加载完成就对子组件进行赋值 推荐使用第一个 this nextTick gt 修改子组件的内容 或 setTimeout gt 修改子组件的内容 50 父组件传值给子组件 子组件不能直接修改 会报错 子组件修改父
  • JavaScript中的endsWith

    如何在JavaScript中检查字符串是否以特定字符结尾 示例 我有一个字符串 var str mystring 我想知道该字符串是否以 结尾 我该如何检查 JavaScript中是否有endsWith 方法 我有一个解决方案是获取字符串的
  • 嘴说手画一文搞懂Spark的Join

    Spark Sql的Join和关系型数据库Sql的Join有很多相同点 比如inner join left join right join full join 这是二者都有的概念 并且含义相同 但是 Spark Sql是分布式执行 面对的是
  • ADB命令开启和关闭飞行模式,两段式操作方式!!!!

    开启飞行模式 必须要先执行1 再执行2 执行1 adb shell settings put global airplane mode on 1 执行2 adb shell am broadcast a android intent act
  • Docker部署Elasticsearch集群

    编写docker compose yml version 3 7 services es01 image elasticsearch 7 10 1 container name es01 ports 9200 9200 9300 9300
  • dc-1 靶机渗透学习

    环境 Vmware 虚拟机软件 dc 1 靶机ip地址 192 168 202 130 kali攻击机ip地址 192 168 202 129 本次渗透过程kali攻击机和dc靶机都采取NAT模式 信息收集 首先用ipconfig查看当前k
  • 初始化k8s踩过的坑

    问题一 error execution phase preflight couldn t validate the identity of the API Server abort connecting 这个问题网上有很多的解决方法 大致有
  • 【OpenCV】分离多通道图像RGB的值

    原文地址 http blog csdn net xiaowei cqu article details 7558657 1 计算图像ROI区域RGB的平均值 cvAvg函数 2 通道分离 合并的时候要特别的注意 分离之后的图像时单通道的灰度
  • RabbitMQ:使用Java进行操作

    使用Java操作消息队列 现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送 这里一起讲解 包括Java基础版本和SpringBoot版本 首先我们使用最基本的Java客户端连接方式