Springboot中配置activeMQ持久化

2023-11-18

一、activeMQ数据库持久化配置

ActiveMQ持久化的三种方式,我们采用数据库的方式来进行持久化。

(1) Memory 消息存储-基于内存的消息存储。
(2) 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复
能力。
(3) 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。

首先我们先来配置activeMQ

在conf文件夹里的activeMQ.xml中增加一个jdbc的bean

<bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource">
                      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                      <property name="url" value="jdbc:mysql://127.0.0.1:3306/test"/>
                      <property name="username" value="root"/>
                      <property name="password" value="root"/>
                      <property name="maxActive" value="200"/>
                      <property name="poolPreparedStatements" value="true"/>
                    </bean>

 然后还是同一个文件,找到<persistenceAdapter>标签修改:注释掉原来的kahaDB方式持久化。

 <persistenceAdapter>
       <!--     <kahaDB directory="${activemq.base}/data/kahadb"/>  -->
<!--createTablesOnStartup    启动是否创建表  第一次为true 后续为false-->
<jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />
        </persistenceAdapter>

 接着需要将一些jar包放在lib目录里。因为我们用的是连接池和mysql,所以要导入四个jar包

 配置好的MQ下载地址:https://download.csdn.net/download/qq_39404258/12561459

如果启动失败,报下面这样的错,说明你的mysql和mysql连接的jar包不匹配:

Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=DEFAULT' at line 1

我用的mysql是5.6,用的mysql-connector是5.0.8,出现了这样的错,将mysql-connector改成了5.1.21就正常运行了。版本以自己的实际为主。

二、ActiveMQ的两种模型

点对点模型 queue

每个消息只有一个消费者( Consumer)(即一旦被消费,消息就不再在消息队列中);
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有
正在运行,它不会影响到消息被发送到队列;
接收者在成功接收消息之后需向队列应答成功。

发布/订阅模型 topic

每个消息可以有多个消费者;
发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
订阅者必须保持运行的状态,才能接受发布者发布的消息

 

使用jms原生消息进行示例:

生产者:

public class ProducerJMS {
    public static String TCP_PATH="tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        try {
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory();
            //2.创建连接
            Connection connection=factory.createConnection();
            //3.打开连接
            connection.start();
            //4.创建session param1=事务是否开启 param2=消息确认机制  如果开启事务,第二个参数无用,且需要一个提交事务的操作
            Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //5.创建消息队列
            Topic topic =session.createTopic("q2");
            Queue queue=session.createQueue("q1");
            MessageProducer producer1=session.createProducer(topic);
            //6.创建生产者
            MessageProducer producer=session.createProducer(queue);

            //7.创建消息
            TextMessage textMessage=session.createTextMessage("new");
            TextMessage textMessage1=session.createTextMessage("new1");
            TextMessage textMessage2=session.createTextMessage("new2");
            TextMessage top1=session.createTextMessage("top");
            TextMessage top2=session.createTextMessage("top1");
            TextMessage top3=session.createTextMessage("top2");
            //8.发送消息到消息队列
            producer.send(textMessage);
            producer.send(textMessage1);
            producer.send(textMessage2);
            producer1.send(top2);
            producer1.send(top3);
            producer1.send(top1);
            //9.关闭连接
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }


    }
}

消费者:

public class ConsumerJMS {
    public static String URL_PATH="tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        try{
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(URL_PATH);
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.打开连接
            connection.start();
            //4.创建会话
            Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //5.创建目标地址(通道)
            Queue queue =session.createQueue("q1");
            Topic topic = session .createTopic("q2");
            //6.创建消费者
            MessageConsumer consumer1=session.createConsumer(topic);
            MessageConsumer consumer=session.createConsumer(queue);
            consumer1.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("接收的消息:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }else {
                        System.out.println("类型错误!");
                    }
                }
            });
            //7.接收消息,因为在一直监听,当有新消息来时,获取,所以连接不能关
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("接收的消息:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }else {
                        System.out.println("类型错误!");
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }



}

 启动两个消费者,然后启动生产者,结果:

通过结果发现,queue的消息随机分配给了两个消费者,但topic的消息同时发给了他们两个。
接着先启动生产者,再启动消费者,发现只收到了queue里的消息。

三、在springboot中简单使用

 

yml配置:

 

 

server:
  port: 9081
spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    packages:
      trust-all: true  #信任包
  jms:
    pub-sub-domain: false #false点对点 true订阅
    template:
      delivery-mode: persistent #持久化
activemq:
  name: spring_queue

maven依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

五种消息的使用实例:

生产者:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes= ProducerApplication.class)
public class Producer {
    @Value("${activemq.name}")
    private String messageName;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private JmsTemplate jmsTemplate;

    @Test
    public void ptpSend() {
        jmsMessagingTemplate.convertAndSend("spring_queue", "new message");
    }

    @Test
    public void ptpSendTextMessage() {
        jmsTemplate.send(messageName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("testMessage");
            }
        });
    }
    @Test
    public void ptpSendMapMessage() {
        jmsTemplate.send(messageName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage =session.createMapMessage();
                mapMessage.setString("name","zz");
                mapMessage.setInt("age",1);
                return mapMessage;
            }
        });
    }
    @Test
    public void ptpSendUserMessage() {
        jmsTemplate.send(messageName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                User user= new User("zhangsan",11);
                return session.createObjectMessage(user);
            }
        });
    }
    @Test
    public void ptpSendByteMessage() {
        jmsTemplate.send(messageName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                File file =new File("D:\\img\\30.png");
               BytesMessage bytesMessage = session.createBytesMessage();
                try {
                    FileInputStream in = new FileInputStream(file);
                    byte[] bytes = new byte[(int)file.length()];
                    //将Stream里的内容输入到bytes里
                    in.read(bytes);
                    bytesMessage.writeBytes(bytes);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return bytesMessage;
            }
        });
    }
    @Test
    public void ptpSendStreamMessage() {
        jmsTemplate.send(messageName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
              StreamMessage streamMessage =  session.createStreamMessage();
              streamMessage.writeDouble(2.2);
              streamMessage.writeString("Stream");
                return streamMessage;
            }
        });
    }
}

消费者:

@Component
public class Consumer {

    @JmsListener(destination = "${activemq.name}")
    public void reveiced(Message message){
        if (message instanceof TextMessage){
          TextMessage textMessage=(TextMessage) message;
            try {
                System.out.println("接收消息:"+textMessage.getText());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else if (message instanceof MapMessage){
            MapMessage mapMessage=(MapMessage) message;
            try {
                System.out.println("接收Map消息:"+mapMessage.getString("name")+","+mapMessage.getString("age"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        else if (message instanceof ObjectMessage){
            ObjectMessage objectMessage=(ObjectMessage) message;
            try {
                User user = (User) objectMessage.getObject();
                System.out.println("接收Object消息:"+user.getName()+","+user.getAge());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        else if (message instanceof BytesMessage){   
            BytesMessage bytesMessage=(BytesMessage) message;
            try {
                FileOutputStream fileOutputStream = new FileOutputStream("D:\\img1\\1.png");
                byte[] bytes = new byte[(int)bytesMessage.getBodyLength()];
                bytesMessage.readBytes(bytes);
                fileOutputStream.write(bytes);
                fileOutputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        else if (message instanceof StreamMessage){
            StreamMessage streamMessage=(StreamMessage) message;
            try {
                System.out.println("接收Stream消息:"+streamMessage.readDouble());
                System.out.println("接收Stream消息:"+streamMessage.readString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Springboot中配置activeMQ持久化 的相关文章

随机推荐

  • Linux系统Could not connect to '192.168.128.XXX' (port 22): Connection failed.

    网上查找了方法 都是 查看防火墙 service iptables status 关闭防火墙 service iptables stop 打开防火墙 service iptables start 打开sshd service sshd st
  • zigbee串口打印无法显示或乱码解决方法

    zigbee串口打印问题 在终端节点向协调器发数据时 在数据流中伴有该节点的网络地址 而有时通过PC端无法打印出来 1 可能是串口软件的问题 用过好几个串口软件 有时会出现停止 或乱码行为 在经过多次换用软件 发现 SecureCRT 这个
  • uni-app watch事件监听三种用法

    1 普通监听 无法监听到第一次绑定的变化
  • stm32----用状态机判断单双击

    一 什么是状态机 状态机 State Machine 是一种用于描述系统状态和状态之间转换关系的数学模型 说白了就是做出某个动作之后变成什么样的状态 比如stm32中的按键控制开关灯 假设灯一开始是关的 当我们按下按键之后 灯亮了 这相当于
  • REST API 设计最佳实践:如何正确使用 HTTP 状态码?

    总的来说 HTTP协议出现以来Web服务也就存在了 但是 自从云计算出现后 才成为实现客户端与服务和数据交互的普遍方法 作为一名开发者 我很幸运能够在工作中使用一些仍然存在的SOAP服务 但是 我主要接触的是REST 这是一种基于资源的AP
  • dev express for asp.net 如何更换主题

    1 如下图点击菜单 2 选择主题然后按 Update Project
  • Python——协程(Coroutine),异步IO

    目录 生成器 Generator yield表达式的使用 生产者和消费者模型 编辑 yield from表达式 协程 Coroutine asyncio coroutine async await 总结 由于GIL的存在 导致Python多
  • Java FileReader

    Java FileReader Java FileReader Java FileReader class is part of java io package Java FileReader类是java io软件包的一部分 The Fil
  • Android自动化测试,5个必备的测试框架

    Appium Appium是一个开源的移动测试工具 支持iOS和Android 它可以用来测试任何类型的移动应用 原生 网络和混合 作为一个跨平台的工具 你可以在不同的平台上运行相同的测试 为了实现跨平台的功能 Appium使用了供应商提供
  • python 抖音采集_python爬取抖音视频的实例详解

    import requests import json import re import os from pprint import pprint as pp import queue class DouYin header accept
  • 全书简介和作者寄语

    巨硬的NumPy 巨硬的NumPy 教程包括两部分 从小白到入门 和 从入门到熟练 从小白到入门 旨在帮助没有基础的同学快速掌握 numpy 的常用功能 保证日常绝大多数场景的使用 从入门到熟练 目的是帮助有进一步需要的同学对 numpy
  • (六)Kubernetes - 手动部署(二进制方式安装)

    Kubernetes 手动部署 5 1 部署Nginx Keepalived高可用负载均衡器 1 1 安装软件包 Master1 Master2 1 2 Nginx配置文件 主备相同 1 3 keepalived配置文件 Master1 1
  • mysql jpa 不要自动建表,如何让Hibernate在与JPA一起使用时自动在数据库中创建表?...

    I am new to JPA And for now I am trying to understand standard examples I was reading online and saw a few stackoverflow
  • vue面试题汇总

    HTML篇 CSS篇 JS篇 TypeScript篇 React篇 微信小程序篇 前端面试题汇总大全 含答案超详细 HTML JS CSS汇总篇 持续更新 前端面试题汇总大全二 含答案超详细 Vue TypeScript React 微信小
  • pytorch---之item()

    torch Tensor item 坑 注意只能是一个值 适合返回loss acc
  • 设计模式概念学习

    文章目录 创建类型 单例模式 饿汉 懒汉 openbmc项目实际应用 工厂方法 简单工厂方法 openbmc项目实际应用 抽象工厂 以下为学习时对各种设计模式的简单理解 还没有深入学习和实际应用 推荐1个 很棒的网站学习设计模式 每个模式都
  • 01-windows下python爬取网页上的图片

    1 首先下载python 安装环境 pycharm anaconda的下载与安装 移步各个主页下载 一键式安装 pycharm http www jetbrains com pycharm anaconda https www anacon
  • 什么是算子?

    什么是算子 在知道什么是算子之前我们还需要知道一些其他的相关概念 大概来说 算子是一个函数空间到函数空间上的映射O X X 广义上的算子可以推广到任何空间 如内积空间等 映射 从一个拓扑空间到另一个拓扑空间的对应关系 对于每一个x 都有唯一
  • 【应用层2】Http协议

    一 简介 Http 即超文本传输协议 一种建立在 TCP 上的无状态连接 属于应用层协议 http传输的内容都是明文的 不安全的 Http 协议用于客户端与服务器端之间的通信 它规定了客户端与服务端之间的通信格式 包括请求和响应的格式 Ht
  • Springboot中配置activeMQ持久化

    一 activeMQ数据库持久化配置 ActiveMQ持久化的三种方式 我们采用数据库的方式来进行持久化 1 Memory 消息存储 基于内存的消息存储 2 基于日志消息存储方式 KahaDB是ActiveMQ的默认日志存储方式 它提供了容