ActiveMQ Artemis prefixes "jms.topic." to all topic names defined on the Spring Boot client


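I am using ActiveMQ Artemis 2.18.0 together with version 2.5.5 of the spring-boot-starter-artemis dependency on the Spring Boot clients. In my use case, the clients need to communicate with each other via topics. The problem is that the string jms.topic. is prepended to every topic defined by the clients. For example, the topic foo.sendInfo becomes jms.topic.foo.sendInfo.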

The broker.xml file is shown below. The acceptor used by the Spring Boot clients is netty-ssl-acceptor on port 61617.

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">












      <!--
       This value was determined through a calculation.
       Your system could perform 1.18 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 844000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->


      <!-- how often we are looking for how many bytes are being used on the disk in ms -->

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->

      <!-- should the broker detect dead locks and other issues -->






         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
         <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
                                      as duplicate detection requires applicationProperties to be parsed on the server. -->
         <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
                                       default: 102400, -1 would mean to disable large message control -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See for more information. -->

         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
         <!-- SSL Acceptor -->
         <acceptor name="netty-ssl-acceptor">tcp://;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <acceptor name ="mqtt+ssl">tcp://;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>


         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="admins, users"/>
            <permission type="deleteNonDurableQueue" roles="admins, users"/>
            <permission type="createDurableQueue" roles="admins, users"/>
            <permission type="deleteDurableQueue" roles="admins, users"/>
            <permission type="createAddress" roles="admins, users"/>
            <permission type="deleteAddress" roles="admins, users"/>
            <permission type="consume" roles="admins, users"/>
            <permission type="browse" roles="admins, users"/>
            <permission type="send" roles="admins, users"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="admins"/>
         </security-setting>

         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="">
            <!-- with -1 only the global-max-size is in use for limiting -->
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <!-- with -1 only the global-max-size is in use for limiting -->
         </address-setting>

         <address name="DLQ">
               <queue name="DLQ" />
         </address>
         <address name="ExpiryQueue">
               <queue name="ExpiryQueue" />
         </address>
   </core>
</configuration>


The connection factory configuration on the Spring Boot client is shown below.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;

@Configuration
@EnableJms
public class MQTTConfig {

    // NOTE: the property keys in the @Value annotations below are placeholders;
    // the keys used in the original post were not preserved.
    @Value("${truststore.path}")
    private String pathToTrustStore;

    @Value("${keystore.path}")
    private String pathToKeystore;

    @Value("${truststore.password}")
    private String truststorePassword;

    @Value("${keystore.password}")
    private String keystorePassword;

    @Bean
    public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
        // connects to the netty-ssl-acceptor on port 61617
        ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61617?sslEnabled=true"
                + "&trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
        return artemisConnectionFactory;
    }

    /**
     * Initialise {@link JmsTemplate} as required
     */
    @Bean
    public JmsTemplate jmsTemplate() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
        // setting PubSubDomain to true configures JmsTemplate to work with topics instead of queues
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Initialise {@link DefaultJmsListenerContainerFactory} as required
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(artemisSSLConnectionFactory());
        // setting PubSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
        factory.setPubSubDomain(true);
        return factory;
    }
}

Below is the POM file, containing only the relevant dependencies.

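    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>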

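The code snippet below shows a producer that publishes to the topic server.weatherForecast and a consumer that subscribes to the same topic. Messages are exchanged between the producer and the consumer without any problem, even though jms.topic. is prefixed to every topic defined on the Spring Boot client. However, when I use an external tool to subscribe to the messages over MQTT, nothing is received on the topic defined in that tool unless the subscribed topic is changed from server.weatherForecast to jms.topic.server.weatherForecast.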

import org.json.JSONObject; // assumption: the post does not show which JSON library is used
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class SamplePC {

    @Autowired
    private JmsTemplate jmsTemplate;

    // producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
    }

    // consumer (a message from the producer should be received here, but nothing arrives)
    @JmsListener(destination = "server/forecast")
    private void consumeWeatherForecastRequest(char[] incomingMessage) {
        // some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}

After enabling TRACE logging for RemotingConnectionImpl, I can see that in CreateSessionResponseMessage the serverVersion property has the value 131, while in CreateSessionMessage the version property has the value 127.


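A minimal reproducible example can be downloaded from this repository. I tried to log the prefix in the code but did not find any way to do so; all logs show only the topic name without the prefix. However, subscribing from an external client to the published topic should reveal the prefix. When subscribing to both topicName and jms.topic.topicName, it is clear that the message is delivered to the latter. I have noticed that some clients interpret "." as "/", so it may be worth trying the other separator in case "." does not work.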
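The remark about "." and "/" can be made concrete with a small sketch. MQTT uses "/" as its topic level separator, whereas Artemis addresses use "." by default, so an external MQTT tool may need a differently spelled name than the JMS client. The class and method names below (TopicNameSketch, subscriptionCandidates) are hypothetical, and the plain character swap is a simplifying assumption that ignores wildcards; it only enumerates the spellings worth trying from the external tool.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper for illustration; not part of Artemis or Spring.
final class TopicNameSketch {

    static String dotsToSlashes(String name) {
        return name.replace('.', '/');
    }

    static String slashesToDots(String name) {
        return name.replace('/', '.');
    }

    /**
     * Lists the spellings of a topic that an external subscriber could try:
     * with "." and "/" separators, each with and without the given prefix.
     */
    static Set<String> subscriptionCandidates(String topic, String prefix) {
        Set<String> candidates = new LinkedHashSet<>();
        String dotted = slashesToDots(topic);
        candidates.add(dotted);
        candidates.add(dotsToSlashes(dotted));
        candidates.add(prefix + dotted);
        candidates.add(dotsToSlashes(prefix + dotted));
        return candidates;
    }

    public static void main(String[] args) {
        // Prints the four candidate names, one per line
        for (String candidate : subscriptionCandidates("server.weatherForecast", "jms.topic.")) {
            System.out.println(candidate);
        }
    }
}
```

Trying each candidate in turn from the external tool shows which address the broker actually routes the message to.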

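I took your reproducer and managed to recreate the problem you are seeing, where the client uses jms.topic.test.topic. However, once I added multicastPrefix=jms.topic. to the "artemis" acceptor in broker.xml, the problem went away. The broker now strips the prefix sent by the client and uses test.topic instead.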
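To illustrate what the multicastPrefix setting does conceptually, here is a minimal sketch in plain Java. It is not Artemis' implementation, and the names PrefixSketch and resolveAddress are hypothetical. It only models the idea: an acceptor configured with a prefix removes that prefix from the address the client sends, while an acceptor without one uses the name verbatim.

```java
// Hypothetical illustration only; this is not code from ActiveMQ Artemis.
final class PrefixSketch {

    /**
     * Returns the address the broker would use, assuming the acceptor strips
     * a configured multicast prefix and leaves all other names unchanged.
     * A null or empty prefix models an acceptor without multicastPrefix.
     */
    static String resolveAddress(String clientAddress, String multicastPrefix) {
        if (multicastPrefix == null || multicastPrefix.isEmpty()) {
            return clientAddress;
        }
        if (clientAddress.startsWith(multicastPrefix)) {
            return clientAddress.substring(multicastPrefix.length());
        }
        return clientAddress;
    }

    public static void main(String[] args) {
        String sentByOldClient = "jms.topic.test.topic";
        // Acceptor without multicastPrefix: the prefixed name is used as-is
        System.out.println(resolveAddress(sentByOldClient, null));
        // Acceptor with multicastPrefix=jms.topic. : the prefix is stripped
        System.out.println(resolveAddress(sentByOldClient, "jms.topic."));
    }
}
```

Under this model, a subscriber on test.topic only sees the messages in the second case, which matches the behaviour observed with the reproducer.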


I also ran mvn dependency:tree to understand why your application was using the ActiveMQ Artemis 1.3.0 client. Here is its output (partial):

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile
[INFO]       |  +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
[INFO]       |  |  +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
[INFO]       |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       |  |  \-
[INFO]       |  \- io.netty:netty-all:jar:4.0.32.Final:compile
[INFO]       +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
[INFO]       \- javax.inject:javax.inject:jar:1:compile
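Spotting a stale client in this kind of output by eye is easy to get wrong, so a small check can help. The sketch below is a hypothetical helper (DependencyTreeCheck and mismatches are made-up names) that scans lines of mvn dependency:tree output for org.apache.activemq:artemis-* artifacts and reports those whose major version differs from the broker's. Comparing only the major version is an assumption made for brevity, not a compatibility rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Maven or Artemis.
final class DependencyTreeCheck {

    // Matches groupId:artifactId:jar:version:scope for Artemis artifacts
    private static final Pattern ARTEMIS =
            Pattern.compile("org\\.apache\\.activemq:(artemis-[\\w-]+):jar:([^:\\s]+):\\w+");

    /** Returns "artifactId:version" for every Artemis artifact whose major version differs. */
    static List<String> mismatches(List<String> treeLines, String brokerMajor) {
        List<String> result = new ArrayList<>();
        for (String line : treeLines) {
            Matcher m = ARTEMIS.matcher(line);
            if (m.find() && !m.group(2).startsWith(brokerMajor + ".")) {
                result.add(m.group(1) + ":" + m.group(2));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "[INFO]    \\- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile",
                "[INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile",
                "[INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile");
        // The broker in the question is 2.18.0, so the major version is "2"
        System.out.println(mismatches(lines, "2"));
    }
}
```

Run against the tree above, it flags the artemis-jms-client and artemis-core-client 1.3.0 entries and ignores unrelated artifacts such as jgroups.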

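So it appears that the dependency on org.apache.activemq:artemis-jms-client:jar:1.3.0 comes directly from org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5, which is really strange because that starter explicitly depends on org.apache.activemq:artemis-jms-client:jar:2.17.0. Presumably the dependency management inherited from the 1.4.1.RELEASE parent overrides that version. In any case, if I change the <parent> to use 2.5.5 instead of 1.4.1.RELEASE the problem goes away, e.g.: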

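    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>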

Here is what mvn dependency:tree outputs now (partial):

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.13.Final:compile
[INFO]       |  +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
[INFO]       |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-buffer:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport:jar:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-resolver:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
[INFO]       |  \- io.netty:netty-common:jar:4.1.68.Final:compile
[INFO]       +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
[INFO]       |  +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
[INFO]       |  \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO]       |     \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       \- org.apache.activemq:artemis-selector:jar:2.17.0:compile

