MQ 入门实践

2023-11-05

MQ

Message Queue,消息队列,FIFO 结构。

例如电商平台,在用户支付订单后执行对应的操作;

优点:

  • 异步
  • 削峰
  • 解耦

缺点

  • 增加系统复杂性
  • 数据一致性
  • 可用性

JMS

Java Message Service,Java消息服务,类似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通信的规范;

区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。

  1. ConnectionFactory

  2. Connection

  3. Destination

  4. Session

  5. MessageConsumer

  6. MessageProducer

  7. Message

协作方式图示为;

业界产品

ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级 万级 10 万级 10 万级
可用性 非常高 非常高
可靠性 较低概率丢失消息 基本不丢 可以做到 0 丢失 可以做到 0 丢失
功能支持 较为完善 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,支持分布式事务 较为简单,主要应用与大数据实时计算,日志采集等
社区活跃度

ActiveMQ

作为 Apache 下的开源项目,完全支持 JMS 规范。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适合不过。

快速开始

添加依赖;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

消息发送;

// 1. 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工厂创建连接
Connection connection = factory.createConnection();
// 3. 启动连接
connection.start();
// 4. 创建连接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根据session创建消息队列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根据session和目的地queue创建生产者
MessageProducer producer = session.createProducer(queue);
// 7. 根据session创建消息实体
Message message = session.createTextMessage("hello world!");
// 8. 通过生产者producer发送消息实体
producer.send(message);
// 9. 关闭连接
connection.close();

Spring Boot 集成

自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

添加依赖;

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

添加 yaml 配置;

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    #消息模式 true:广播(Topic),false:队列(Queue),默认时false
    pub-sub-domain: true

收发消息;

@Autowired
private JmsTemplate jmsTemplate;

// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
    System.out.println(msg);
}

// 发送消息
public void sendMsg(String destination, String msg) {
    jmsTemplate.convertAndSend(destination, msg);
}

高可用

基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

负载均衡

在高可用集群节点 activemq.xml 添加节点 networkConnectors;

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>

更多详细信息可参考:https://blog.csdn.net/haoyuyang/article/details/53931710

集群消费

由于发布订阅模式,所有订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。

ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。

在 activemq.xml 添加节点 destinationInterceptors;

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。

更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829

RocketMQ

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

架构图示

  1. Name Server

    名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;

  2. Broker

    RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;

  3. Producer

    消息生产方;

  4. Consumer

    消息消费方;

快速开始

安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;

https://segmentfault.com/a/1190000017841402

添加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

消息发送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
    "producer-topic",
    "msg",
    "hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();

delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

消息接收;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    for (MessageExt msg : list) {
        System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p “hello rocketmq” -n localhost:9876

Spring Boot 集成

添加依赖;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

添加 yaml 配置;

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

发送消息;

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {
    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

接收消息;

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Console 控制台

RocketMQ 拓展包提供了管理控制台;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

重复消费

产生原因:

  1. 生产者重复投递;
  2. 消息队列异常;
  3. 消费者异常消费;

怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性

通常基于本地消息表的方案实现,消息处理过便不再处理。

顺序消息

消息错乱的原因:

  1. 一个消息队列 queue,多个 consumer 消费;
  2. 一个 queue 对应一个 consumer,但是 consumer 多线程消费;

要保证消息的顺序消费,有三个关键点:

  1. 消息顺序发送
  2. 消息顺序存储
  3. 消息顺序消费

参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

分布式事务

在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。

通过 broker 的 HA 高可用,和定时回查 prepare 消息的状态,来保证最终一致性。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MQ 入门实践 的相关文章

  • 在 Eclipse 中隐藏重复的工具栏项

    我不知道如何 但我的 STS 有重复的工具栏项目 我不知道如何删除它们 这是我复制的工具栏的样子 我想摆脱这些 我试图隐藏工具栏 但这没有帮助 有人知道如何删除重复的吗 自从升级到 Oxygen 以来 我一直遇到同样的问题 我无法可靠地重现
  • 使用 Eclipse 将具有外部依赖项的 Java 项目导出到 jar

    有没有一种简单的方法可以将 Java 项目 包括其所有外部依赖项 导出到标准 jar 文件 我开发了一个使用多个 Apache 库的 SDK 我希望能够将该项目作为单个 jar 发布 到目前为止我找到的这个问题的答案要求将项目打包为 Run
  • Android CursorAdapter、ListView 和后台线程

    我一直在开发的这个应用程序有包含数兆字节数据的数据库可供筛选 许多活动只是列表视图 通过数据库中的各个级别的数据下降 直到到达 文档 即从数据库中提取并显示在手机上的 HTML 我遇到的问题是 其中一些活动需要能够通过捕获击键并重新运行带有
  • APNS(Apple 推送通知服务器)的反馈服务

    我们正在使用Java作为推送通知提供商APNS I我能够将消息发送到APNS但我不知道如何获得该消息的反馈 请帮忙 反馈服务具有类似于用于发送推送通知的接口的二进制接口 您可以通过以下方式访问生产反馈服务feedback push appl
  • Ant 中回显目标描述

  • Apache Commons VFS - 无法解析文件

    VFS 方法无法处理此 URI jboss server temp dir local outgoing配置在jboss beans xml这是决心 C Download jboss eap 5 1 1 server default tmp
  • Java - toString 到 Color

    我一整天都在努力解决这个问题 基本上我做了一个 for 循环 将条目添加到数组列表中 其中一项是 颜色 变量 我已经用过random nextInt为颜色构造函数的红色 绿色和蓝色部分创建新值 我还设置了一个toString方法 这样我就可
  • 为本地@ExceptionHandler编写JUnit测试

    我有以下控制器 class Controller ResponseStatus HttpStatus OK RequestMapping value verifyCert method RequestMethod GET public vo
  • 欧拉项目 45

    我还不是一名熟练的程序员 但我认为这是一个有趣的问题 我想我应该尝试一下 三角形 五边形 六边形 数字由以下生成 公式 三角形 T n n n 1 2 1 3 6 10 15 五边形 P n n 3n 1 2 1 5 12 22 35 六角
  • 在 Java 中的 JFrame/JPanel/JComponent 中添加 Web 浏览器

    我正在开发一个 Java 应用程序 需要在应用程序中使用 Web 浏览器 我见过一些应用程序这样做 例如在同一应用程序中单击左侧面板中的提要并打开右侧面板中的链接时的 RSS 阅读器 我想实现类似的功能 在java中可以做到这一点吗 Jav
  • java数学中的组合“N选择R”?

    java库中是否有内置方法可以为任何N R计算 N选择R 公式 实际上很容易计算N choose K甚至不需要计算阶乘 我们知道 公式为 N choose K is N N K K 因此 公式为 N choose K 1 is N N N
  • 默认情况下,JSF 生成不可用的 ID,这些 ID 与 Web 标准的 CSS 部分不兼容

    活跃的 JSF 或 Primefaces 用户能否解释一下为什么默认情况下会发生这种情况 为什么没有人对此采取任何措施
  • 单元测试、集成测试还是设计中的问题?

    我编写了我的第一个单元测试 我认为它过于依赖其他模块 我不确定是否是因为 这是一个复杂的测试 我实际上已经编写了集成测试或 我的设计有问题 我首先要说的是 虽然我有大约 4 年的开发经验 但我从未学过 也没有人教过自动化测试 我刚刚使用 H
  • Java8 项目上的 SonarQube 给出 jacoco-Exception

    我刚刚下载了最新版本 SonarQube 4 3 然后尝试使用以下命令构建 java 8 项目 mvn clean install mvn sonar sonar 这给了我下面的例外 谷歌搜索 我的印象是这是一个早期的问题 应该已经解决 h
  • 用于安装 R 软件包的备用编译器:clang:错误:不支持的选项“-fopenmp”

    我正在尝试在 OS X 10 11 6 上使用 R 版本 3 4 0 安装 rJava 包 install packages rJava type source 我收到以下错误 clang o libjri jnilib Rengine o
  • SDK尚未初始化,请务必先调用FacebookSdk.sdkInitialize()

    我在实现 Facebook SDK 时遇到此错误 并且我tried https stackoverflow com questions 15490399 error inflating class com facebook widget l
  • 线程上下文类加载器和普通类加载器的区别

    线程的上下文类加载器和普通类加载器有什么区别 也就是说 如果Thread currentThread getContextClassLoader and getClass getClassLoader 返回不同的类加载器对象 将使用哪一个
  • 动态创建 JSON 对象

    我正在尝试使用以下格式创建 JSON 对象 tableID 1 price 53 payment cash quantity 3 products ID 1 quantity 1 ID 3 quantity 2 我知道如何使用 JSONOb
  • 内部类的访问修饰符[重复]

    这个问题在这里已经有答案了 可能的重复 受保护 公共内部类 https stackoverflow com questions 595179 protected public inner classes 我确信这个问题已经被问过 但我找不到
  • AES 密钥是随机的吗?

    AES 密钥可以通过此代码生成 KeyGenerator kgen KeyGenerator getInstance AES kgen init 128 but 如果我有一个 非常可靠 的生成随机数的方法 我可以这样使用它吗 SecureR

随机推荐

  • python期末复习提纲

    1 注释 变量命名 缩进 2 数据输入字符串函数input 注意结果为字符串 3 字符串解析函数eval的使用 特别注意输入字符串可直接解析为组合数据类型 理解 将字符串类型转化为现有组合类型 list dict set 或现有定义的变量等
  • 01-Embedding层是什么?怎么理解?简单的评论情感分类实验

    文章目录 1 One hot编码 2 Embedding 3 语义理解中Embedding意义 4 文本评论 代码实验 1 One hot编码 要知道embedding的作用 首先要了解独热编码 one hot 假设现在有如下对应关系 那么
  • 2016年下半年信息安全工程师上午选择题及解析

    以下有关信息安全管理员职责的叙述 不正确的是 A 信息安全管理员应该对网络的总体安全布局进行规划 B 信息安全管理员应该对信息系统安全事件进行处理 C 信息安全管理员应该负责为用户编写安全应用程序 D 信息安全管理员应该对安全设备进行优化配
  • SSM基本系统架构设计(Spring、Spring MVC 、MyBatis)

    系统根据功能的不同 项目结构可以划分为以下几个层次 1 持久对象层 也称持久层或持久化层 该层由若干持久化类 实体类 组成 2 数据访问层 DAO 层 该层由若干DAO 接口和MyBatis 映射文件组成 接口的名称统一以Dao 结尾 且M
  • 总汇nexus 服务启动异常

    总汇nexus 服务启动异常 故障描述1 故障描述2 故障描述3 备份准备修复的数据库 故障描述1 nexus oss 3 一直运行得没什么问题 忽然发现运行特别慢然后到服务器去重启 莫名其妙一直启动失败 查看日志发现如下报错信息 2022
  • C51单片机晶振频率、时钟周期、状态周期、机器周期、指令周期和总线周期的关系

    一 晶振频率 1 英文全称 frequency oscillate 2 定义 晶体振荡器的固有频率 不能改变 3 如果外接12Mhz晶振 则晶振频率12Mhz 二 时钟周期 1 英文全称 Clock Cycle 为晶振频率12Mhz倒数 2
  • 面试题computed和watch的区别

    computed和watch的区别 1 英文翻译成中文 computed就是计算属性的意思 是用来计算出一个值的 这个值 我们在调用的时候 1 不需要加括号 2 根据依赖缓存 watch就是监听的意思 1 immediat表示是否第一次执行
  • Prometheus on k8s 部署与实战操作进阶篇

    文章目录 一 概述 二 常见的几款监控工具 1 kube prometheus 和 kube prometheus stack 区别 2 Prometheus Operator 和kube prometheus 或 kube prometh
  • openpose人体姿态估计

    参考博客 Openpose驾驶员危险驾驶检测 抽烟打电话 人体姿态识别模型 openpose OpenPose人体姿态识别项目是美国卡耐基梅隆大学 CMU 基于卷积神经网络和监督学习并以caffe为框架开发的开源库 可以实现人体动作 面部表
  • Spring MVC实例(增删改查)

    本文转载自 https www cnblogs com beast king p 5786752 html 作者 beast king 转载请注明该声明 数据库配置文件application context jdbc xml
  • 计算机视觉中自注意力构建块的PyTorch实现

    作者 AI Summer 编译 ronghuaiyang 导读 一个非常好用的git仓库 封装了非常全面的计算机视觉中的自注意力构建块 直接调用 无需重复造轮子了 git仓库地址 https github com The AI Summer
  • 深度学习(20)—— ConvNext 使用

    深度学习 20 ConvNext 使用 本篇主要使用convnext做分类任务 其中使用convnext tiny 其主要有5块 stage0 stage1 stage2 stage3 head 文章目录 深度学习 20 ConvNext
  • 小程序获取后台数据方法封装

    前言 实际再很早之前就有想再小程序里面做和vue中的axios封装的方法的操作 现在终于可以把自己的理解和整理封装整理下 目录 实现步骤 1 目录结构 总共两个文件 http js是对 wx request 和 wx uploadFile
  • 在云原生时代,构建高效的大数据存储与分析平台

    文章目录 1 选择适当的数据存储技术 2 采用分布式架构 3 数据分区和索引 4 采用列式存储 5 数据压缩和编码 6 使用缓存技术 7 数据分片和复制 8 自动化运维和监控 9 数据安全和权限控制 10 实时处理和流式分析 11 数据质量
  • 连接打印机出现0x0000007e错误的解决方法

    办公室用自己电脑连接公共服务器打印机时 出现出现0x0000007e错误 查找了微软论坛 解决方案如下 删除服务器电脑注册表中的copyfiles项 具体路径如下 registry gt Local Machine gt system gt
  • element组件库的el-select多选时候,选择三个时候下拉框抖动问题解决办法

    添加如下样式即可解决
  • 华为OD机试真题- 寻找链表的中间结点【2023Q1】【JAVA、Python、C++】

    题目描述 给定一个单链表 L 请编写程序输出 L 中间结点保存的数据 如果有两个中间结点 则输出第二个中间结点保存的数据 例如 给定 L 为 1 7 5 则输出应该为 7 给定 L 为 1 2 3 4 则输出应该为 3 输入描述 每个输入包
  • php mysql layui分页_php+layui数据表格实现数据分页渲染代码

    一 HTML 二 JS 说明 需要引入layui中的table和laytpl模板引擎 laytpl可以自定义事件及自定义数据字段等 查看详情 if d hotcake 超级爆款 d hotcake else if d hotcake 大爆款
  • resultType和parameterType的基本使用和区别

    resultType与parameterType的基本使用和区别 Mybatis的Mapper文件中的select insert update delect元素中都有一个parameterType和resultType属性 paramete
  • MQ 入门实践

    MQ Message Queue 消息队列 FIFO 结构 例如电商平台 在用户支付订单后执行对应的操作 优点 异步 削峰 解耦 缺点 增加系统复杂性 数据一致性 可用性 JMS Java Message Service Java消息服务