linux C++ 环境下的ActiveMQ学习

2023-05-16

ActiveMQ

1.概述

ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。

2.特点

2.⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

●完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

●对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

●通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

●支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

●支持通过JDBC和journal提供高速的消息持久化

● 从设计上保证了高性能的集群,客户端-服务器,点对点

●支持Ajax

●支持与Axis的整合

●可以很容易的调用内嵌JMS provider,进行测试

3.安装测试

ActiveMQ服务安装

开发环境

windows64 PC

下载最新版本5.15.0release, 解压apache-activemq-5.15.0-bin.zip(或者apache-activemq-5.15.0-bin.tar.gz)目录如下:

+bin (windows下面的bat和unix/linux下面的sh)

+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

+data (默认是空的)

+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)

+example (几个例子

+lib (activemMQ使用到的lib)

-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

-LICENSE.txt

-NOTICE.txt

-README.txt

-user-guide.html

你可以使用bin\win64\activemq.bat(activemq) 启动,如果一切顺利的话,你就会看见类似下面的信息:

但若出现如下类似信息

 

则查看或修改配置文件E:\apache-activemq-5.15.0\apache-activemq-5.15.0\conf\jetty中的host\port是否有误

服务已经启动。。。。。。。。。。。。。。。。。。。

我们可以web访问一下下 --------->192.168.8.107:8161

接着我们在一块开发板上安装c++实现的ActiveMQ客户端库ActiveMQ_cpp,可搜索下载

开发环境

RK3288(ubuntu)

安装步骤:

依赖安装

下载apr-1.6.3.tar    apr-iconv-1.2.2.tar  apr-util-1.6.1.tar  cppunit-1.12.1.tar 

分别解压默认安装进入目录执行./configure 执行 make && make install

下载activemq-cpp-library-3.9.4-src.tar

解压执行./configure  --with-apr=/usr/local/apr/ --with-cppunit=/usr/local/cppunit/ 

执行make && make install

一切准备就绪,则我们实现一个demo直接贴代码


#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;


class SimpleProducer : public Runnable {
private:
    CountDownLatch latch;
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;

private:

    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );

public:

    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
        latch(1),
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) {
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    void waitUntilReady() {
        latch.await();
    }
    virtual void run() {
        try {
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );

		   try{
                connection = connectionFactory->createConnection();
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                printf("%s..........\n",e.getStackTraceString());
                throw e;
            }

            if( clientAck )
			{
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else
			{
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE );
            }

            if( useTopic )
			{
                destination = session->createTopic( destURI );
            } else
			{
                destination = session->createQueue( destURI );
            }

            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTENT );

            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
            string text = (string)"Hello world! from thread " + threadIdStr;
            while(1)
            {
                unsigned int ix=0;
            for(ix = 0 ; ix<numMessages; ++ix )
			{
                TextMessage* message = session->createTextMessage( text );
                message->setIntProperty( "Integer", ix );
                printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
                producer->send( message );
                delete message;
            }

        }
        }catch ( CMSException& e ) {
            printf("%s..........\n",e.getStackTraceString());
            e.printStackTrace();
        }
    }

private:

    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination = NULL;

        try
		{
            if( producer != NULL ) delete producer;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer = NULL;

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }

        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};
#if 1
int main(int argc , char* argv[])
{
    std::cout<<"init ActiveMQCPP library"<<std::endl;
    activemq::library::ActiveMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting produce message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string  brokerURI =("failover:(tcp://192.168.8.107:61616)");
    unsigned int numMessages = 10;
    std::string destURI = "ckjiaoc@.isstech.com";

    bool useTopics = true;
    bool clientAck = false;
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics ,clientAck);
    producer.run();
    //producer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished test" << std::endl;
    std::cout << "=====================================================\n";
    producer.waitUntilReady();

    std::cout << "delete ActiveMQCPP library=====================================================\n";
    activemq::library::ActiveMQCPP::shutdownLibrary();
}
#endif
./Serverservice 
init ActiveMQCPP library
=====================================================
Starting produce message:
-----------------------------------------------------
Sent message #1 from thread -1225509283
Sent message #2 from thread -1225509283
Sent message #3 from thread -1225509283
Sent message #4 from thread -1225509283
Sent message #5 from thread -1225509283
Sent message #6 from thread -1225509283
Sent message #7 from thread -1225509283
Sent message #8 from thread -1225509283
Sent message #9 from thread -1225509283
Sent message #10 from thread -1225509283
Sent message #1 from thread -1225509283
Sent message #2 from thread -1225509283
Sent message #3 from thread -1225509283
Sent message #4 from thread -1225509283
Sent message #5 from thread -1225509283
Sent message #6 from thread -1225509283

这时你会发现ActiveMQ的服务端的主题里就有你刚才发送的信息。。。。

主题名字可自定义,489值所对应的内容表示入队消息MessagesEnqueued 489条,0 MessageDequeued出队消息为0条,表示,此主题没有被订阅,下面,我们再实现一个可消费的ActiveMQ demo

代码如下

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;


class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic ;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;

private:

    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {
    }

    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }

    void close() {
        this->cleanup();
    }

    void runConsumer() {

        try {
            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI );
            connection = connectionFactory->createConnection();
            delete connectionFactory;

            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL )
			{
                amqConnection->addTransportListener( this );
            }

            connection->start();
            connection->setExceptionListener(this);

            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onMessage( const Message* message ) {
        static int count = 0;
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            if( clientAck ) {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }

    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }

    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }

private:

    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) {}
        destination = NULL;

        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) {}
        consumer = NULL;

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {}

        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {}
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {}
        connection = NULL;
    }
};
#if 1
int main(int argc, char* argv[]) {
    std::cout<<"init ActiveMQCPP library"<<std::endl;
    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting recieve message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string  brokerURI =("failover:(tcp://192.168.8.107:61616)");

    std::string destURI = "ckjiaoc@.isstech.com";
    bool useTopics = true;
    bool clientAck = false;
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
    consumer.runConsumer();
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}
    consumer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();
}
#endif // 0

 

./Consumer 
init ActiveMQCPP library
=====================================================
Starting recieve message:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
Message #1 Received: Hello world! from thread -1225255331
Message #2 Received: Hello world! from thread -1225255331
Message #3 Received: Hello world! from thread -1225255331
Message #4 Received: Hello world! from thread -1225255331
Message #5 Received: Hello world! from thread -1225255331
Message #6 Received: Hello world! from thread -1225255331
Message #7 Received: Hello world! from thread -1225255331
Message #8 Received: Hello world! from thread -1225255331
Message #9 Received: Hello world! from thread -1225255331
Message #10 Received: Hello world! from thread -1225255331
Message #11 Received: Hello world! from thread -1225255331
Message #12 Received: Hello world! from thread -1225255331

这时我们再看看ActiveMQ服务端会有什么变化

没错,结果表明,入队2254条信息,出队1136条信息

特别说明一下,ActiveMQ 的订阅消息模式

点对点

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意: 
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。 

发布/订阅 
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。发布订阅模式适用于1个消息生产者,多个消费者场景,首先启动消息订阅方,在消息发布方开始执行后,接收该消息进行处理。在ActiveMQ管理界面会动态跟进消息产生-消费(入队、出队)情况;以及生产者个数,消费者个数。

这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)

注意:ActiveMQ 内容还有很多,如消息持久化和非持久化处理,不同的应用场景,其消息模式也有所不同,值得大家去学习

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

linux C++ 环境下的ActiveMQ学习 的相关文章

随机推荐

  • 常见移动机器人轮直径校准(图片版)

    关注同名微信公众号 混沌无形 xff0c 阅读更多有趣好文 xff01 原文链接 xff1a 差速驱动机器人轮间距校准 xff08 包含原文PDF百度云下载链接 xff09 精彩的理论论证过程见原文链接 xff08 含全文下载链接 xff0
  • 将博客搬至CSDN

    对与 H 由于AC CE 所以 CAB 61 H 39 61 H 此时满足 tan H 61 tan H 39 61 Lrt 1 39 hz2 tan H 61 tan H 39 61 Lrt 1 39 xff08 1 xff09 可由角速
  • JNI开发C调用Java的方法和构造函数(三)

    前言 JNI的基本使用 xff0c C中调用Java的成员变量 xff0c 成员属性 xff0c 构造方法 xff0c 方法 提示 xff1a 以下是本篇文章正文内容 xff0c 下面案例可供参考 一 JNI的上下文 xff1f 大家知道在
  • 常见移动机器人运动学模型总结

    文末有彩蛋 原文及其原文中所涉及文章的PDF xff08 共9篇 xff09 免费下载 混沌无形 混沌系统是世界本质 xff0c 无形之中存在规律 机器人智能化发展从线性过渡到混沌 xff0c 本号将分享机器人全栈技术 xff08 感知 规
  • 常见移动机器人多角度对比分析

    混沌无形 混沌系统是世界本质 xff0c 无形之中存在规律 机器人智能化发展从线性过渡到混沌 xff0c 本号将分享机器人全栈技术 xff08 感知 规划 控制 xff1b 软件 机械 硬件等 xff09 43篇原创内容 公众号 文末提供原
  • 设计搭建汽车机器人(M1)

    p 文末提供原文PDF免费下载 期刊论文版式 摘要 xff1a 汽车机器人是移动机器人类型中非常典型的一种 xff0c 本文从应用需求 场景等角度分析 xff0c 详细阐述汽车机器人MCR的本体设计 硬件系统设计及软件系统设计情况 xff0
  • STM32F103mini教程学习总结与心得(一)

    关注同名微信公众号 混沌无形 xff0c 阅读更多有趣好文 xff01 1 引脚是否兼容5V的判断 xff1a 引脚表中PF表示5V xff0c 原理图中有ADC的引脚为3 3V 2 PT xff1a 容忍5V xff0c 没有PT标示 x
  • STM32F103mini教程学习总结与心得(二)---->串口通信

    关注同名微信公众号 混沌无形 xff0c 阅读更多有趣好文 xff01 一 串口原理 1 处理器与外部设备通信的两种方式 xff1a 并行通信 xff08 速度快 xff0c 占用资源多 xff09 43 串行通信 xff08 反之 xff
  • 电源管理与驱动设计笔记

    关注同名微信公众号 混沌无形 xff0c 阅读更多有趣好文 xff01 1 电源管理的功能 xff1a 具备电压过高保护 电流过大保护 电量监测 过放保护 等功能 gt 自主充电 2 一款清洁机器人的运动控制系统方案设计示意图 2 1电源充
  • (4)(4.3) 将固件加载到已有ArduPilot固件的主板上

    文章目录 前言 1 将自动驾驶仪连接到电脑 2 选择COM端口 3 安装固件 4 使用测试版和开发版 4 1 测试版 4 2 最新开发版本 4 3 自定义固件构建服务器 5 测试 前言 这些说明将告诉你如何将最新的固件下载到已经安装了 Ar
  • 【MDK KEIL】keil添加文件夹目录结构(批量添加.c和.h文件到工程中)(keil添加头文件路径)

    第一步打开创建文件夹选项 xff1a 第二步 xff1a 直接创建删除 或者上移下移 就好了 3 添加文件夹的同时别忘记导入头文件 xff1a
  • 单片机IO详解(上拉 下拉 准双向 输入 输出 推挽 开漏)

    目录 上拉 xff1a 下拉 输入 xff1a 上拉输入 下拉输入 输入浮空 模拟 施密特输入 xff1a 三态输入 xff1a 输出 复用推挽和推挽输出区别 推挽输出特点 开漏输出 xff1a 准双向口 一般单片机都会提供上拉和下拉功能
  • 局部路径规划算法——实现DWA(dynamic window approach)控制空间采样

    DWA算法是局部路径规划算法 xff0c 在全局路径规划算法完成后 xff0c DWA算法能够根据当前小车 xff08 机器人 xff09 位置 障碍物 终点的位置进行控制空间 xff08 速度 角速度 xff09 的采用 xff0c 从而
  • make和cmake简要介绍

    GCC GCC xff08 GNU Compiler Collection xff0c GNU编译器套件 xff09 是由GNU开发的编程语言译器 GNU编译器套件包括C C 43 43 Objective C Fortran Java A
  • 什么是小端模式,什么是大端模式

    字节序 前言 内存在读写数据的时候 xff0c 都是以字节为单位进行读写的 xff0c 其最小的读写单位也是字节 一个字节占8位 xff0c 如果暂且只考虑无符号数 xff0c 那么其能表示的范围只有0 255这256个整数数值 如果将一个
  • 多旋翼飞行器螺旋桨动力学模型

    1 多旋翼螺旋桨动力学模型 1 1 螺旋桨几何位置基本描述1 2 螺旋桨拉力及拉力力矩1 3 螺旋桨反扭力矩1 4 螺旋桨陀螺力矩1 5 螺旋桨动力学方程 1 多旋翼螺旋桨动力学模型 1 1 螺旋桨几何位置基本描述 坐标系分为多旋翼机体重心
  • 雷达天线孔径与分辨率

    我们现在讲一下真实孔径的问题 xff0c 我们先有一个 最小分辨角 的概念 xff0c 根据瑞丽 xff08 Rayleigh xff09 判据 xff0c 几何光学 xff0c 物体上的一个发光点经透镜成像后得到的应是一个几何像点 而由于
  • 【新手】关于ros包安装时,提示找不到相应的ros包(unable to locate package ros- **** )

    我是在学习这个大佬的教程 xff1a 从零开始的ROS学习之仿真 43 SLAM https blog csdn net u011612364 article details 122147741 xff08 引用 xff09 时发现我自己出
  • c++实现守护进程

    概述 Linux Daemon xff08 守护进程 xff09 是运行在后台的一种特殊进程 它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件 约束 守护进程一般在系统启动时开始运行 xff0c 除非强行终止 xff0c 否
  • linux C++ 环境下的ActiveMQ学习

    ActiveMQ 1 概述 ActiveMQ 是Apache出品 xff0c 最流行的 功能强大的即时通讯和集成模式的开源服务器 ActiveMQ 是一个完全支持JMS1 1和J2EE 1 4规范的 JMS Provider实现 提供客户端