RabbitMQ快速入门及六大模式

2023-05-16

目录

核心组件

运行原理

实现步骤

构建Gradle项目

入门案例(简单模式)

        生产者(代码)

         消费者(代码)

绑定交换机和队列

发布/订阅模式(Publish/Subscribe)

        生产者

        消费者

路由模式(routing)

        生产者

        消费者

主题模式(topics)

        生产者

        消费者

工作模式(Work queue)

        轮询模式

        公平分配模式


核心组件

 - Server:又称Broker接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手

- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各 Channel,每个Channel代表一个会话任务。

- Message:消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

- VirtualHost虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和 Queueu,同一个虚拟王机里面不能有相同名字的Exchange

- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)

- Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.

- Routingkey:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

- Queue:队列:也成为MessageQueue,消息队列,保存消息并将它们转发给消费者。

运行原理

实现步骤

        1、JDK11
        2、构建一个gradle项目
        3、导入RabbitMQ的相关依赖
        4、启动rabbitmq-server服务
        5、定义生产者
        6、定义消费者
        7、查看消息在rabbitmq-server服务中的过程

构建Gradle项目

        Maven 同样引入相应的依赖

implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.10.0'

入门案例(简单模式)

        生产者(代码)

package com.any.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            String queueName = "queue1";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所有的持久化是否存盘,如果false 不持久化 ture 持久化
             *          非持久化会存盘吗? 会存盘,随着服务器的存盘会丢失
             * @params3 排他性,是否是独占队列
             * @params4 是否自动删除,随着最后一个消费者消费消息完毕,是否把队列自动删除
             * @params5 携带一些附加参数
             */
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            channel.queueDeclare(queueName,false,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            channel.basicPublish("",queueName,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

         消费者(代码)

package com.any.rabbitmq.simple;

import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Consumer {
    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            String queueName = "queue1";
            channel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到的消息:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("消息接受失败");
                }
            });
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

绑定交换机和队列

        该例子以生产者为例

package com.any.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * 路由模式
 * 交换机声明,队列声明, 绑定(binding)
 * 生产者
 *
 */
public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取信道Channel
            channel = connection.createChannel();

            // 定义交换机名称
            String exchangeName = "direct-message-exchange";

            // 定义交换机类型
            String exchangeType = "direct";

            // 定义 路由key
            String routingKey = "message";

            /**
             * 4、声明交换机
             * @param1 exchange 交换机名称
             * @param2 type 交换机类型
             * @param3 durable 重启是否删除
             */
            channel.exchangeDeclare(exchangeName,exchangeType,true);
            /**
             * 5、声明队列
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所有的持久化是否存盘,如果false 不持久化 ture 持久化
             *          非持久化会存盘吗? 会存盘,随着服务器的存盘会丢失
             * @params3 排他性,是否是独占队列
             * @params4 是否自动删除,随着最后一个消费者消费消息完毕,是否把队列自动删除
             * @params5 携带一些附加参数
             */

            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);

            /**
             * 6、绑定
             * @param1 queue 队列名称
             * @param2 exchange 交换机名称
             * @param3 routingKey 路由Key
             */
            channel.queueBind("queue5",exchangeName,routingKey);
            channel.queueBind("queue6",exchangeName,routingKey);
            channel.queueBind("queue7",exchangeName,"email");

            // 5、准备消息
            String message = "hello,message-exchange";
            // 6、发送消息给 queue
            /**
             * @params1 exchange 交换机
             * @params2 队列名称
             * @params3 props 消息状态控制
             * @params4 body 消息内容
             */
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}

发布/订阅模式(Publish/Subscribe)

 

        生产者

package com.any.rabbitmq.routing.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;

/**
 * 发布/订阅模式
 * fanout 模式
 * 生产者
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Producer {
    // 发布订阅模式
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();
            // 定义 队列名称
            String queueName = "queue1";
            // 定义交换机名称
            String exchangeName = "fanout-exchange";
            // 定义交换机类型
            String exchangeType = "fanout";
            // 定义 路由key
            String routingKey = "";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所有的持久化是否存盘,如果false 不持久化 ture 持久化
             *          非持久化会存盘吗? 会存盘,随着服务器的存盘会丢失
             * @params3 排他性,是否是独占队列
             * @params4 是否自动删除,随着最后一个消费者消费消息完毕,是否把队列自动删除
             * @params5 携带一些附加参数
             */
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            /**
             * @params1 exchange 交换机
             * @params2 队列名称
             * @params3 props 消息状态控制
             * @params4 body 消息内容
             */
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}

        消费者

package com.any.rabbitmq.routing.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 发布/订阅模式Publish/Subscribe
 * fanout 模式
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

路由模式(routing)

        生产者

package com.any.rabbitmq.routing.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * 路由模式
 * routing 模式
 * 生产者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();



            // 定义 队列名称
            String queueName = "queue1";


            // 定义交换机名称
            String exchangeName = "direct-exchange";

            // 定义交换机类型
            String exchangeType = "direct";

            // 定义 路由key
            String routingKey = "email";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所有的持久化是否存盘,如果false 不持久化 ture 持久化
             *          非持久化会存盘吗? 会存盘,随着服务器的存盘会丢失
             * @params3 排他性,是否是独占队列
             * @params4 是否自动删除,随着最后一个消费者消费消息完毕,是否把队列自动删除
             * @params5 携带一些附加参数
             */
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            /**
             * @params1 exchange 交换机
             * @params2 队列名称
             * @params3 props 消息状态控制
             * @params4 body 消息内容
             */
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

        消费者

package com.any.rabbitmq.routing.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 路由模式
 * routing 模式
 * 费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

主题模式(topics)

        生产者

package com.any.rabbitmq.routing.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * 主题模式
 * topics 模式
 * 生产者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Producer {

    public static void main(String[] args) {

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue1";

            // 定义交换机名称
            String exchangeName = "topic-exchange";

            // 定义交换机类型
            String exchangeType = "topic";
            // 定义 路由key
            String routingKey = "com.course.order";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所有的持久化是否存盘,如果false 不持久化 ture 持久化
             *          非持久化会存盘吗? 会存盘,随着服务器的存盘会丢失
             * @params3 排他性,是否是独占队列
             * @params4 是否自动删除,随着最后一个消费者消费消息完毕,是否把队列自动删除
             * @params5 携带一些附加参数
             */
            // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息
            //channel.queueDeclare(queueName,true,false,true,null);
            // 5、准备消息
            String message = "hello,anyboot";
            // 6、发送消息给 queue
            /**
             * @params1 exchange 交换机
             * @params2 队列名称
             * @params3 props 消息状态控制
             * @params4 body 消息内容
             */
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

        消费者

package com.any.rabbitmq.routing.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 主题模式
 * topics 模式
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
        new Thread(runnable,"queue3").start();
    }
}

工作模式(Work queue)

 

        轮询模式

        生产者

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * work模式 轮询分发
 * 生产者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Producer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue8";

            // 4、发送消息给 queue
            for (int i = 0; i < 20; i++) {
                String message = "hello,work-rabbitmq - " + i;
                /**
                 * @params1 exchange 交换机
                 * @params2 队列名称
                 * @params3 props 消息状态控制
                 * @params4 body 消息内容
                 */
                channel.basicPublish("",queueName,null,message.getBytes());
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

       work1

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * work 模式  轮询分发
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Work1 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

         work2

package com.any.rabbitmq.work.polling;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * work 模式  轮询分发
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Work2 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                //channel.
                channel.basicConsume( queueName , true, new DeliverCallback() {
                    // 成功处理
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

        公平分配模式

package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

/**
 * work模式 公平分配
 * 生产者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Producer {

    public static void main(String[] args) {
        //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
        // ip port

        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("101.35.118.177");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");    // 虚拟访问节点

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、创建连接Connection
            connection = factory.newConnection("生产者");
            // 3、通过连接获取通道Channel
            channel = connection.createChannel();

            // 定义 队列名称
            String queueName = "queue8";

            // 4、发送消息给 queue
            for (int i = 0; i < 20; i++) {
                String message = "hello,work-rabbitmq - " + i;
                /**
                 * @params1 exchange 交换机
                 * @params2 队列名称
                 * @params3 props 消息状态控制
                 * @params4 body 消息内容
                 */
                channel.basicPublish("",queueName,null,message.getBytes());
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            if(channel !=null && channel.isOpen()){
                try {
                    // 7、关闭连接
                    channel.close();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null && connection.isOpen()){
                try {
                    // 8、关闭通道
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }


    }

}

        work1

package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * work 模式  公平分配
 * work 模式
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Work1 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                Channel finalChannel = channel;
                // 确认指标,一次性处理多少条消息
                finalChannel.basicQos(1);

                finalChannel.basicConsume( queueName , false, new DeliverCallback() {
                    // 成功处理
                    @SneakyThrows
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(1000);
                        // 设置手动应答
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

         work2

package com.any.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * work 模式  公平分配
 * 消费者
 *
 * @author 15821
 * @date 20:20 2022/1/6
 */
public class Work2 {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp
            // ip port
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("101.35.118.177");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");    // 虚拟访问节点

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 2、创建连接Connection
                connection = factory.newConnection("生产者");
                // 3、通过连接获取通道Channel
                channel = connection.createChannel();
                Channel finalChannel = channel;
                finalChannel.basicQos(1);

                finalChannel.basicConsume( queueName , false, new DeliverCallback() {
                    // 成功处理
                    @SneakyThrows
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        // 获取队列中消息的数量
                        System.out.println(message.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                        // 设置手动应答
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

                    }
                }, new CancelCallback() {
                    // 失败处理
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接受失败");
                    }
                });

                System.out.println(queueName+"-开始接受消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                if(channel !=null && channel.isOpen()){
                    try {
                        // 7、关闭连接
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null && connection.isOpen()){
                    try {
                        // 8、关闭通道
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue8").start();
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ快速入门及六大模式 的相关文章

  • HJ71 字符串通配符

    描述 问题描述 xff1a 在计算机中 xff0c 通配符一种特殊语法 xff0c 广泛应用于文件搜索 数据库 正则表达式等领域 现要求各位实现字符串通配符的算法 要求 xff1a 实现如下2个通配符 xff1a xff1a 匹配0个或以上
  • 如何用Java实现判断一个链表是否有环

    设置一个快指针和一个慢指针 xff0c 快指针一次走两步 xff0c 慢指针一次走一步 如果该链表没有环 xff0c 快指针会先指向NULL xff0c 可据此判断链表没有环结构 xff1b 如果该链表有环 xff0c 则快指针肯定先进环
  • Arch Linux安装 2023-04-09

    除特殊场景外 下面所有的 34 34 后都是注释 准备 EFI分区不小于200mb iso版本 archlinux 2023 04 01 x86 64 使用分区工具预留出足够空间 gt 20G 标签为 未分配 安装 连接网络 rfkill
  • Spring Boot配置数据库链接池

    配置方法 基于当前的1 5 2 RELEASE的Spring Boot 依照官方文档 xff0c 如果增加了如下依赖的配置 xff0c 或者类路径中存在spring boot starter jdbc的jar xff0c 那么已默认启用了数
  • 一文搞懂Java中相对路径与绝对路径

    一文搞懂Java中相对路径与绝对路径 在java中路径分隔使用正斜杠 xff0c 不推荐使用反斜杠 xff08 因为反斜杠需要转义两个反斜杠表示一个正斜杠 xff09 在windows磁盘中用 反斜杠 表示路径的分隔在浏览器中用 正斜杠 来
  • 【向量的叉乘】

    一 二维向量叉乘公式 xff1a a xff08 x1 xff0c y1 xff09 xff0c b xff08 x2 xff0c y2 xff09 xff0c 则a b 61 xff08 x1y2 x2y1 xff09 二 a b 61
  • JAVA常用类

    Object类 Object类是类层次结构的根 xff0c 每个类都可以将Object作为超类 所有类都直接或者间接的继承该类 Object只有无参构造方法 Math类 包含执行基本数学运算的方法 Random类 伪随机数 java uta
  • Collections类 [Java]

    Collections工具类 Collections是一个操作Collection集合和Map集合的工具类 Collections不仅仅是操作Collection集合 还可以操作Map集合 Collection和Collections有什么
  • 我阿里P7了解到的Android面试的一些小内幕!已拿offer

    前言 这些题目是网友去百度 小米 乐视 美团 58 猎豹 360 新浪 搜狐等一线互联网公司面试被问到的题目 熟悉本文中列出的知识点会大大增加通过前两轮技术面试的几率 欢迎一线公司员工以及网友提交面试题库 xff0c 欢迎留言 网上的都是按
  • 7月编程排行榜来啦!这次有何新变化?

    每月编程排行榜可能会迟到 xff0c 但永远不缺席 7月的编程排行榜已出 xff0c 接下来一起看看有哪些看点吧 Tiobe编程排行榜前20名 Tiobe编程排行榜Top 10趋势 TIOBE Index编程社区指数是编程语言流行度的一个指
  • 操作系统 记录型信号量实现生产者消费者问题(完整代码)

    问题描述 用信号量模拟生产者 消费者问题的过程 生产者和消费者两个线程共享同一个缓冲区 xff0c 生产者不断向缓冲区中添加产品 xff0c 消费者从缓冲区中消费产品 要保证缓冲区满了之后生产者不能再继续添加产品 xff0c 需要等消费者至
  • 制版经验分享—使用AD18

    文章目录 前言一 封装二 走线三 注意细节四 制版流程五 制版细节总结 前言 在做一些培训题目时 xff0c 由于时间有限制 xff0c 在外面开板会花费好几天的制作和快递时间 xff0c 所以有时候就需要自己制版 xff0c 在这里我记录
  • Java打印九九乘法表

    1 使用双重for循环打印九九乘法表 Java源代码如下 xff1a for int i 61 0 i lt 61 9 i 43 43 for int j 61 1 j lt 61 i j 43 43 System out print i
  • 解决selenium打开Chrome浏览器自动退出的问题

    好不容易安装好selenium和对应的浏览器驱动器后终于可以运行程序了 xff0c 结果发现一运行程序后浏览器打开就自动退出了 xff0c 但是我在Python代码中并没有写driver quit 方法 xff0c 上网查了查发现原来是我的
  • 在Java应用中嵌入sshd服务

    这个应用需要依赖apache mina的子项目sshd xff0c 项目主页http mina apache org sshd project index html xff0c 当前版本号为0 8 0 这里的sshd和Linux下的sshd
  • openssl开发库安装时的踩坑指南

    序 前几天用linux编译一个提权脚本的时候报错 openssl opensslv h 没有那个文件或目录 的问题 无论如何也解决不了 xff0c 这下我记录一个踩坑指南防止下一个人掉进坑里 操作 总体介绍 首先介绍一下 xff0c 这个报
  • 性能测试脚本用例【模板】

    产品名称Product name 密级Confidentiality level 秘密 产品版本Product version Total 12pages 共12页 性能测试脚本用例 仅供内部使用 拟制 日期 xff1a 审核 日期 xff
  • Java常见的集合类

    我们常见的Java集合类有List Set Map List 1 接口可以被继承 2 接口可以被多次实现 3 List和ArrayList package List import java util ArrayList import jav
  • WIN7我的电脑右键管理打不开

    问题现象 xff1a 我的电脑右键点击管理无法正常打开 xff0c 会弹出下面的报错信息 首先打开注册表 xff0c 打开运行 xff0c 输入regedit 选择路径 xff1a HKEY LOCAL MACHINE SOFTWARE C
  • LIKE的用法

    我们来谈谈关于like运算符的理解 xff1a 下面是like的语法 xff0c 以后使用到like运算符的都必须根据这个语法使用 LIKE 运算符是用来匹配通配符指定模式的文本值 如果搜索表达式与模式表达式匹配 xff0c LIKE 运算

随机推荐