我正在使用 ActiveMQ Artemis 2.18.0 和版本 2.5.5spring-boot-starter-artemis
对 Spring Boot 客户端的依赖。在我的用例中,客户需要通过主题相互通信。问题是字符串jms.topic.
被添加到客户端定义的每个主题的前缀。例如主题foo.sendInfo
变成jms.topic.foo.sendInfo
.
The broker.xml
文件如下图所示。这acceptor
Spring Boot 客户端使用的是netty-ssl-acceptor
在港口61617
.
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 1.18 writes per millisecond
on the current journal configuration.
That translates as a sync write every 844000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>844000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>844000</page-sync-timeout>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
<!-- SSL Acceptor -->
<acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>
<acceptor name ="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="admins, users"/>
<permission type="deleteNonDurableQueue" roles="admins, users"/>
<permission type="createDurableQueue" roles="admins, users"/>
<permission type="deleteDurableQueue" roles="admins, users"/>
<permission type="createAddress" roles="admins, users"/>
<permission type="deleteAddress" roles="admins, users"/>
<permission type="consume" roles="admins, users"/>
<permission type="browse" roles="admins, users"/>
<permission type="send" roles="admins, users"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="admins"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>
Spring Boot 客户端上的连接工厂配置如下所示。
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@Configuration
@EnableJms
public class MQTTConfig {
@Value("${JMS_BROKER_TRUSTSTORE}")
private String pathToTrustStore;
@Value("${JMS_BROKER_KEYSTORE}")
private String pathToKeystore;
@Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
private String truststorePassword;
@Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
private String keystorePassword;
@Bean
public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61617?&" + "sslEnabled=true&" +
"trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
artemisConnectionFactory.setUser("user");
artemisConnectionFactory.setPassword("password");
return artemisConnectionFactory;
}
/**
* Initialise {@link JmsTemplate} as required
*/
@Bean
public JmsTemplate jmsTemplate() throws JMSException {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
jmsTemplate.setExplicitQosEnabled(true);
//setting PuSubDomain to true configures JmsTemplate to work with topics instead of queues
jmsTemplate.setPubSubDomain(true);
return jmsTemplate;
}
/**
* Initialise {@link DefaultJmsListenerContainerFactory} as required
*/
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(artemisSSLConnectionFactory());
//setting PuSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
factory.setPubSubDomain(true);
return factory;
}
}
下面是 POM 文件,仅包含相关依赖项。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
<version>2.5.5</version>
</dependency>
下面的代码片段显示了发布到主题的生产者server.weatherForecast
以及订阅同一主题的消费者。消息在生产者和消费者之间毫无问题地交换,如下所示jms.topic.
是 Spring Boot 客户端上定义的每个主题的前缀。但是,当我使用外部工具订阅 MQTT 消息时,不会收到该工具上定义的主题的消息,除非订阅的主题从server.weatherForecast
to jms.topic.server.weatherForecast
.
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public void samplePC() {
@Autowired
private JMSTemplate jmsTemplate;
//producer that is called by a cron job
public void tester() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serialNumber", "105");
jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
}
//consumer (a message from the producer should be received here, but nothing arrives)
@JmsListener(destination = "server/forecast")
private void consumeWeatherForecastRequest(char[] incomingMessage) {
//some logic
jmsTemplate.convertAndSend("someTopic", "someMessage");
}
}
启用后TRACE
记录为RemotingConnectionImpl
,我在CreateSessionResponseMessage
, the serverVersion
属性的值为 131,并且在CreateSessionMessage
, the version
属性的值为 127。
我如何确保jms.topic.
不是主题名称的前缀吗?
可以从以下位置下载最小的可重现示例this https://github.com/IsThisABug/ArtemisClientGitHub 存储库。
我尝试在代码中记录前缀,但没有找到任何方法,所有日志仅显示没有前缀的主题名称。但是,订阅从外部客户端发布的主题应指示前缀。订阅时topicName
and jms.topic.topicName
,很明显该消息将传递给后者。我注意到有些客户会解析“.”。作为“/”,因此可以尝试其他方法以防“。”不起作用。