ActiveMQ发布-订阅消息模式

2023-11-05

ActiveMQ发布-订阅消息模式

一、订阅杂志

我们很多人都订过杂志,其过程很简单。只要告诉邮局我们所要订的杂志名、投递的地址,付了钱就OK。出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中。这样我们就可以看到每一期精彩的杂志了。
在这里插入图片描述
仔细思考一下订杂志的过程,我们会发现这样几个特点:
1、消费者订杂志不需要直接找出版社;
2、出版社只需要把杂志交给邮局;
3、邮局将杂志送达消费者。
邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递。

二、 发布-订阅消息模式

刚刚讲了订阅杂志,下面我们会讲传统调用模式演化到发布-订阅消息模式。

有些网站在注册用户成功后发一封激活邮件,用户收到邮件后点击激活链接后才能使用该网站。一般的做法是在注册用户业务逻辑中调用发送邮件的逻辑。这样用户业务就依赖于邮件业务。如果以后改为短信激活,注册用户业务逻辑就必须修改为调用发送短信的逻辑。如果要注册后给用户加点积分,再加一段逻辑。经过多次修改,我们发现很简单的注册用户业务已经越来越复杂,越来越难以维护。相信很多开发者都会有类似痛苦的经历。
在这里插入图片描述
即使用户业务实现中对其他业务是接口依赖,也避免不了业务变化带来的依赖影响。怎么办?解耦!将注册用户业务逻辑中注册成功后的处理剥离出来。

再回头看看“订阅杂志”,如果没有邮局,出版社就必须自己将杂志送达所有消费者。这种情形就和现在的注册用户业务一样。我们发现问题了,在用户业务和其他业务之间缺少了邮局所扮角色。

我们把邮局抽象成一个管理消息的地方,叫“消息管理器”。注册用户成功后发送一个消息给消息管理器,由消息管理器转发该消息给需要处理的业务。现在,用户业务只依赖于消息管理器了,它再也不会为了注册用户成功后的其他处理而烦恼。
在这里插入图片描述
注册用户的改造就是借鉴了“订阅杂志”这样原始的模式。我们再进一步抽象,用户业务就是消息的“生产者”,它将消息发布到消息管理器。邮件业务就是消息的“消费者”,它将收到的消息进行处理。邮局可以订阅很多种杂志,杂志都是通过某种编号来区分;消息管理器也可以管理多种消息,每种消息都会有一个“主题”来区分,消费者都是通过主题来订阅的。
在这里插入图片描述
发布-订阅消息模式已经呈现在我们面前,在这里,对于发布者来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:
在这里插入图片描述
示例
1、Publish.java:消息发布者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMessage() throws Exception {
        Destination destination = session.createTopic("Topic001");
        TextMessage msg = session.createTextMessage("我是消息内容...");
        producer.send(destination, msg);
        
        if(connection != null){
            connection.close();
        }    
    }

    public static void main(String[] args) throws Exception {
        Publish publish= new Publish();
        publish.sendMessage();
    }
}

2、Subscriber1.java:消息订阅者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber1 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber1() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber1 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber1 subscriber = new Subscriber1();
        subscriber.receive();
    }
}

3、Subscriber2.java:消息订阅者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber2 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber2() {
        try {
            factory =
                    new ActiveMQConnectionFactory("ljq", "ljq",
                            "failover:(tcp://192.168.1.101:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        public void onMessage(Message message) {
            System.out.println(message);
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber2 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber2 subscriber = new Subscriber2();
        subscriber.receive();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ActiveMQ发布-订阅消息模式 的相关文章

随机推荐

  • Python爬虫实战之爬淘宝商品并做数据分析

    前言 是这样的 之前接了一个金主的单子 他想在淘宝开个小鱼零食的网店 想对目前这个市场上的商品做一些分析 本来手动去做统计和分析也是可以的 这些信息都是对外展示的 只是手动比较麻烦 所以想托我去帮个忙 一 项目要求 具体的要求如下 1 在淘
  • 新星计划->Python循环语句while_for_猜拳游戏的实现-学习笔记

    作者 芝士小熊饼干 系列专栏 数据结构 gt 线性表 支持我 点赞 收藏 留言 新星计划参与者 创作不易 十年运道龙困井 一朝得势入青云 金鲤岂是池中物 一遇风雨变化龙 本节将学会while for in的一些基础用法及注意事项 while
  • 编译预处理:#if

    用法 if
  • MYSQL threadPool 线程池

    概述 mysql企业版安装插件使用 限制最大并发 减少服务器CPU调度 mysql默认线程调度one thread per connection 每连接一个线程 线程池适合大量短连接或高并发情况 相关参数 Plain Text 查看线程池参
  • hive中判断一个字符串是否包含另一个子串的四种方法,sql中也可用

    hive中判断一个字符串是否包含另一个子串的四种方法 如果你有一个数据需求 需要从一个字段中 判断是否有一个字符串 你该怎么做 一 方法1 like和rlike 最能想到的方法 用like或者rlike select i want to t
  • gnome的win10主题

    1 下载gnome的win10主题包 http gnome look org content show php Windows 10 Theme content 171327 2 下载win10图标 http www mediafire c
  • 异常org.hibernate.QueryException: could not resolve property的原因 解决办法

    异常org hibernate QueryException could not resolve property的原因 解决办法 异常消息如下所示 org hibernate QueryException could not resolv
  • 一个在线卷积池化计算器-推荐不会计算的同学

    很多刚入门的身边的小伙伴不会计算卷积和池化 于是为了方便大家计算顺便根据卷积池化计算公式设计了一个在线计算器 卷积池化计算器地址 如果你很懒的话 可以用这个计算
  • 攻防世界新手训练区6之disabled_button

    0x00 Tips X老师今天上课讲了前端知识 然后给了大家一个不能按的按钮 小宁惊奇地发现这个按钮按不下去 到底怎么才能按下去呢 这里用到了html中的input标签的属性 input标签一般属性可以为text password butt
  • 租赁OLED透明屏:打造独特商业体验的智慧选择

    近年来 OLED透明屏技术在商业领域中迅速崛起 其高透明度和卓越的图像质量为商家创造了全新的展示方式 租赁OLED透明屏作为一种智慧选择 不仅能提升品牌形象和吸引力 还能创造与众不同的视觉体验 对此 尼伽将和大家一起深入探讨租赁OLED透明
  • 软件测试中有几种造数据的方法呢?

    大家好 今天我们一起来聊聊在测试过程中如何进行造数据 在微信群很多伙伴问到测试执行中需要大量的数据支撑或者性能测试需要数据时 如何更好的 更高效的进行数据制造呢 一起来探讨一下吧 一 SQL语句 1 直接通过insert语句 现在工具也都支
  • 数字图像处理中imfilter函数卷积算法代码(matlab)

    function result my imfilter I h 此函数只能处理灰度图像 m n size I i add zeros m 2 n 2 m add n add size i add 此处i add为填充0边后的矩阵图像 i a
  • java socket编程

    一 网络编程中两个主要的问题 一个是如何准确的定位网络上一台或多台主机 另一个就是找到主机后如何可靠高效的进行数据传输 在TCP IP协议中IP层主要负责网络主机的定位 数据传输的路由 由IP地址可以唯一地确定Internet上的一台主机
  • iOS语音识别报错 +[AFAggregator logDictationFailedWithError:] Error Domain=kAFAssistantErrorDomain Code=7 “

    iOS语音识别引擎报错问题查找记录笔记 报错信息 iOS语音识别 测试的时候发现总是报错误 Utility AFAggregator logDictationFailedWithError Error Domain kAFAssistant
  • 信心危机的Curve 事件

    经历了前两天的 Curve 事件 DeFi 板块正面临重大的信心危机 其中与 Curve 创始人 Michael Egorov 个人债务有关的借贷平台这两天可谓是提心吊胆 Egorov 已通过一些列操作来降低被清算的风险 例如 Curve
  • DCDC降压电路学习记录

    一 输出建立过程分析 BUCK电路的基本原理就不说了 首先来分析一个BUCK电路它到输出电压稳定这个过程是如何建立起来的 假设输入电压15V输出电压5V 达到稳态时的电流为3A 它的基本电路如图所示 当输入端上电 控制电路就工作起来 MOS
  • C++ auto关键字

    文章目录 C 98 auto C 11 auto auto的用法 注意事项 C 98 auto 早在C 98标准中就存在了auto关键字 那时的auto用于声明变量为自动变量 自动变量意为拥有自动的生命期 这是多余的 因为就算不使用auto
  • Qt 实现关闭窗口触发事件

    目录 Qt 实现关闭窗口触发事件 QT中窗口关闭自动销毁 参考 Qt保存MainWindow的窗口布局 并在再次打开时显示 https blog csdn net qq 39417283 article details 114024286
  • GitHub中公私钥的配置

    一 生成新 SSH 密钥 1 打开 git 的命令行窗口 即 Git Bash 2 粘贴下面的文本 替换为你的 GitHub 电子邮件地址 GitHub 中生成新的 SSH 密钥 ssh keygen t ed25519 C your em
  • ActiveMQ发布-订阅消息模式

    ActiveMQ发布 订阅消息模式 一 订阅杂志 二 发布 订阅消息模式 一 订阅杂志 我们很多人都订过杂志 其过程很简单 只要告诉邮局我们所要订的杂志名 投递的地址 付了钱就OK 出版社定期会将出版的杂志交给邮局 邮局会根据订阅的列表 将