ActiveMQ消息中间件的点对点模式point to point 消息队列
生产端案例(配合消费端测试):SpringBoot+ActiveMQ生产端
ActiveMQ版本:apache-activemq-5.16.5
SpringBoot集成ActiveMQ消费端的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>boot.example.queue.customer</groupId>
<artifactId>boot-example-queue-demo-customer-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-example-queue-demo-customer-2.0.5</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>boot.example.demo.entity</groupId>
<artifactId>boot-example-demo-entity-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- activeMQ依赖组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- spring.activemq.pool.enabled=true -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.5</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包成一个可执行jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.properties
server.port=8042
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false
spring.activemq.packages.trust-all=true
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=6
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.expire-timeout=0
spring.jms.pub-sub-domain=false
启动类AppQueueCustomer
package boot.example.queue.customer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
/**
* 蚂蚁舞
*/
@SpringBootApplication
/**
* 启动消息队列
*/
@EnableJms
public class AppQueueCustomer {
public static void main( String[] args ) {
SpringApplication.run(AppQueueCustomer.class, args);
System.out.println( "Hello World!" );
}
}
配置一个queue ActiveMQQueueConfig
package boot.example.queue.customer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
* 蚂蚁舞
*/
@Configuration
public class ActiveMQQueueConfig {
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
// 启用queue 不启用topic
bean.setPubSubDomain(false);
return bean;
}
}
消息消费端的queue
ActiveMQConstant
package boot.example.queue.customer.config;
/**
* 蚂蚁舞
*/
import boot.example.queue.entity.BootProvider;
import java.util.LinkedList;
import java.util.List;
public class ActiveMQConstant {
// 默认queue
public static final String defaultQueue = "myw_queue";
// 指定queue
public static final String stringQueue = "stringQueue";
// 指定list<String>
public static final String stringListQueue = "stringListQueue";
// 指定Object
public static final String objQueue = "objQueue";
// 指定List<Object>
public static final String objListQueue = "objListQueue";
// 简单存储默认queue消费端收到的消息
public static List<String> defaultList = new LinkedList<>();
public static List<String> stringQueueList = new LinkedList<>();
public static List<List<String>> stringListQueueList = new LinkedList<>();
public static List<BootProvider> objQueueList = new LinkedList<>();
public static List<List<BootProvider>> objListQueueList = new LinkedList<>();
}
DefaultQueueConsumerService
package boot.example.queue.customer.service;
import boot.example.queue.customer.config.ActiveMQConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
* 蚂蚁舞
*/
@Service
public class DefaultQueueConsumerService {
@JmsListener(destination = ActiveMQConstant.defaultQueue)
public void message(TextMessage textMessage) throws JMSException {
if(textMessage == null || textMessage.getText() == null){
return;
}
System.out.println("默认消费者:"+textMessage.getText());
ActiveMQConstant.defaultList.add(textMessage.getText());
}
}
AQueueConsumerService
package boot.example.queue.customer.service;
import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.ObjectMessage;
import java.util.List;
/**
* 蚂蚁舞
*/
@Service
public class AQueueConsumerService {
@JmsListener(destination = ActiveMQConstant.stringQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveStringQueue(String msg) {
if(msg == null){return;}
System.out.println("A接收到消息...." + msg);
ActiveMQConstant.stringQueueList.add(msg);
}
@JmsListener(destination = ActiveMQConstant.stringListQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveStringListQueue(List<String> list) {
if(list.isEmpty()){return;}
System.out.println("A接收到集合队列消息...." + list);
ActiveMQConstant.stringListQueueList.add(list);
}
@JmsListener(destination = ActiveMQConstant.objQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
if(objectMessage == null || objectMessage.getObject() == null){return;}
System.out.println("A接收到对象队列消息...." + objectMessage.getObject());
BootProvider bootProvider = (BootProvider) objectMessage.getObject();
ActiveMQConstant.objQueueList.add(bootProvider);
}
@SuppressWarnings("unchecked")
@JmsListener(destination = ActiveMQConstant.objListQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
if(objectMessage == null || objectMessage.getObject() == null){return;}
System.out.println("A接收到的对象队列消息..." + objectMessage.getObject());
List<BootProvider> list = (List<BootProvider>) objectMessage.getObject();
ActiveMQConstant.objListQueueList.add(list);
}
}
BQueueConsumerService
package boot.example.queue.customer.service;
import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import java.util.List;
/**
* 蚂蚁舞
*/
@Service
public class BQueueConsumerService {
@JmsListener(destination = ActiveMQConstant.stringQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveStringQueue(TextMessage textMessage) throws JMSException {
if(textMessage == null || textMessage.getText() == null){return;}
System.out.println("B接收到消息...." + textMessage.getText());
ActiveMQConstant.stringQueueList.add(textMessage.getText());
}
@JmsListener(destination = ActiveMQConstant.stringListQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveStringListQueue(List<String> list) {
if(list.isEmpty()){return;}
System.out.println("B接收到集合队列消息...." + list);
ActiveMQConstant.stringListQueueList.add(list);
}
@JmsListener(destination = ActiveMQConstant.objQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
if(objectMessage == null || objectMessage.getObject() == null){return;}
System.out.println("B接收到对象队列消息...." + objectMessage.getObject());
BootProvider bootProvider = (BootProvider) objectMessage.getObject();
ActiveMQConstant.objQueueList.add(bootProvider);
}
@SuppressWarnings("unchecked")
@JmsListener(destination = ActiveMQConstant.objListQueue, containerFactory = "jmsListenerContainerQueue")
public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
if(objectMessage == null || objectMessage.getObject() == null){return;}
System.out.println("B接收到的对象队列消息..." + objectMessage.getObject());
List<BootProvider> list = (List<BootProvider>) objectMessage.getObject();
ActiveMQConstant.objListQueueList.add(list);
}
}
SwaggerConfig UI使用
package boot.example.queue.customer.config;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* 蚂蚁舞
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
.apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
.paths(Predicates.not(PathSelectors.regex("/error.*")))
.paths(PathSelectors.regex("/.*"))
.build().apiInfo(apiInfo());
}
private ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("demo")
.description("demo接口")
.version("0.01")
.build();
}
/**
* http://localhost:XXXX/doc.html 地址和端口根据实际项目查看
*/
}
BootDefaultQueueCustomerController
package boot.example.queue.customer.controller;
import boot.example.queue.customer.config.ActiveMQConstant;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 蚂蚁舞
*/
@RestController
@RequestMapping(value="/customer")
public class BootDefaultQueueCustomerController {
@GetMapping(value="/myw_queue")
public List<String> myw_queue() {
return ActiveMQConstant.defaultList;
}
}
BootQueueCustomerController
package boot.example.queue.customer.controller;
import boot.example.queue.customer.config.ActiveMQConstant;
import boot.example.queue.entity.BootProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 蚂蚁舞
*/
@RestController
@RequestMapping(value="/customer")
public class BootQueueCustomerController {
@GetMapping(value="/stringQueue")
public List<String> stringQueue() {
return ActiveMQConstant.stringQueueList;
}
@GetMapping(value="/stringListQueue")
public List<List<String>> stringListQueue() {
return ActiveMQConstant.stringListQueueList;
}
@GetMapping(value="/objQueueList")
public List<BootProvider> objQueueList() {
return ActiveMQConstant.objQueueList;
}
@GetMapping(value="/objListQueueList")
public List<List<BootProvider>> objListQueueList() {
return ActiveMQConstant.objListQueueList;
}
}
测试使用的对象BootProvider
package boot.example.queue.entity;
import java.io.Serializable;
import java.util.Date;
/**
* 用在activeMq消息,必须保证package一致,不然序列化后反序列化要出错
* 蚂蚁舞
*/
public class BootProvider implements Serializable {
private int id;
private String name;
private Date date = new Date();
public BootProvider() {
}
public BootProvider(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
@Override
public String toString() {
return "BootProvider{" +
"id=" + id +
", name='" + name + '\'' +
", date=" + date +
'}';
}
}
结构目录
├─boot-example-demo-entity-2.0.5
│ │ pom.xml
│ │
│ ├─src
│ │ └─main
│ │ └─java
│ │ └─boot
│ │ └─example
│ │ └─queue
│ │ └─entity
│ │ BootProvider.java
├─boot-example-queue-demo-customer-2.0.5
│ │ pom.xml
│ │
│ ├─src
│ │ ├─main
│ │ │ ├─java
│ │ │ │ └─boot
│ │ │ │ └─example
│ │ │ │ └─queue
│ │ │ │ └─customer
│ │ │ │ │ AppQueueCustomer.java
│ │ │ │ │
│ │ │ │ ├─config
│ │ │ │ │ ActiveMQConstant.java
│ │ │ │ │ ActiveMQQueueConfig.java
│ │ │ │ │ SwaggerConfig.java
│ │ │ │ │
│ │ │ │ ├─controller
│ │ │ │ │ BootDefaultQueueCustomerController.java
│ │ │ │ │ BootQueueCustomerController.java
│ │ │ │ │
│ │ │ │ └─service
│ │ │ │ AQueueConsumerService.java
│ │ │ │ BQueueConsumerService.java
│ │ │ │ DefaultQueueConsumerService.java
│ │ │ │
│ │ │ └─resources
│ │ │ application.properties
│ │ │
│ │ └─test
│ │ └─java
│ │ └─boot
│ │ └─example
│ │ └─queue
│ │ └─customer
│ │ AppQueueCustomerTest.java
│ │
启动后访问(ActiveMQ必须提前启动)
http://localhost:8042/doc.html
现在进行测试,在生产端程序发送数据,可以看到消费端收到了数据并临时保存在列表里
还可以使用List<String> 或者 List<Object> 或者直接java对象
测试下对象
消费端接收的数据
另外可以在控制台看到
A和B两个接收监听都收到了数据,看着像是轮询消费的,ActiveMQ支持存在多个消费者,但是对一个消费者而言,只会有一个消费者可以消费,其他的消费者便不能消费此消息
还有ActiveMQ默认开启持久化的,数据在安装目录的data目录下面
源码案例地址: SpringBoot+ActiveMQ源码案例