SpringBoot+ActiveMQ-点对点队列模式(消费端)

2023-11-17

ActiveMQ消息中间件的点对点模式point to point 消息队列

生产端案例(配合消费端测试):SpringBoot+ActiveMQ生产端

ActiveMQ版本:apache-activemq-5.16.5

SpringBoot集成ActiveMQ消费端的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>boot.example.queue.customer</groupId>
    <artifactId>boot-example-queue-demo-customer-2.0.5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-example-queue-demo-customer-2.0.5</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>boot.example.demo.entity</groupId>
            <artifactId>boot-example-demo-entity-2.0.5</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- activeMQ依赖组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- spring.activemq.pool.enabled=true -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.16.5</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 打包成一个可执行jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

application.properties


server.port=8042


spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false
spring.activemq.packages.trust-all=true
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=6
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.expire-timeout=0
spring.jms.pub-sub-domain=false

启动类AppQueueCustomer

package boot.example.queue.customer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

/**
 * 蚂蚁舞
 */
@SpringBootApplication
/**
 * 启动消息队列
 */
@EnableJms
public class AppQueueCustomer {
    public static void main( String[] args ) {
        SpringApplication.run(AppQueueCustomer.class, args);
        System.out.println( "Hello World!" );
    }
    

}

配置一个queue ActiveMQQueueConfig

package boot.example.queue.customer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * 蚂蚁舞
 */
@Configuration
public class ActiveMQQueueConfig {

    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        //  启用queue 不启用topic
        bean.setPubSubDomain(false);
        return bean;
    }


}

消息消费端的queue

ActiveMQConstant

package boot.example.queue.customer.config;

/**
 *  蚂蚁舞
 */

import boot.example.queue.entity.BootProvider;

import java.util.LinkedList;
import java.util.List;


public class ActiveMQConstant {

    //  默认queue
    public static final String defaultQueue = "myw_queue";

    //  指定queue
    public static final String stringQueue = "stringQueue";

    //  指定list<String>
    public static final String stringListQueue = "stringListQueue";

    //  指定Object
    public static final String objQueue = "objQueue";

    //  指定List<Object>
    public static final String objListQueue = "objListQueue";

    //  简单存储默认queue消费端收到的消息
    public static List<String> defaultList = new LinkedList<>();

    public static List<String> stringQueueList = new LinkedList<>();

    public static List<List<String>> stringListQueueList = new LinkedList<>();

    public static List<BootProvider> objQueueList = new LinkedList<>();

    public static List<List<BootProvider>> objListQueueList = new LinkedList<>();


}

DefaultQueueConsumerService

package boot.example.queue.customer.service;

import boot.example.queue.customer.config.ActiveMQConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 *  蚂蚁舞
 */
@Service
public class DefaultQueueConsumerService {

    @JmsListener(destination = ActiveMQConstant.defaultQueue)
    public void message(TextMessage textMessage) throws JMSException {
        if(textMessage == null || textMessage.getText() == null){
            return;
        }
        System.out.println("默认消费者:"+textMessage.getText());
        ActiveMQConstant.defaultList.add(textMessage.getText());
    }


}

AQueueConsumerService

package boot.example.queue.customer.service;

import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.ObjectMessage;
import java.util.List;

/**
 * 蚂蚁舞
 */
@Service
public class AQueueConsumerService {

    @JmsListener(destination = ActiveMQConstant.stringQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        if(msg == null){return;}
        System.out.println("A接收到消息...." + msg);
        ActiveMQConstant.stringQueueList.add(msg);
    }

    @JmsListener(destination = ActiveMQConstant.stringListQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        if(list.isEmpty()){return;}
        System.out.println("A接收到集合队列消息...." + list);
        ActiveMQConstant.stringListQueueList.add(list);
    }

    @JmsListener(destination = ActiveMQConstant.objQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        if(objectMessage == null || objectMessage.getObject() == null){return;}
        System.out.println("A接收到对象队列消息...." + objectMessage.getObject());
        BootProvider bootProvider = (BootProvider) objectMessage.getObject();
        ActiveMQConstant.objQueueList.add(bootProvider);
    }

    @SuppressWarnings("unchecked")
    @JmsListener(destination = ActiveMQConstant.objListQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        if(objectMessage == null || objectMessage.getObject() == null){return;}
        System.out.println("A接收到的对象队列消息..." + objectMessage.getObject());
        List<BootProvider> list = (List<BootProvider>) objectMessage.getObject();
        ActiveMQConstant.objListQueueList.add(list);
    }


}

BQueueConsumerService

package boot.example.queue.customer.service;

import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import java.util.List;

/**
 *  蚂蚁舞
 */
@Service
public class BQueueConsumerService {

    @JmsListener(destination = ActiveMQConstant.stringQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(TextMessage textMessage) throws JMSException {
        if(textMessage == null || textMessage.getText() == null){return;}
        System.out.println("B接收到消息...." + textMessage.getText());
        ActiveMQConstant.stringQueueList.add(textMessage.getText());
    }

    @JmsListener(destination = ActiveMQConstant.stringListQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        if(list.isEmpty()){return;}
        System.out.println("B接收到集合队列消息...." + list);
        ActiveMQConstant.stringListQueueList.add(list);
    }

    @JmsListener(destination = ActiveMQConstant.objQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        if(objectMessage == null || objectMessage.getObject() == null){return;}
        System.out.println("B接收到对象队列消息...." + objectMessage.getObject());
        BootProvider bootProvider = (BootProvider) objectMessage.getObject();
        ActiveMQConstant.objQueueList.add(bootProvider);
    }

    @SuppressWarnings("unchecked")
    @JmsListener(destination = ActiveMQConstant.objListQueue, containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        if(objectMessage == null || objectMessage.getObject() == null){return;}
        System.out.println("B接收到的对象队列消息..." + objectMessage.getObject());
        List<BootProvider> list = (List<BootProvider>) objectMessage.getObject();
        ActiveMQConstant.objListQueueList.add(list);
    }
    
    


}

SwaggerConfig UI使用

package boot.example.queue.customer.config;

import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 *  蚂蚁舞
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi(){
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
                .apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
                .paths(Predicates.not(PathSelectors.regex("/error.*")))
                .paths(PathSelectors.regex("/.*"))
                .build().apiInfo(apiInfo());
    }

    private ApiInfo apiInfo(){
        return new ApiInfoBuilder()
                .title("demo")
                .description("demo接口")
                .version("0.01")
                .build();
    }

    /**
     * http://localhost:XXXX/doc.html  地址和端口根据实际项目查看
     */


}

BootDefaultQueueCustomerController

package boot.example.queue.customer.controller;

import boot.example.queue.customer.config.ActiveMQConstant;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * 蚂蚁舞
 */
@RestController
@RequestMapping(value="/customer")
public class BootDefaultQueueCustomerController {

    @GetMapping(value="/myw_queue")
    public List<String> myw_queue() {
        return ActiveMQConstant.defaultList;
    }

}

BootQueueCustomerController

package boot.example.queue.customer.controller;

import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * 蚂蚁舞
 */
@RestController
@RequestMapping(value="/customer")
public class BootQueueCustomerController {

    @GetMapping(value="/stringQueue")
    public List<String> stringQueue() {
        return ActiveMQConstant.stringQueueList;
    }

    @GetMapping(value="/stringListQueue")
    public List<List<String>> stringListQueue() {
        return ActiveMQConstant.stringListQueueList;
    }

    @GetMapping(value="/objQueueList")
    public List<BootProvider> objQueueList() {
        return ActiveMQConstant.objQueueList;
    }

    @GetMapping(value="/objListQueueList")
    public List<List<BootProvider>> objListQueueList() {
        return ActiveMQConstant.objListQueueList;
    }


}

测试使用的对象BootProvider

package boot.example.queue.entity;

import java.io.Serializable;
import java.util.Date;

/**
 *  用在activeMq消息,必须保证package一致,不然序列化后反序列化要出错
 *  蚂蚁舞
 */
public class BootProvider implements Serializable {

    private int id;

    private String name;

    private Date date = new Date();

    public BootProvider() {
    }

    public BootProvider(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return "BootProvider{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", date=" + date +
                '}';
    }
}

结构目录

├─boot-example-demo-entity-2.0.5
│  │  pom.xml
│  │  
│  ├─src
│  │  └─main
│  │      └─java
│  │          └─boot
│  │              └─example
│  │                  └─queue
│  │                      └─entity
│  │                              BootProvider.java
├─boot-example-queue-demo-customer-2.0.5
│  │  pom.xml
│  │  
│  ├─src
│  │  ├─main
│  │  │  ├─java
│  │  │  │  └─boot
│  │  │  │      └─example
│  │  │  │          └─queue
│  │  │  │              └─customer
│  │  │  │                  │  AppQueueCustomer.java
│  │  │  │                  │  
│  │  │  │                  ├─config
│  │  │  │                  │      ActiveMQConstant.java
│  │  │  │                  │      ActiveMQQueueConfig.java
│  │  │  │                  │      SwaggerConfig.java
│  │  │  │                  │      
│  │  │  │                  ├─controller
│  │  │  │                  │      BootDefaultQueueCustomerController.java
│  │  │  │                  │      BootQueueCustomerController.java
│  │  │  │                  │      
│  │  │  │                  └─service
│  │  │  │                          AQueueConsumerService.java
│  │  │  │                          BQueueConsumerService.java
│  │  │  │                          DefaultQueueConsumerService.java
│  │  │  │                          
│  │  │  └─resources
│  │  │          application.properties
│  │  │          
│  │  └─test
│  │      └─java
│  │          └─boot
│  │              └─example
│  │                  └─queue
│  │                      └─customer
│  │                              AppQueueCustomerTest.java
│  │                              

启动后访问(ActiveMQ必须提前启动)

http://localhost:8042/doc.html

现在进行测试,在生产端程序发送数据,可以看到消费端收到了数据并临时保存在列表里

还可以使用List<String> 或者 List<Object> 或者直接java对象

测试下对象

消费端接收的数据

另外可以在控制台看到

A和B两个接收监听都收到了数据,看着像是轮询消费的,ActiveMQ支持存在多个消费者,但是对一个消费者而言,只会有一个消费者可以消费,其他的消费者便不能消费此消息

还有ActiveMQ默认开启持久化的,数据在安装目录的data目录下面

源码案例地址: SpringBoot+ActiveMQ源码案例

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot+ActiveMQ-点对点队列模式(消费端) 的相关文章

随机推荐

  • Linux 线程同步

    文章目录 一 线程同步介绍 同步与互斥概述 线程同步问题 二 互斥锁 为什么需要互斥锁 互斥锁 Mutex 介绍 互斥锁相关 API 死锁 DeadLock 三 读写锁 读写锁概述 读写锁相关 API 四 生产者与消费者模型 五 条件变量
  • linux最佳线程数

    最佳线程数 性能压测的情况下 起初随着用户数的增加 QPS会上升 当到了一定的阀值之后 用户数量增加QPS并不会增加 或者增加不明显 同时请求的响应时间却大幅增加 这个阀值我们认为是最佳线程数 为什么要找最佳线程数 1 过多的线程只会造成
  • 算法设计与分析: 6-3 最小权顶点覆盖问题

    6 3 最小权顶点覆盖问题 问题描述 给定一个赋权无向图 G V E 每个顶点 v V 都有一个权值 w v 如果 U V U V U subseteq V 且对任意 u v E 有 u U 或 v U 就称 U 为图 G 的一个顶点覆盖
  • Pycharm程序调试(Debug+断点)

    主要思路 利用断点 注 打断点之后 程序运行到断点的哪一行处 但此行并未执行 调试的过程分为三步 第一步 在你想要调试的地方 打上断点 第二步 使用调试模式来运行这个 python 程序 第三步 使用各种手段开始代码调试 一 图文教程 1
  • Tomcat架构解析以及设计借鉴

    Tomcat 发展这么多年 已经比较成熟稳定 在如今 追新求快 的时代 Tomcat 作为 Java Web 开发必备的工具似乎变成了 熟悉的陌生人 难道说如今就没有必要深入学习它了么 学习它我们又有什么收获呢 静下心来 细细品味经典的开源
  • 开源软件大集合

    http a note sourceforge net A Note 4 2 1 可在Windows桌面放置便笺 并可提供闹钟提醒功能 http www xs4all nl edienskeAbakt 0 9 能够以压缩方式对文档进行备份
  • llama.cpp LLM模型 windows cpu安装部署;运行LLaMA2模型测试

    参考 https www listera top ji xu zhe teng xia chinese llama alpaca https blog csdn net qq 38238956 article details 1301135
  • 设置linux-kali 2022语言为中文(保姆级图文)

    目录 友情提示 1 打开终端 2 打开设置 3 修改设置 4 重启生效设置 总结 欢迎关注 网络工程专业 系列 持续更新中 欢迎关注 网络工程专业 系列 持续更新中 在安装完 kali linux2022 时 操作系统默认语言为英文 初学者
  • 【论文阅读-TPAMI2021】Curriculum Learning(课程学习)综述

    简介 Curriculum learning CL 课程学习 是一种模型训练策略 通过先让模型学习简单数据后再学习困难数据的方式模拟学生进行课程学习的场景 通用的课程学习框架为Difficulty Measurer 困难程度评估 Train
  • 不懂Python装饰器,你敢说会Python?

    对于Python学习者 一旦过了入门阶段 你几乎一定会用到Python的装饰器 它经常使用在很多地方 比如Web开发 日志处理 性能搜集 权限控制等 还有一个极其重要的地方 那就是面试的时候 对 装饰器是面试中最常见的问题之一 实战入门 抛
  • webpack从入门到放弃(二:基本属性)

    本节介绍webpack五大核心概念 一 entry 入口 指示 Webpack 从哪个文件开始打包 webpack是根据依赖关系进行打包 以入口文件为起点 根据依赖关系形成依赖树 在生产模式打包时 根据tree shaking未引用的文件不
  • 8259初始化命令字(ICW1-ICW4)

    8259A的中断操作功能很强 包括中断的请求 屏蔽 排队 结束 级联以及提供中断类型号和查询等操作 并且其操作的方式又有不同 它既能实现向量中断 又能进行中断查询 它可以用于16位机 也可用于8位机 因此 使用起来感到复杂且不好掌握 为此
  • idea 提交远程库冲突解决

    idea 提交远程库冲突解决 github团队协作 正常开发 管理得好的话 不会出现代码冲突问题 项目经理会划分模块 每个小组成员各自开发模块 公共的代码由专门的人负责维护 但是偶尔管理沟通问题导致出现冲突偶尔也是会出现的 冲突出现场景过程
  • Visual Studio Code中英文的切换

    我在学习 Flutter 的时候 使用过 VsCode 来开发 一般来说 安装好的 VsCode 都是英文版的 有些人可能不太习惯用英文版的 不过没有关系 我在这里提供中英文切换的方法给大家 切换为中文 1 点击 1 中的选项 在 2 中的
  • java栈溢出现象_JVM源码分析之栈溢出完全解读

    概述 之所以想写这篇文章 其实是因为最近有不少系统出现了栈溢出导致进程crash的问题 并且很隐蔽 根本原因还得借助coredump才能分析出来 于是想从JVM实现的角度来全面分析下栈溢出的这类问题 或许你碰到过如下的场景 日志里出现了St
  • python uiautomator2 init 作用

    目录 背景 具体步骤 总结 背景 在搭建Python的自动化测试环境时 执行脚本一直报错 ions GatewayError Uiautomator started failed
  • (Linux)docker容器安装Kibana--简单安装

    docker容器安装Kibana docker容器安装Kibana 启动Kibana docker容器安装Kibana 使用docker命令安装Kibana容器 sudo docker pull kibana 7 4 2 等待下载安装完成
  • matlab 2022更新

    matlab 2022b dictionary 对象 将唯一键映射到值以便快速查找 深度学习工具箱 直接导入 PyTorch 模型 将模型导出到 TensorFlow Simulink 将库浏览器停靠在模型中 MATLAB NET 引擎 A
  • Bootstrap JavaScript插件:警告信息 (alert.js)

    作者 WangMin 格言 努力做好自己喜欢的每一件事 CSDN原创文章 博客地址 WangMin 警告框插件 alert js 为警告框组件一个关闭功能 就是点击警告框的关闭按钮 可以让警告框消失 并将它从 DOM 中删除 使用方法 1
  • SpringBoot+ActiveMQ-点对点队列模式(消费端)

    ActiveMQ消息中间件的点对点模式point to point 消息队列 生产端案例 配合消费端测试 SpringBoot ActiveMQ生产端 ActiveMQ版本 apache activemq 5 16 5 SpringBoot