activemq 应用实践——queue

2023-05-16

 

首先创建发送端程序SenderTestBase和接收端程序ReceiveTestBase

发送端: SenderTestBase.java

package test;

 

import java.util.Date;

 

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

public class SenderTestBase {

private static int messageSize = 100 ; 

 

/**

* @param args

*/

public static void main(String[] args) throws Exception  {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

//使用事务  自动签收

Session session =  connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

//创建queue  如果改队列在activemq服务器上存在 那么就获取到该queue的实例

Destination destination = session.createQueue("test-queue");

MessageProducer producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

int i = 0 ;

TextMessage message = null ; 

while(i<100){

i++;

message = session.createTextMessage(createMessageText(i));

producer.send(message);

Thread.sleep(80);

}

System.out.println("发送完毕!");

try {

session.commit();

session.close();

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}  

}

private static String createMessageText(int index) {

StringBuffer buffer = new StringBuffer(messageSize);

buffer.append("Message: " + index + " sent at: " + new Date());

if (buffer.length() > messageSize) {

return buffer.substring(0, messageSize);

}

for (int i = buffer.length(); i < messageSize; i++) {

buffer.append(' ');

}

return buffer.toString();

}

 

}

接收端程序: ReceiveTestBase.java

package test;

 

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

public class ReceiveTestBase {

 

/**

* @param args

*/

public static void main(String[] args) throws Exception {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

Connection connection = factory.createConnection();

connection.start();

//使用事务  自动签收

Session session =  connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("test-queue");

MessageConsumer consumer = session.createConsumer(destination);

 

int i = 0 ;

TextMessage message = null ; 

while(true){

if(i < 100){

System.out.println("i = " + i);

}else{

break;

}

i++;

//该方法为阻塞式的方法 一直等待

message = (TextMessage) consumer.receive();

//该方法告诉服务器指定队列已经被签收

session.commit();

System.out.println("收到消息:" + message.getText());

// Thread.sleep(200);

}

try {

session.close();

connection.close();

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

 

}

其中发送端发送格式为 "Message: " + index + " sent at: " + new Date() 的信息100次,

接收端接受该信息。

其次、我们按照下面的操作运行程序。

1、运行ReceiveTestBase ,结果为:

2010-9-24 11:30:11 org.apache.activemq.transport.failover.FailoverTransport doReconnect

信息: Successfully connected to tcp://localhost:61616

i = 0

ReceiveTestBase的console中的光标停在“i = 0”的下一行,程序没有退出。

说明 consumer.receive(); 方法为阻塞式。

我们在打开浏览器,在地址栏中输入 “http://localhost:8161/admin”,点击进入 “Queues” 链接

我们能看到队列 test-queue 中的Numbe Of Consumer 中的数值为 1 。

Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值均为 0 。

2、接着我们不停止 ReceiveTestBase 程序的阻塞,而是运行 SenderTestBase 程序, SenderTestBase的console结果为:

2010-9-24 11:39:08 org.apache.activemq.transport.failover.FailoverTransport doReconnect

信息: Successfully connected to tcp://localhost:61616

发送完毕!

在去观察ReceiveTestBase的console中的结果,发现

i = 99

收到消息:Message: 100 sent at: Fri Sep 24 11:39:16 CST 2010    

这种类型的信息接受到了 100条 ,并且程序退出。

说明交互成功,且读取的顺序是和写入的顺序一致!

 

接着我们去服务器监控页面查看queue中test-queue队列的情况,

发现 Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值均为 100 。

以上的操作时常规的发送消息和接受消息的过程,接下来我们做一下 

session.createQueue("test-queue"); 和 session.commit(); 的作用。

3、快速的运行 SenderTestBase 程序三次,每个console都没有抛出错误,说明session.createQueue("test-queue");语句

在test-queue在服务器上没有的时候是创建,在服务器上有的时候是获取该queue的信息创建实例。

我们在打开服务器的监控页面看看queue中的test-queue的情况如何。

Messages Enqueued(入列) 和 Messages Dequeued(出列) 的值分别为300和0 ,

且Number Of Pending Messages (等待信息数)也为300。说明发送端发送出数据之后,

如果找不到客户端接受,那么activemq将把该信息存在服务器中。这个时候如果我们启动3个客户端情况会怎样?

快速运行 ReceiveTestBase 程序三次,我们看看每个客户端的console的结果如何,

每个客户端均得到了100条数据,但是这100条数据是无序的,说民activemq服务器端在queue中有多个客户端的情况下作了负载均衡。

4、我们在看看 接收端的 session.commit(); 有什么作用!

我们首先把ReceiveTestBase.java中的session.commit();行注释掉。

然后运行发送端程序 SenderTestBase ,在运行接收端程序ReceiveTestBase。

你会发送,接收端可以正常的接收到数据,但是当你打开activemq的监控系统时,

你会发现Messages Dequeued列中的值没有变化,说明只有在客户端告知服务器端接收到信息的时候,

Messages Dequeued列才会变化。所以有时候Messages Dequeued也不能反映真实的状况。

如果你运行多次接收端程序,你会发现它每次都能接收到正确的数据。只有把session.commit();注释去掉,

才能只接受到一次。

小结:引用网上的一段话“

JMS Queue执行load balancer语义:

一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。

如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。

一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

activemq 应用实践——queue 的相关文章

  • 计算机硬件技术基础第一章总结

    1 1 计算机发展概述 1 1 1 计算机的发展简史 第一台计算机 xff1a ENIAC 第一代 xff1a 电子管数字计算机 xff08 1946 1958 xff09 逻辑元件 xff1a 真空电子管体积大 xff0c 功耗高 xff
  • CentOS7安装Oracle JDK1.8

    JDK1 8下载地址 https www oracle com java technologies javase javase8 archive downloads html 需要登录之后才能下载文件 xff0c 下载jdk 8u202 l
  • Ubuntu 16.04 安装 rtl8812au系列 (DWA-182) wireless adapter driver

    Ubuntu 16 04 安装 rtl8812au系列 DWA 182 wireless adapter driver 刚刚开始使用Linux xff0c 一脸懵逼 xff0c 命令行搞得一愣一愣的 xff0c 不过熟悉了之后就好很多了 一
  • SpringBoot项目启动失败报错Annotation-specified bean name ‘xx‘ for bean class [xxx] conflicts with existing

    问题描述 xff1a 项目启动就会报 xff1a Annotation specified bean name xx for bean class xxx conflicts with existing non compatible bea
  • Visual Studio高效实用的扩展工具、插件

    说明 xff1a 对一个有想法的程序员来说 xff0c 善于使用一款高效的开发工具是很重要的 xff0c 今天给大家介绍的是宇宙第一IDE vs用起来很不错的开发工具 xff0c 假如大家觉得不错也可以尝试的用用 xff0c 毕竟对于我们这
  • java琐事

    并发编程 并发的意义 并发通常是提高运行在单处理器上的程序的性能 如果程序中的某个任务因为该程序控制范围之外的某些条件 I O 而导致不能继续执行 xff0c 那么这个任务或线程就阻塞了 如果没有并发 xff0c 整个程序都讲停下来 从性能
  • java类的初始化和实例化的初始化(类的初始化过程)

    Java类的加载顺序 父类静态代变量 父类静态代码块 子类静态变量 子类静态代码块 父类非静态变量 xff08 父类实例成员变量 xff09 父类构造函数 子类非静态变量 xff08 子类实例成员变量 xff09 子类构造函数 上面的说法也
  • 最优吞吐量和最短停顿时间

    在实践活动中 xff0c 我们通过最优吞吐量和最短停顿时间来评价jvm系统的性能 吞吐量越高算法越好 暂停时间越短算法越好 首先让我们来明确垃圾收集 GC 中的两个术语 吞吐量 throughput 和暂停时间 pause times JV
  • sql执行慢的原因有哪些,如何进行sql优化?

    一 导致SQL执行慢的原因 1 硬件问题 如网络速度慢 xff0c 内存不足 xff0c I O吞吐量小 xff0c 磁盘空间满了等 2 没有索引或者索引失效 xff08 一般在互联网公司 xff0c DBA会在半夜把表锁了 xff0c 重
  • 阿里java开发手册2019年最新版619(华山版)PDF下载

    链接 https pan baidu com s 1ANvBu1hidnvRCZILDGXuQA 密码 ugq8
  • Mockito:org.mockito.exceptions.misusing.InvalidUseOfMatchersException

    org span class token punctuation span mockito span class token punctuation span exceptions span class token punctuation
  • 一个简单通用的基于java反射实现pojo转为fastjson对象的方法

    最近在公司工作需要实现一个工具实现一个pojo转为fastjson对象的通用工具 xff0c 直接上源码 span class token comment 通用的pojo转为Json对象的方法 64 author ZFX 64 date20
  • Java魔法类:Unsafe应用解析

    这个美团大神对于Unsafe的分析很全面 https tech meituan com 2019 02 14 talk about java magic class unsafe html
  • Linux X-Window Error: Can‘t open display: :0

    问题过程描述 许多经常部署Oracle数据的管理员经常需要对数据库软件进行部署 xff0c 但大多数都是通过远程部署的方式进行部署 xff0c 使用远程部署有两种方式 xff0c 一种是通过脚本部署 xff0c 另一种就是通过图形化进行部署
  • maven打包生成war跳过单元测试

    maven将项目打包成war包的命令是 mvn install 或mvn package 每次生成war包时会进行所以的单元测试 xff0c 如果想跳过单元测试直接生成war包有以下3种方式 方法1 xff1a 在pom xml中加入如下代
  • 程序员每天工作多少个小时_程序员每天实际工作几个小时?

    程序员每天工作多少个小时 您如何看待 xff0c 程序员每天实际工作多长时间 xff1f 大多数人会说答案是8到9个小时 有人说他们每天工作12个小时或更长时间 尽管这是正确的 xff0c 但它并不是大多数程序员实际工作的数量 xff0c
  • ubuntu 显示缺少库文件 libcom_err.so.2 解决办法

    运行任何代码都显示 xff1a error while loading shared libraries libcom err so 2 cannot open shared object file No such file or dire
  • 记CVTE第一次面试

    首先说明一下博主是一个大三的学生 xff0c 专业计算机科学与技术 xff0c 主学的方向是Web后台开发 xff0c 主语言是Java 前几天看到CVTE有校园招聘实习生 xff0c 就报名参加了 xff0c 做了CVTE的笔试题 xff
  • Java Socket 编程那些事(1)

    前言 最近在准备面试和笔试的一些东西 xff0c 回去翻看了Java关于IO的基础 xff0c 发现很多基础还是没有记牢固 xff0c 现在回头重新学习 xff0c 就从socket通讯开始吧 xff0c 虽然说现在企业很少直接编写sock
  • Redis集群的原理和搭建

    前言 Redis 是我们目前大规模使用的缓存中间件 xff0c 由于它强大高效而又便捷的功能 xff0c 得到了广泛的使用 单节点的Redis已经就达到了很高的性能 xff0c 为了提高可用性我们可以使用Redis集群 本文参考了Rdis的

随机推荐

  • Java多线程爬虫爬取京东商品信息

    前言 网络爬虫 xff0c 是一种按照一定的规则 xff0c 自动地抓取万维网信息的程序或者脚本 爬虫可以通过模拟浏览器访问网页 xff0c 从而获取数据 xff0c 一般网页里会有很多个URL 爬虫可以访问这些URL到达其他网页 xff0
  • 关于js中的“Uncaught SyntaxError: Unexpected token

    我在js中为一个已经定义的数组重新定义新的一个维度的数组时 xff0c 调试器这样报错 只说结果 xff1a 肯定是在给已经定义的数组中的元素重新定义下一维度时 xff0c 多在前面加了一个 var 就像下面的这样 xff1a var gr
  • 学成在线--day03 CMS页面管理开发

    学成在线 第3天 讲义 CMS页面管理开发 1 自定义条件 1 1 需求分析 在页面输入查询条件 xff0c 查询符合条件的页面信息 查询条件如下 xff1a 站点Id xff1a 精确匹配 模板Id xff1a 精确匹配 页面别名 xff
  • Ubuntu下安装Inode后双击InodeClient无反映解决方法

    由于比较喜好linux编程环境 xff0c 所以准本一直使用linux 学校无线有时有有时没很不爽 xff0c 所以准本安装Inode xff0c 但是安装完Inode后双击是一直没反映 最后求助万能的百度 xff0c 谁知道百度的搜索不得
  • 使用GitHub托管网站,自定义域名

    1 如何使用GitHub托管 官网链接 xff1a 点击跳转 官网首页就有详细的搭建步骤 xff0c 总共5步便可搭建成功 访问 github用户名 github io 便可看到自己的网站 2 自定义域名的方法 1 申请一个域名 xff0c
  • ElasticsearchRestTemplate 基本使用

    随着数据量的增加和数据结构的复杂化 xff0c 传统的关系型数据库已经不能满足用户的需求 xff0c 而搜索引擎则成为了一种更加高效 可扩展的数据检索方案 而 Elasticsearch 则是一个流行的搜索引擎 xff0c 在 Java 生
  • Navcat无法连接mysql报错1449

    把mysql从5升级成8后第二次连接mysql就报错1449 不清楚什么原因 xff0c 反正肯定是升级数据库之后mysql用户被动了 xff0c 看了很多博客都没有用 xff0c 什么在navcat里新建用户 xff0c 数据库都连不上怎
  • Kafka —— java实现一生产者多消费者实例

    架构图 xff1a xff08 网图 xff0c 很通俗易懂了 xff0c 就不自己画了 xff0c 这里实现的是一个Producer 两个Consumer xff09 前提 xff1a 已经开启zookeeper 和kafka xff0c
  • 程序员玩游戏之三--天天爱消除非暴力脚本

    评论 xff1a 此款游戏成功在其好友排名上 好友的分数超过了你无疑会增加你的斗志 中级策略 xff1a 七手八脚多人一起点 这相当于多个CPU处理一个大任务了 xff0c 哈哈 终极策略 xff1a 自动化 机器总是比人快的多 你两个人一
  • 程序员玩游戏之四--娱网棋牌大连打滚子记牌器

    话说大连人都爱打滚子 xff0c 所以本人就做了一个打滚子记牌器 基本原理同 程序员玩游戏之一 自动对对碰 xff0c 故此处不再赘述 xff0c 只留下一张截图吧 代码请见资源地址 xff1a http download csdn net
  • 为SIGSEGV设置handler有用吗?

    背景 最近几天看到先辈们30年前留下了一块代码 xff0c 为SIGSEGV设置了handler xff0c 所以心中有了两个疑问 xff1a 为SIGSEGV设置handler有没有用 xff1f 能否跳过引起崩溃的那一句指令 xff1f
  • GDB调试技巧实战--为优化版release版本的函数寻找参数值

    在上一篇 GDB调试技巧实战 为release版本的函数寻找参数值 中 xff0c 我们探讨了一种为函数找参数的办法 xff0c 但是 xff0c 那是最理想的情况 编译时没有使用 fomit frame pointer 编译时没有开启优化
  • 通过实例了解uprobe及其对性能的影响

    前言 uprobe是用户空间探针的意思 xff0c 可以用来给用户程序的任何地方下探针 xff0c 不仅仅是函数粒度层级的 所以异常灵活 如果不熟悉ftrace uprobe 可参考以下文档 xff1a https www kernel o
  • bpftrace各维度捕捉SIGKILL信号

    一 问题 Ftrace 几乎适配任何主流内核版本 xff09 和 bpftrace xff08 要求内核版本4 1以上 xff09 中都有两个现成的脚本execsnoop bt killsnoop bt 我经常用他们从外部 xff08 不去
  • 图形化VS201x工程中的项目依赖关系

    目录 1 背景 2 入手 2 1 分析sln文件 2 2 给出正则表达式 3 程序 4 demo 5 补充 另外一种情况 6 补充 完整代码 1 背景 初次接手一个大工程时 往往因为复杂的项目依赖而遇到各种编译问题 同时如果能图形化其中的依
  • 深入应用python关键字yield--实现任务调度

    在此假设读者了解yield关键字的基本用法 如不了解请参照python文档或google之 大家知道遇到yield关键字时python会把当前的环境 xff0c 比如局部变量 全局变量等 xff0c 给记录下来以便以后能正确的继续向下运行
  • 刘慈欣(三体作者)写给200年以后的女儿的一封信

    原文地址 xff1a http blog sina com cn s blog 540d5e800101lcsb html 亲爱的女儿 xff0c 你好 xff01 这是一封你可能永远收不到的信 xff0c 我将把这封信保存到银行的保险箱中
  • 程序员玩游戏之二--篡改植物大战僵尸2的阳光值

    植物大战僵尸1几年前曾经风靡一时 xff0c 妇孺皆知 xff01 其续作奇幻时空之旅千呼万唤始出来 xff0c 不过从首发到目前都1月有余 xff0c 本人竟然还没玩过 于是昨晚下载了一个汉化版 本人系统为IOS5 0 1 xff0c a
  • 设计模式:生产者消费者模式

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题 该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度 为什么要使用生产者和消费者模式 xff1f 在线程世界里 xff0c 生产者就是生产数据的线程 xff0c
  • activemq 应用实践——queue

    首先创建发送端程序SenderTestBase和接收端程序ReceiveTestBase 发送端 xff1a SenderTestBase java package test import java util Date import jav