Apache MQ Artemis - MQTT 保留消息对于连接到集群中其他节点的客户端不可用

2024-01-09

I have 4 ActiveMQ 阿耳忒弥斯 2.10.1实例运行在cluster。客户端 (A) 连接到其中一个节点并发布MQTTQoS=0 的消息并且保留=真。当新的 MQTT 客户端 (B) 连接到另一个节点在集群中并订阅了该主题,之前发送的消息是没收到。当客户端 A 发布下一条消息时,客户端 B 会收到该消息。

这是 Artemis 中的一个错误,当客户端 B 连接到集群中的另一个节点时,客户端 B 没有收到 A 发送的保留消息,或者我丢失了某些内容?

如果我将客户端 A 和客户端 B 连接到集群中的同一节点,则客户端 B 在连接并订阅主题后会收到消息。

Broker.xml - 我使用poc.#用于测试:

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">

        <name>Broker 01</name> <!-- changed on every node -->

        <persistence-enabled>true</persistence-enabled>
        <journal-type>NIO</journal-type>
        <paging-directory>data/paging</paging-directory>
        <bindings-directory>data/bindings</bindings-directory>
        <journal-directory>data/journal</journal-directory>
        <large-messages-directory>data/large-messages</large-messages-directory>
        <journal-datasync>true</journal-datasync>
        <journal-min-files>2</journal-min-files>
        <journal-pool-files>10</journal-pool-files>
        <journal-device-block-size>4096</journal-device-block-size>
        <journal-file-size>10M</journal-file-size>

        <journal-buffer-timeout>1328000</journal-buffer-timeout>

        <journal-max-io>1</journal-max-io>
        <disk-scan-period>5000</disk-scan-period>
        <max-disk-usage>90</max-disk-usage>
        <critical-analyzer>true</critical-analyzer>
        <critical-analyzer-timeout>120000</critical-analyzer-timeout>
        <critical-analyzer-check-period>60000</critical-analyzer-check-period>
        <critical-analyzer-policy>HALT</critical-analyzer-policy>

        <acceptors>
            <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
            <acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
        </acceptors>

        <connectors>
            <connector name="netty-connector">tcp://10.0.0.2:61616</connector>  <!-- changed on every node -->
        </connectors>

        <broadcast-groups>  
            <broadcast-group name="my-broadcast-group">  
                <group-address>231.7.7.7</group-address>  
                <group-port>9876</group-port>  
                <broadcast-period>2000</broadcast-period>  
                <connector-ref>netty-connector</connector-ref>  
            </broadcast-group>  
        </broadcast-groups>
        <discovery-groups>  
            <discovery-group name="my-discovery-group">  
                <group-address>231.7.7.7</group-address>  
                <group-port>9876</group-port>  
                <refresh-timeout>10000</refresh-timeout>  
            </discovery-group>  
        </discovery-groups>

        <cluster-connections>
             <cluster-connection name="artemis-cluster">
                <address></address>
                <connector-ref>netty-connector</connector-ref>
                <retry-interval>1000</retry-interval>
                <use-duplicate-detection>true</use-duplicate-detection>
                <message-load-balancing>ON_DEMAND</message-load-balancing>
                <max-hops>1</max-hops>
                <discovery-group-ref discovery-group-name="my-discovery-group"/> 
             </cluster-connection>
        </cluster-connections>

        <security-settings>
            <security-setting match="#">
                <permission type="createNonDurableQueue" roles="amq"/>
                <permission type="deleteNonDurableQueue" roles="amq"/>
                <permission type="createDurableQueue" roles="amq"/>
                <permission type="deleteDurableQueue" roles="amq"/>
                <permission type="createAddress" roles="amq"/>
                <permission type="deleteAddress" roles="amq"/>
                <permission type="consume" roles="amq"/>
                <permission type="browse" roles="amq"/>
                <permission type="send" roles="amq"/>
                <!-- we need this otherwise ./artemis data imp wouldn't work -->
                <permission type="manage" roles="amq"/>
            </security-setting>

            <!-- Security setting for testing. -->
            <security-setting match="poc.#">
                <permission type="createNonDurableQueue" roles="read-write"/>
                <permission type="deleteNonDurableQueue" roles="read-write"/>
                <permission type="createDurableQueue" roles="read-write"/>
                <permission type="deleteDurableQueue" roles="read-write"/>
                <permission type="createAddress" roles="read-write"/>
                <permission type="deleteAddress" roles="write"/>
                <permission type="consume" roles="read-write"/>
                <permission type="browse" roles="read-write"/>
                <permission type="send" roles="write"/>
            </security-setting>
        </security-settings>

        <address-settings>
            <address-setting match="activemq.management#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics>
            </address-setting>
            <address-setting match="#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics>
            </address-setting>

            <!-- Address setting for testing -->
            <address-setting match="poc.#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-delete-queues>true</auto-delete-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-delete-addresses>true</auto-delete-addresses>
                <default-address-routing-type>MULTICAST</default-address-routing-type>
            </address-setting>
        </address-settings>

        <addresses>
            <address name="DLQ">
                <anycast>
                    <queue name="DLQ" />
                </anycast>
            </address>
            <address name="ExpiryQueue">
                <anycast>
                    <queue name="ExpiryQueue" />
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>


你需要设置这个<address-setting>对于您关心的地址:

<redistribution-delay>0</redistribution-delay>

See the “消息重新分发” http://activemq.apache.org/components/artemis/documentation/latest/clusters.html#message-redistribution有关更多详细信息,请参阅 ActiveMQ Artemis 文档部分。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache MQ Artemis - MQTT 保留消息对于连接到集群中其他节点的客户端不可用 的相关文章

  • 如何在android中判断.3gp文件是音频还是视频

    我有一个应用程序 允许用户导入媒体 视频 照片 音频 然后由应用程序管理 作为证据 我发现一些录音应用程序会将音频保存为 3gp 格式 特别是 Whats App Messenger 如果我使用以下代码获取 mime 类型 MimeType

随机推荐

  • 使用 iframe 将 CSS 媒体查询应用于块元素

    我当时正在做一些网页设计 心想 嘿 如果我们可以将 CSS 媒体查询应用于块元素 即div section ETC 说真的 如果我们能够拥有此功能 我们可以制作一些非常令人惊叹的流体布局 在这篇文章中 我将使用一个简单的社交插件 我可以将其
  • 使用查找表中的值替换文本,无需 for 循环

    我正在编写一个拼写纠正功能 我刮了拼写变体 https en wikipedia org wiki Wikipedia List of spelling variants来自维基百科的页面并将其转换为表格 我现在想使用它作为查找表 拼写 并
  • pandas python 中没有名为 read_csv 的属性

    我是机器学习新手 正在使用 Python 中的 pandas 创建数据集 我查找了一个教程 只是尝试了创建数据框的基本代码 但我不断收到以下回溯 AttributeError 模块 对象没有属性 read csv 我已将 Excel 13
  • 如何从 C++ 中的函数返回结构体?

    我在几个不同的论坛上尝试过 似乎无法得到直接的答案 如何让这个函数返回结构 如果我尝试 返回 newStudent 我收到错误 不存在从 StudentType 到 StudentType 的合适的用户定义转换 Input function
  • 如何编写一个 for 循环来创建模型并具有引用同一模型的函数

    我正在尝试使用不平衡的双向方差分析进行事后分析anova test函数在rstatix包裹 我需要迭代地运行这个事后测试 因为我有大约 26 个响应 y 变量 我的第一步是创建我所有的模型y与相关的变量group and treatment
  • 使用 React Native Realm 查询多个列表对象

    第一天使用 React Native 和 Realm 我很难弄清楚如何通过两个 Realm 列表对象执行查询 Tasks have Reservations 其中有Renters 其中有first name and last name字段
  • yii -> dropDownList -> CHtml::listData -> 设置选定值

    如何在以下语句中设置所选值 echo form gt dropDownList model land id CHtml listData ListeLand model gt findAll iso landname array class
  • HTTP 状态 403 - 在请求参数中发现无效的 CSRF 令牌“null”

    我必须向我的宁静服务发出 HTTP Post Android 应用程序 来注册新用户 问题是 当我尝试向注册端点 没有安全性 发出请求时 Spring 一直阻止我 我的项目依赖项
  • SwiftUI 何时更新其视图?

    如果你看一下flagTapped函数 你会看到它修改了很多State变量 通过使用断点 我意识到body暂时没有更新round被修改 但仅在函数运行之后 有人可以向我解释一下 SwiftUI 何时更新其视图吗 import SwiftUI
  • java中将字符串转换为日期

    我正在尝试将字符串解析为 Android 应用程序中的日期字段 但我似乎无法正确解析 这是我试图转换为日期 03 26 2012 11 49 00 AM 的字符串 我正在使用的功能是 private Date ConvertToDate S
  • 从命令行获取一组数字的平均值的最快方法是什么?

    使用您希望在 nix 系统上找到的任何工具 事实上 如果您愿意 msdos 也很好 计算一组数字的平均值的最简单 最快的方法是什么 假设您有一个流或文件中的每行 awk n 1 END print n NR 这将总和累加到n 然后除以项目数
  • 元素左侧和右侧的文本

    使用 CSS 将文本放在元素的右侧和左侧并垂直位于同一位置的最佳方式是什么 Thus ending up with the following layout 容器有固定的宽度 所以我不想使用定位 因为我知道我不必这样做 1 在包含每个文本字
  • Android - 确定短信发送失败的原因

    我有一个提醒应用程序 它会发送短信通知用户提醒时间已过 这效果很好 然而 我一直在测试当手机处于睡眠状态并错过提醒时会发生什么 我在用着AlarmManager设置闹钟以对应提醒时间 我的日志显示 当手机重新启动并尝试发送短信时 警报响起
  • 如何从 jQuery 函数返回值?

    我正在尝试从 jQuery 函数返回值 这是我的功能 output keypress function event var keycode event keyCode event keyCode event which if keycode
  • 将二维数组写入C中的文件

    我曾经使用下面的代码将一维数组写入文件 FILE fp float floatValue 5 1 1F 2 2F 3 3F 4 4F 5 5F int i if fp fopen test wb NULL printf Cannot ope
  • 运行时的真实 SBT 类路径

    我有一些测试用例需要查看类路径以提取其中某些文件 目录的路径 这在 IDE 中运行良好 问题是 当运行 SBT 测试时 Properties javaClassPath给我 usr share sbt launcher packaging
  • MVC4 RC 中缺少单页应用程序模板

    当我在开发 PC 上安装 MVC4 beta 时 它有一个 SPA 单页应用程序 的模板 今天在具有相同设置 VS2010 SP1 win7 的另一台PC上 我安装了MVC4 RC http www microsoft com en us
  • 如何在单独的类库.net core中使用DbContext?

    我正在尝试从类库中的 net core 3 1 MVC 项目访问我的 dbcontext 目前我将数据库注入到服务集合中startup cs public class AppDbContext DbContext public DbSet
  • 在 foreach 循环内部和外部调用 SaveChanges() 有什么区别吗?

    假设对循环内部的 EF 实体进行了更改 在 foreach 循环中或循环外部调用 EF SaveChanges 之间是否存在任何性能优势 技术差异 YES 如果在循环内调用它 EF会将更改写回数据库对于每一个实体 每个实体都将进行自己的单独
  • Apache MQ Artemis - MQTT 保留消息对于连接到集群中其他节点的客户端不可用

    I have 4 ActiveMQ 阿耳忒弥斯 2 10 1实例运行在cluster 客户端 A 连接到其中一个节点并发布MQTTQoS 0 的消息并且保留 真 当新的 MQTT 客户端 B 连接到另一个节点在集群中并订阅了该主题 之前发送