RabbitMQ-Java 简单使用

2023-10-26

RabbitMQ-Java 入门案例

参考非常详细的博主教程:https://www.cnblogs.com/dtdx/p/14362760.html
SpringBoot+Java 版教程:https://blog.csdn.net/lgl782519197/article/details/113775569

00、环境搭建

实现步骤:

1、IDEDA内构建一个maven工程(jdk1.8)
2:导入rabbitmq的maven依赖
3:启动rabbitmq-server服务
4:定义生产者、定义消费者
5:观察消息的在rabbitmq-server服务中的过程

1、构建一个maven工程

2、引入rabbitmq-maven依赖

<!-- Java原生依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<!-- Spring依赖 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
<!-- SpringBoot依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

按需选择即可。
番外:rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。

01、Simple 简单队列


简单理解
如果把使用 RabbitMQ 进行消息发送的过程比喻成邮寄邮件。那么简单队列的场景是,只有一个邮箱、一个邮局、一个投递员,。消息通过 RabbitMQ 进行一对一发送,发送过程最简单。

简单队列模型示意图


代码实战
一个生产者,一个消费者,一个队列(一个默认的交换机)

 

 

1、定义生产者

package com.example.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author:
 * @description: Producer 简单队列生产者
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 从连接工厂中获取
        Connection connection = connectionFactory.newConnection("生产者");
        // 3: 从连接中打开通道channel
        Channel channel = connection.createChannel();

        // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
        /*
         *  申明队列:如果队列不存在会自动创建。
         *  注意:
         *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
         *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
         *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
         *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
         *
         *  @params1: queue 队列的名称
         *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
         *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
         *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
         *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
         * */
        channel.queueDeclare("simple-queue1", false, false, false, null);
        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!!";
        // 6: 发送消息给队列queue1
        /*
         * @params1: 交换机exchange
         * @params2: 队列名称、路由key(routing)
         * @params3: 属性配置
         * @params4: 发送消息的内容
         **/
        // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
        channel.basicPublish("", "simple-queue1", null, message.getBytes());
        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}


2、定义消费者

package com.example.simple;

import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @author:
 * @description: Producer 简单队列消费者
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        /*
         * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
         * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
         */
        try (Connection connection = connectionFactory.newConnection("消费者");
             Channel channel = connection.createChannel()){

            // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
            //channel.queueDeclare("queue1", false, false, false, null);

            // 接收消息,监听对应的队列名即可
            /*
             *  @params1: queue 队列的名称
             *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
             *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
             *  @params4: cancelCallback 消费失败回调
             * */
            channel.basicConsume("simple-queue1", true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s)  {
                    System.out.println("接受失败了...");
                }
            });

            // 让程序停止,好接收消费
            System.out.println("开始接受消息");
            System.in.read();
        }

    }
}


3、先运行生产者,然后运行消费者。最后消费者控制台输出:

开始接受消息
收到消息是你好,消息队列!!!


02、Work 工作队列


简单理解
在前面我们学了Simple Queue 模型,Simple Queue 模型消息生产者和消费者是一 一对应的,消息由生产者发送到队列并由消费者消费。假设有一种消息,消息生产者一次性发送了10条消息,消费者消费一条消息执行耗时1秒,如果使用简单队列模式总共耗时10秒;如果使用工作队列模式有3个消费者共同消费这些消息理想情况下只需要3秒就可以处理完这些消息。

工作队列模型示意图

 

 

代码实战-轮询模式(Round-Robin)
1、定义生产者

package com.example.work.round;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 声明队列
        channel.queueDeclare("work-queue1", false, false, false, null);
        // 4: 循环发送消息
        for (int i = 1; i <= 20; i++) {
            channel.basicPublish("", "work-queue1", null, ("work-轮询模式:"+ i).getBytes());
        }
        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}


2、定义消费者1

package com.example.work.round;

import com.rabbitmq.client.*;

public class Work1 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("消费者1");
        Channel channel = connection.createChannel();

        // 接收消息,监听对应的队列名即可
        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                consumerTag  -> {
        });

    }
}


3、定义消费者2

package com.example.work.round;

import com.rabbitmq.client.*;

public class Work2 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("消费者2");
        Channel channel = connection.createChannel();

        // 接收消息,监听对应的队列名即可
        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                consumerTag  -> {
        });

    }
}


查看控制台输出:

消费者1:

收到消息是work-轮询模式:1
收到消息是work-轮询模式:3
收到消息是work-轮询模式:5
收到消息是work-轮询模式:7
收到消息是work-轮询模式:9
收到消息是work-轮询模式:11
收到消息是work-轮询模式:13
收到消息是work-轮询模式:15
收到消息是work-轮询模式:17
收到消息是work-轮询模式:19


消费者2:

收到消息是work-轮询模式:2
收到消息是work-轮询模式:4
收到消息是work-轮询模式:6
收到消息是work-轮询模式:8
收到消息是work-轮询模式:10
收到消息是work-轮询模式:12
收到消息是work-轮询模式:14
收到消息是work-轮询模式:16
收到消息是work-轮询模式:18
收到消息是work-轮询模式:20


代码实战-公平分发(Fair-Dispatch)

也可以称为:Work模式的”能者多劳“模式

1、定义生产者

package com.example.work.fairr;

import com.rabbitmq.client.*;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 声明队列
        channel.queueDeclare("work-queue2", false, false, false, null);
        for (int i = 1; i <= 20; i++) {
            channel.basicPublish("", "work-queue2", null, ("work-公平分发:"+ i).getBytes());
        }
        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}


2、定义消费者1

package com.example.work.fairr;

import com.rabbitmq.client.*;
import java.util.concurrent.TimeUnit;

public class Work1 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("消费者1");
        Channel channel = connection.createChannel();

        // 同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                // 加一个睡眠,模拟消费者1消费慢
                TimeUnit.MILLISECONDS.sleep(1000);
                System.out.println("Work1-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 接收消息,开启手动确认消费消息机制,设置autoAck为false,防止消息一下子都进入消费者
        channel.basicConsume("work-queue2", false, deliverCallback,consumerTag  -> {
        });

    }
}


3、定义消费者2

package com.example.work.fairr;

import com.rabbitmq.client.*;
import java.util.concurrent.TimeUnit;

public class Work2 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("消费者2");
        Channel channel = connection.createChannel();

        // 同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                // 加一个睡眠,模拟消费者1消费慢
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println("Work2-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        // 接收消息,开启确认机制,设置autoAck为false
        channel.basicConsume("work-queue2", false, deliverCallback, consumerTag  -> {
        });

    }
}


与一般情况不同的是如下三个步骤(在消费端配置):

// 1.同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 2.开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

// 3.同时改为手动确认:监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);


查看控制台输出结果:

消费者1:

Work1-收到消息是work-公平分发:2
Work1-收到消息是work-公平分发:12

消费者2:

Work2-收到消息是work-公平分发:1
Work2-收到消息是work-公平分发:3
Work2-收到消息是work-公平分发:4
Work2-收到消息是work-公平分发:5
Work2-收到消息是work-公平分发:6
Work2-收到消息是work-公平分发:7
Work2-收到消息是work-公平分发:8
Work2-收到消息是work-公平分发:9
Work2-收到消息是work-公平分发:10
Work2-收到消息是work-公平分发:11
Work2-收到消息是work-公平分发:13
Work2-收到消息是work-公平分发:14
Work2-收到消息是work-公平分发:15
Work2-收到消息是work-公平分发:16
Work2-收到消息是work-公平分发:17
Work2-收到消息是work-公平分发:18
Work2-收到消息是work-公平分发:19
Work2-收到消息是work-公平分发:20


03、Fanout 发布订阅


Pub/Sub模型简单理解
在前面我们学习了简单队列模型和工作队列模型,其实不管是简单队列模型还是工作队列模式只用到了 RabbitMQ 中的 Queue 队列,消息直接发送到队列中,消费者直接从队列中获取消息。

现在我们学习另一种消息的处理方式,消息生产者将消息发送到 Exchange 交换机中,再由交换机将消息投递到 Queue队列中,交换机和队列的关系是多对多的关系,表示一个交换机可以通过一定的规矩将消息投递到多个队列中,同样一个队列也可以接收多个交换机投递的消息

交换机投递消息到队列由哪几种方式?

Fanout: 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
Direct:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,也不会转发dog.123,只会转发test。
Topic: 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
Headers:Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型

发布订阅模型示例图

 

 代码实战

一个生产者,一个交换机,三个队列,三个消费者
特点:发布与订阅模式,是一种广播机制,它是没有路由key的模式。

1、定义生产者

package com.example.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
        String exchangeName = "fanout-exchange";

        // 4-1: 声明交换机(也可通过web页面创建)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
        // 4-2: 声明队列
        channel.queueDeclare("fanout-queue1", false, false, false, null);
        channel.queueDeclare("fanout-queue2", false, false, false, null);
        channel.queueDeclare("fanout-queue3", false, false, false, null);
        // 4-3: 绑定交换机和队列
        channel.queueBind("fanout-queue1", exchangeName, "");
        channel.queueBind("fanout-queue2", exchangeName, "");
        channel.queueBind("fanout-queue3", exchangeName, "");

        // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
        channel.basicPublish(exchangeName, "", null, "你好,消息队列!".getBytes("UTF-8"));
        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}


2、定义消费者

package com.example.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    private static Runnable runnable = () -> {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();
            // 获取队列的名称
            final String queueName = Thread.currentThread().getName();

            // 6: 定义接受消息的回调
            DeliverCallback deliverCallback = (consumerTag, delivery) ->
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

            channel.basicConsume(queueName, true, deliverCallback, consumerTag  -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
        }
    };

    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "fanout-queue1").start();
        new Thread(runnable, "fanout-queue2").start();
        new Thread(runnable, "fanout-queue3").start();
    }
}


消费者控制台输出:

fanout-queue3:收到消息是:你好,消息队列!
fanout-queue1:收到消息是:你好,消息队列!
fanout-queue2:收到消息是:你好,消息队列!


04、Direct 路由模式


简单理解
Direct Exchange 消息模型 是交换机将消息投递到队列中的另一种方式,需要将交换机类型设置为 direct。那么什么是 direct 类型交换机?

其中Direct:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,也不会转发dog.123,只会转发test。

路由消息模型示意图

 

 Direct 消息流转过程

  • 生产者向Exchange发送消息。
  • 队列使用路由密钥绑定到Exchange。
  • 通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
  • 发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
  • 订阅队列的使用者接收消息并进行处理。


代码实战

一个生产者,一个交换机,两个队列,两个消费者
特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。

1、定义生产者

package com.example.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
        String exchangeName = "direct-exchange";

        // 4-1: 声明交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
        // 4-2: 声明队列
        channel.queueDeclare("direct-info", false, false, false, null);
        channel.queueDeclare("direct-error", false, false, false, null);
        // 4-3: 绑定交换机和队列
        channel.queueBind("direct-info", exchangeName, "info");
        channel.queueBind("direct-error", exchangeName, "error");

        // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
        channel.basicPublish(exchangeName, "info", null, "你好,消息队列!".getBytes("UTF-8"));
        channel.basicPublish(exchangeName, "error", null, "你好,消息队列!".getBytes("UTF-8"));

        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}


2、定义消费者

package com.example.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    private static Runnable runnable = () -> {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();

            // 获取队列的名称
            final String queueName = Thread.currentThread().getName();

            // 3:定义接受消息的回调
            channel.basicConsume(queueName, true, (consumerTag, delivery) ->
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody()),
                    consumerTag  -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
        }
    };

    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "direct-info").start();
        new Thread(runnable, "direct-error").start();
    }
}

消费者控制台输出:

direct-error:收到消息是:你好,消息队列!
direct-info:收到消息是:你好,消息队列!

05、Topic 主题模式

简单理解
Topic Exchange 消息模型需要指定路由器类型设置为Topic,那么什么是 Topic 类型?

Topic: 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs

主题消息模型示意图

 

 Topic 中的路由键设置规则

  • Topic Exchange中的路由关键字必须包含零个或多个由 . 点分隔的单词,例如health.education。
  • Topic Exchange中的路由键通常称为路由模式。
  • 路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
  • 星号(*****)表示正好允许一个字。
  • 同样,井号(#)表示允许的单词数为零或更多。
  • 点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
  • 如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用。

Topic 中的消息流转过程

  • 一个Queue队列通过路由键(P)绑定到 Exchange。
  • Producer 将带有P路由键(K)的消息发送到 Topic Exchange。
  • 如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述。
  • 订阅队列的使用者将收到消息。

代码实战

一个生产者,一个交换机,两个队列,两个消费者
特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
简单的说:* 表示一个,# 表示 0个 或者 多个

1、定义生产者

package com.example.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
        String exchangeName = "topic-exchange";

        // 4-1: 声明交换机(也可通过web页面创建)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);

        // 4-2: 声明队列
        channel.queueDeclare("topic-queue1", false, false, false, null);
        channel.queueDeclare("topic-queue2", false, false, false, null);
        channel.queueDeclare("topic-queue3", false, false, false, null);

        // 4-3: 绑定交换机和队列
        channel.queueBind("topic-queue1", exchangeName, "#.order");
        channel.queueBind("topic-queue2", exchangeName, "*.user");

        // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
        // topic-queue1、topic-queue2、topic-queue3
        channel.basicPublish(exchangeName, "com.course.order", null, "routingKey:#.order".getBytes("UTF-8"));
        // topic-queue3
        channel.basicPublish(exchangeName, "com.order.user", null, "routingKey:#.user".getBytes("UTF-8"));

        System.out.println("消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

2、定义消费者

package com.example.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    private static Runnable runnable = () -> {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();
            // 获取队列的名称
            final String queueName = Thread.currentThread().getName();

            // 6: 定义接受消息的回调
            channel.basicConsume(queueName, true,
                    (consumerTag, delivery) -> System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")),
                    consumerTag  -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("发送消息出现异常...");
        } finally {
            // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
        }
    };

    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "topic-queue1").start();
        new Thread(runnable, "topic-queue2").start();
        new Thread(runnable, "topic-queue3").start();
    }
}

消费者控制台输出:

topic-queue1:收到消息是:routingKey:#.order


06、Headers 模式


Headers Exchange 模型需要指定路由器类型设置为Headers,那么什么是 HeadersHeaders 类型?

Headers:Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型

Headers 消息模型示意图

 Headers 中消息流转过程

  • 一个或多个队列使用标头属性(H)绑定(链接)到标头交换。
  • 生产者将带有标头属性(MH)的消息发送到此Exchange。
  • 如果MH与H匹配,则消息将转发到队列。
  • 监听队列的使用者接收消息并对其进行处理。

代码实现
1、生产者:

package com.example.header;

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2: 创建连接,获取通道
        Connection connection = connectionFactory.newConnection("生产者");
        Channel channel = connection.createChannel();

        // 3: 定义队列名称-随机生成队列名、交换机名、路由key

        /**
         * 定义交换机、队列 和 绑定队列和交换机并绑定配置参数
         */
        // 4-1: 声明交换机
        channel.exchangeDeclare("header-exchange", BuiltinExchangeType.HEADERS);
        // 4-2: 声明队列
        channel.queueDeclare("header-queue-noe", false, false, false, null);
        channel.queueDeclare("header-queue-two", false, false, false, null);
        // 4-3: 绑定交换机和队列并加上配置header匹配规则
        Map<String, Object> oneArgs = new HashMap<>();
        // 匹配其中一个key/value 就能匹配成功
        oneArgs.put("x-match", "any");
        oneArgs.put("h1", "Header1");
        oneArgs.put("h2", "Header2");
        channel.queueBind("header-queue-noe", "header-exchange", "", oneArgs);
        Map<String, Object> twoArgs = new HashMap<>();
        // 必须匹配所有key/value 才能匹配成功
        twoArgs.put("x-match", "all");
        twoArgs.put("h1", "Header1");
        twoArgs.put("h2", "Header2");
        channel.queueBind("header-queue-two", "header-exchange", "", twoArgs);


        /**
         * 定义需要发送的消息和属性
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("h1", "Header1");
        headerMap.put("h3", "Header3");

        AMQP.BasicProperties basicPropertiesOne = new AMQP.BasicProperties()
                .builder().headers(headerMap).build();
        channel.basicPublish("header-exchange", "", basicPropertiesOne, "Header Exchange example 1".getBytes("UTF-8"));
        System.out.println("h1,h3:消息发送成功!");

        headerMap.put("h2", "Header2");
        AMQP.BasicProperties basicPropertiesTwo = new AMQP.BasicProperties()
                .builder().headers(headerMap).build();
        channel.basicPublish("header-exchange", "", basicPropertiesTwo, "Header Exchange example 2".getBytes("UTF-8"));
        System.out.println("h1,h2,h3:消息发送成功!");

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

2、消费者:

package com.example.header;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    private static Runnable runnable = () -> {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();
            // 获取队列的名称
            final String queueName = Thread.currentThread().getName();

            // 6: 定义接受消息的回调
            channel.basicConsume(queueName, true, ((consumerTag, message) -> {
                System.out.println(queueName + ":收到消息是: " + new String(message.getBody()));
            }), consumerTag -> {
                System.out.println(consumerTag);
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
        }
    };

    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "header-queue-noe").start();
        new Thread(runnable, "header-queue-two").start();
    }
}

消费者控制台输出:

header-queue-noe:收到消息是: Header Exchange example 1
header-queue-two:收到消息是: Header Exchange example 2
header-queue-noe:收到消息是: Header Exchange example 2
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ-Java 简单使用 的相关文章

随机推荐

  • STM32F407的CAN通信实验CAN1CAN2的设置

    CAN2的初始化参照上一篇博文就OK了 需要修改的地方 配置过滤器 CAN FilterInitStructure CAN FilterNumber 14 那为什么要修改额 查datasheet 双 CAN CAN1 主 bxCAN 用于管
  • Flex应用程序启动详解

    编写一个简单的Flex应用程序并不复杂 就算你从来没接触过Flex程序设计 照着帮助的实例步骤 不需花多长时间也能做出一个漂亮简捷的小程序出来 不过 随着对Flex程序编写的深入 会越来越觉得 其实要编写一个好的Flex应用程序并不简单 涉
  • uniapp切片-可视化设计工具(一套代码编译到7个平台iOS、Android、H5、小程序)

    uni app 是一个使用 Vue js 开发跨平台应用的前端框架 开发者编写一套代码 可编译到iOS Android H5 小程序等多个平台 一套代码编到7个平台 难以置信吗 依次扫描7个二维码 亲自体验最全面的跨平台效果 uni app
  • C++之sort()函数详解,刷题必备~

    顾名思义 sort就是用来排序的函数 它根据具体情形使用不同的排序方法 效率较高 一般来说 不推荐使用C语言中的qsort函数 原因是qsort用起来比较烦琐 涉及很多指针的操作 而且sort在实现中规避了经典快速排序中可能出现的会导致实际
  • C# 中的sealed修饰符学习

    转载原地址 http developer 51cto com art 200908 147327 htm C 语言还是比较常见的东西 这里我们主要介绍C sealed修饰符 包括介绍两个修饰符在含义上互相排斥用于方法和属性等方面 C sea
  • python爬虫网络出错怎么办_Python爬虫异常处理

    100 继续 客户端应当继续发送请求 客户端应当继续发送请求的剩余部分 或者如果请求已经完成 忽略这个响应 101 转换协议 在发送完这个响应最后的空行后 服务器将会切换到在Upgrade 消息头中定义的那些协议 只有在切换新的协议更有好处
  • linux上redis常用命令以及遇到的问题

    1 在linux上解压缩后使用make命令进行编译的时候 错误类型 zmalloc h 50 31 致命错误 jemalloc jemalloc h 没有那个文件或目录 原因是因为编译的时候Linux默认内存分配器是jemalloc 而Re
  • 添加商品到购物车 Vuex

    商品详情 购物车页面 code
  • OFDM插入导频过程详解

    ofdm符号的长度 有效数据 cp的长度 cp就是将有效数据的后半部分1 4截取并添加到有效数据的开始部分 比如一个ofdm符号的长度为4us 那么有效数据的长度为3 2us cp的长度为0 8us 子载波的间隔 1 有效数据的长度 就是有
  • Unity使用c#开发遇上的问题(十三)(unity平台下使用 Vuforia 以及 ARFoundiation 的总结,根据个人观点)

    文章目录 前言 一 Vuforia的使用感觉 二 ARfoundiation的使用感觉 总结 前言 有一段时间没有更新系列的内容 上次更新完又重新思考了一下以后进行的方向 这里就目前接触的Vuforia 和 unity 自带的AR 之前叫A
  • 自动生成根据mysql表创建hive表脚本

    bin bash source etc profile 该脚本为手动传参根据MySQL表信息创建hive表 输入参数判断逻辑 必须数据两个参数 一个是MySQL库名 第二个是表名 if eq 2 then db name 1 mysql 库
  • 浅析java垃圾回收机制

    一 什么是垃圾回收 1 垃圾回收 顾名思义 便是将已经分配出去的 但却不再使用的内存回收回来 以便能够再次分配 在 Java 虚拟机的语境下 垃圾指的是死亡的对象所占据的堆空间 垃圾回收只会负责释放那些对象占有的内存 此时对象也就被销毁 2
  • 0长度数组的使用,重点掌握的知识

    0长度的数组在ISO C和C 的规格说明书中是不允许的 但是GCC的C99支持的这种用法 GCC对0长度数组的文档参考 Arrays of Length Zero 如下代码片段 哪个更简洁更灵活 看一眼就知道了 include
  • 用vscode开发autojs,输出窗口不显示任何输出结果

    我的情况是 我vscode开发autojs 程序 之前在一切正常的情况下 输出窗口可以正常显示程序运行结果 右侧红圈里可以选择我连接的手机型号 如下图 但是现在出现问题 就是输出窗口不显示任何结果 在右侧的选项卡里也找不到我的手机型号 之前
  • 2021年全球与中国龙胆苦苷行业市场规模及发展前景分析

    2021年全球与中国龙胆苦苷行业市场规模及发展前景分析 本报告研究全球与中国市场龙胆苦苷的发展现状及未来发展趋势 分别从生产和消费的角度分析龙胆苦苷的主要生产地区 主要消费地区以及主要的生产商 重点分析全球与中国市场的主要厂商产品特点 产品
  • (R,线性回归)R语言里的模型诊断图(Residuals vs Fitted,Normal QQ , Scale-Location ,Residuals Leverage)

    线性回归 是概率统计学里最重要的统计方法 也是机器学习中一类非常重要的算法 线性模型简单理解非常容易 但是内涵是非常深奥的 尤其是线性回归模型中的Diagnostics plot的阅读与理解一直被认为是线性回归中的一个难点 在任何线性模型中
  • 获取微信公众号地址的图片不能正常显示的问题

    获取微信公众号地址的图片不能正常显示的问题 目前已经获取微信公众号发布的图片 但不能正常显示 提示 此图片来自微信公众平台 未经允许不得引用 看了一下他的地址是这样的 https mmbiz qpic cn mmbiz jpg ic70qV
  • Codeforces Round #291 (Div. 2)

    题目链接contest 514 A Chewba ca and Number 不允许有前导零 所以如果第一位是9的话 需要特别考虑 一开始理解错了题意 又WA了呜呜呜 include
  • 弱密码测试工具blaster使用演示

    声明 出品 安全客 以下内容 来自安全客作者原创 由于传播 利用此文所提供的信息而造成的任何直接或间接的后果和损失 均由使用者本人负责 长白山攻防实验室以及文章作者不承担任何责任 关于blaster blaster是一款强大的弱密码隐患检测
  • RabbitMQ-Java 简单使用

    RabbitMQ Java 入门案例 参考非常详细的博主教程 https www cnblogs com dtdx p 14362760 html SpringBoot Java 版教程 https blog csdn net lgl782