Activemq+spring整合

2023-05-16

activemq与spring的整合需要用到线程池。考虑到连接、会话等资源的建立和释放,无须人工操作,全部交给容器来处理。这里通过一个实例讲解activemq与spring如何整合。项目大致是这样的设计:通过jetty构建一个http请求,接收http://localhost:8080/send?msg=xxx的请求,然后将msg作为消息传递给生产者线程,将消息发送到指定的Queue,这里发送消息基本完成,为了验证接收消息,这里配置一个消费者,用来监听实时消息,而不是通过循环来读取消息,因此需要注册一个消息监听器,来监听生产者发送的消息,然后将接收的消息打印出来,完成activemq与spring整合之后发送接收的测试。

1、构建maven项目,并引入依赖库。项目最终项目结构如下:

pom.xml文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.xxx.activemq</groupId>
  <artifactId>sendmessage</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>sendmessage</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring-version>4.3.4.RELEASE</spring-version>
  </properties>

  <dependencies>
       <dependency>
              <groupId>org.eclipse.jetty</groupId>
              <artifactId>test-jetty-servlet</artifactId>
              <version>8.1.0.RC5</version>
        </dependency>
       <dependency>
   		      <groupId>log4j</groupId>
   		      <artifactId>log4j</artifactId>
   		      <version>1.2.17</version>
   		</dependency>
   		<dependency>
   		      <groupId>org.slf4j</groupId>
   		      <artifactId>slf4j-log4j12</artifactId>
   		      <version>1.7.21</version>
   		</dependency>
       <dependency>
   		      <groupId>commons-lang</groupId>
   		      <artifactId>commons-lang</artifactId>
   		      <version>2.6</version>
    	</dependency>
    	<dependency>
	        <groupId>org.apache.commons</groupId>
	        <artifactId>commons-pool2</artifactId>
	        <version>2.4.2</version>
	    </dependency>
       <dependency>
              <groupId>org.apache.activemq</groupId>
              <artifactId>activemq-all</artifactId>
              <version>5.15.0</version>
       </dependency>
       <dependency>
              <groupId>org.springframework</groupId>
              <artifactId>spring-beans</artifactId>
              <version>${spring-version}</version>
       </dependency>
       <dependency>
              <groupId>org.springframework</groupId>
              <artifactId>spring-context</artifactId>
              <version>${spring-version}</version>
       </dependency>
       <dependency>
              <groupId>org.springframework</groupId>
              <artifactId>spring-jms</artifactId>
              <version>${spring-version}</version>
       </dependency>
  </dependencies>
</project>

2、主要用到的类介绍

ISendService.java

package com.xxx.activemq.service;

public interface ISendService {
	public void send(String msg);
}

SendService.java,将/send?msg=xxx发送过来的msg参数作为消息发送给activemq的一个Queue,这里是brokerQueue,后面的applicationContext.xml配置文件中会配置该Queue。

package com.xxx.activemq.service;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class SendService implements ISendService{
	
	private static final Logger LOG = Logger.getLogger(SendService.class);
	private String queueName;
	private JmsTemplate jmsTemplate;
	private ThreadPoolTaskExecutor threadExecutor;

	@Override
	public void send(String msg) {
		threadExecutor.execute(new Runnable() {			
			@Override
			public void run() {
				jmsTemplate.send(queueName,new MessageCreator() {					
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createTextMessage(msg);
					}
				});
				LOG.info(String.format("send msg %s to "+queueName+" ok.", msg));
			}
		});
	}

	public String getQueueName() {
		return queueName;
	}

	public void setQueueName(String queueName) {
		this.queueName = queueName;
	}

	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public ThreadPoolTaskExecutor getThreadExecutor() {
		return threadExecutor;
	}

	public void setThreadExecutor(ThreadPoolTaskExecutor threadExecutor) {
		this.threadExecutor = threadExecutor;
	}
	
}
SendServlet.java,负责处理/send?msg=xxx请求的Servlet。
package com.xxx.activemq.servlet;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.xxx.activemq.service.SendService;
import com.xxx.activemq.utils.WebContext;
public class SendServlet extends HttpServlet{

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private static SendService sendService;
	static{
		sendService = (SendService)WebContext.getBean("sendService");
	}

	@Override
	protected void doGet(HttpServletRequest req, HttpServletResponse res)
			throws ServletException, IOException {
		doPost(req, res);
	}

	@Override
	protected void doPost(HttpServletRequest req, HttpServletResponse res)
			throws ServletException, IOException {
		String message = req.getParameter("msg");
		sendService.send(message);
		res.setCharacterEncoding("UTF-8");
		res.setContentType("application/json");
		res.setStatus(HttpServletResponse.SC_OK);
		res.getWriter().println("{\"message\":\"ok\"}");
	}
}
App.java,程序入口,这里会利用jetty,构建一个http的服务,监听8080端口,然后添加handler,接收来自http://localhost:8080/send?msg=xxx的请求。

package com.xxx.activemq.webapp;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.xxx.activemq.servlet.SendServlet;
import com.xxx.activemq.utils.WebContext;

public class App {

	private static final int PORT = 8080;
	private static final Logger LOG = Logger.getLogger(App.class);
	public static void main(String[] args) {
		ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
		WebContext.setContext(context);
		Server server = new Server(PORT);
		ServletContextHandler handler = new ServletContextHandler(ServletContextHandler.SESSIONS);
		handler.setContextPath("/");
		handler.addServlet(new ServletHolder(new SendServlet()), "/send");
		server.setHandler(handler);
		try {
			LOG.info("webapp initialize success!");
			server.start();
			server.join();			
		} catch (Exception e) {
			LOG.error("webapp initialize error!");
		}
	}

}
WebContext.java,工具类,利用spring容器中定义的bean来创建bean实例。

package com.xxx.activemq.utils;

import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class WebContext implements ApplicationContextAware {
	
	private static final Logger LOG = Logger.getLogger(WebContext.class);
	private static ApplicationContext context;

	public void setApplicationContext(ApplicationContext context)
			throws BeansException {
		LOG.info("applicationContext init...");
		setContext(context);
	}

	public static void setContext(ApplicationContext context) {
		WebContext.context = context;		
	}
	
	public static Object getBean(String name){
		if(context==null){
			LOG.error("context is null");
			return null;
		}
		return context.getBean(name);
	}
}
ConsumerMessageListener.java,这个类用来接收消息,接收SendService.java中发送的消息,这个实时接收消息。

package com.xxx.activemq.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
public class ConsumerMessageListener implements MessageListener {
	private static Logger LOG = Logger.getLogger(ConsumerMessageListener.class);
	@Override
	public void onMessage(Message msg) {
		if(msg instanceof TextMessage){
			TextMessage o = (TextMessage)msg;
			try {
				LOG.info("received message: " + o.getText());
			} catch (JMSException e) {
				LOG.error("received message for error!");
			}
		}
	}
}

配置文件:

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
      
      <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
           <property name="locations">
               <list>
                     <value>classpath:config.properties</value>
               </list>
           </property>
      </bean> 
        
      
      <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
             <property name="connectionFactory">
                   <bean class="org.apache.activemq.ActiveMQConnectionFactory">
		            <property name="brokerURL" value="${activemq.brokerURL}"></property>
		            <property name="useAsyncSend" value="true"/>
		            <property name="redeliveryPolicy">
		                 <bean class="org.apache.activemq.RedeliveryPolicy">
		                       <property name="initialRedeliveryDelay" value="2000"/>
		                       <property name="useExponentialBackOff" value="true"/>
		                 </bean>
		            </property>
                 </bean> 
             </property>
      </bean>
       
      <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
			 <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
	  </bean> 
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
             <property name="connectionFactory" ref="connectionFactory"/>
      </bean>
      <bean id="threadExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="${sendmsgspool.poolsize}" />
		    <property name="keepAliveSeconds" value="${sendmsgspool.keepaliveseconds}" />
		    <property name="maxPoolSize" value="${sendmsgspool.maxpoolsize}" />
		    <property name="rejectedExecutionHandler">  
			<bean class="java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy" />  
			</property>
      </bean>
      <bean id="sendService" class="com.xxx.activemq.service.SendService">
           <property name="queueName" value="brokerQueue" />
           <property name="jmsTemplate" ref="jmsTemplate"/>
           <property name="threadExecutor" ref="threadExecutor"/>
      </bean> 
      <!-- 发送消息以上这些配置足够了 -->
      <!-- 下面配置收消息的消费者 -->     
      <bean id="consumerMessageListener" class="com.xxx.activemq.listener.ConsumerMessageListener">     
      </bean>
      <bean id="brokerQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="${activemq.brokerQueue}"/>
      </bean>
      <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="messageListener" ref="consumerMessageListener"/>
            <property name="destination" ref="brokerQueue"/>
      </bean>
</beans>
config.properties ,指定brokerURL及队列名发送消息线程池信息。

activemq.brokerURL=tcp://localhost:61616
activemq.brokerQueue=brokerQueue
sendmsgspool.keepaliveseconds=300
sendmsgspool.maxpoolsize=50
sendmsgspool.poolsize=15
log4j.xml(略)

3、启动App.java,监听8080端口,接收http://localhost:8080/send?msg=xxx的请求,然后将参数msg作为消息利用生产者发送给消息接收者。

项目启动成功日志:

第一次请求:

控制台日志:

第二次请求:

控制台日志:

管理界面查看队列消息:

4、这里重要的就是要理清jmsTemplate,jmsContainer,connectionFactory,jmsConnectionFactory之间的关系。

activemq与spring的整合和java直接编写activemq实例最大的区别在于,connection,session等资源的创建和释放无需人为操作,看不到很直观的API调用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Activemq+spring整合 的相关文章

随机推荐

  • WebSocket 测试工具

    一 WebSocket 简介 WebSocket是一种在单个TCP连接上进行全双工通信的协议 WebSocket使得客户端和服务器之间的数据交换变得更加简单 xff0c 允许服务端主动向客户端推送数据 在WebSocket API中 xff
  • 利用pipework为docker容器设置固定IP

    今天介绍如何在redhat centos7系列机器上使用pipework为docker启动的容器指定一个固定ip 我们知道默认情况下 xff0c docker会使用bridge网络模式为每一个启动的容器动态分配一个IP xff0c 以172
  • 用docker玩坏ubuntu虚拟机容器

    当我们装上docker之后 xff0c 自然会pull一个或多个镜像玩玩 xff0c 这时候 xff0c docker hub仓库上有很多系列操作系统镜像 xff0c 每个系列又有很多不同功能的虚拟机镜像 xff0c 比如centos分6还
  • tornado入门实例

    tornado是python web开发的又一个轻量级框架 tornado框架需要安装 xff0c 为了方便 xff0c 我直接安装了Anaconda 2 4 1 里面直接就带了tornado 还有很多python库 numpy scipy
  • web.py框架入门

    web py是python web开发的一个轻量级框架 web py可以通过pip命令安装 xff0c pip install web py 编写官网示例代码 xff1a vi index py import web urls 61 34
  • graphviz快速上手

    graphviz最初是AT amp T实验室用来画流程图的工具 xff0c 使用dot语言 其中根据图的类型可以分为有向图 dirgraph 和无向图 graph 我们知道图是由点 node 和边 edge 组成的 xff0c 在有向图中边
  • mysqld: File './mysql-bin.index' not found (Errcode: 13 - Permission denied)

    我们通过yum方式安装mysql 会生成mysql mysql用户组和用户 xff0c 启动mysql默认是使用mysql用户 如果我们开启了慢log日志 xff0c 而且我们使用service mysqld start启动mysql 会报
  • redhat7编译安装php-5.5.38

    1 从官网下载php源码包 php 5 5 38 2 安装依赖包 yum install libxml2 libxml2 devel bzip2 devel libcurl devel y yum install openssl opens
  • spark-1.6.0源码编译安装

    环境准备 spark是scala语言写的 xff0c scala运行需要jdk 如果通过maven编译 xff0c 还需要maven环境 xff0c 因此spark源码编译需要安装jdk scala apache maven这三样环境 这里
  • ZendStudio+php+Apache开发环境搭建

    学习php xff0c 我们就想有一个好的ide xff0c ZendStudio是专门为php开发提供的ide xff0c 写完代码立马能够在工作空间中调试 xff0c 可以通过Run As gt PHP CLI Application
  • 图文详解win7实现局域网共享文件

    工作中 xff0c 我们有时候会拥有两台机器 xff0c 避免机器之间文件传来传去 xff0c 可以使用局域网文件共享 xff0c 在一台机器上开启文件共享 xff0c 另一台机器通过IP访问 xff0c 即可轻松实现文件互访 今天介绍我们
  • 模拟画图题P1185 绘制二叉树

    可能更好的观看体验 题目链接P1185 绘制二叉树 题意概述 根据规则绘制一棵被删去部分节点的满二叉树 节点用 o o o 表示 xff0c 树枝用 表示 每一层树枝长度会变化 xff0c 以满足叶子结点有如下特定 xff1a 相邻叶子节点
  • win7+MySQL5.7.18zip版本安装

    mysql5 7 18zip版本在windows的安装 xff0c 就是解压 xff0c 初始化 xff0c 然后做一些密码修改的设置即可使用 xff0c 如果需要远程连接 xff0c 需要更改用户表的host值为 39 39 xff0c
  • redhat7源码编译hadoop2.6.0

    以前在32位linux机器上编译过hadoop2 6 0 这次在redhat7 64bit上再次编译hadoop2 6 0 xff0c 除必须的jdk maven protobuf需要安装之外 xff0c 还需要安装系统依赖库gcc gcc
  • elasticsearch启动错误

    最近想尝试一下elk搭建实时日志分析系统 xff0c 结果运行elasticsearch时 xff0c 就遇到了一些问题 这些问题基本都是系统参数相关的 现在整理出来 xff0c 以免后面再次遇到 xff0c 也供大家参考 xff0c 少走
  • kafka+flume+hdfs实时日志流系统初探

    本次实验 xff0c 主要为了测试将kafka的消息通过flume接收并存入hdfs xff0c 如果之前搭建过hadoop flume kafka的 xff0c 这里会很快就会完成 xff0c 思路比较清晰 xff0c 主要配置在flum
  • 让Eclipse中spring的xml配置文件出现属性和类提示

    在spring配置文件中可以让配置bean的时候出现提示 xff0c 这里需要做一些设置 设置包括安装springide插件 spring beans version xsd文件引入 xff0c 增加xml编辑提示的字符 xff0c 默认只
  • win7查看端口占用的进程

    之前遇到一个问题 xff0c 系统上mysql启动了 xff0c 无法通过navicat客户端来连接 xff0c 这就很郁闷了 xff0c 最后定位到问题 xff0c 是我机器上还开启了一个开发php的应用程序phpwamp 它自带了一个m
  • cxf+spring实现webservice

    1 构建maven项目 xff0c 工程结构如下 xff1a 这里需要特别指出就是cxf core 3 1 12 jar类路径META INF cxf下有一个cxf xml的配置文件 xff0c 这个在applicationContext
  • Activemq+spring整合

    activemq与spring的整合需要用到线程池 考虑到连接 会话等资源的建立和释放 xff0c 无须人工操作 xff0c 全部交给容器来处理 这里通过一个实例讲解activemq与spring如何整合 项目大致是这样的设计 xff1a