Controlling an Azure Service Bus message listener in Spring Boot: starting or stopping listening on a topic or queue

2024-01-02

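What I want to achieve: start or stop the Azure Service Bus message listener that receives messages from a queue or topic.

A detailed explanation follows.

Currently, I have integrated Azure Service Bus into my application, and we start listening for messages as soon as the Spring Boot application starts. I now want to change this logic so that the Azure Service Bus message listener is disabled by default. On ApplicationReadyEvent I want to perform some tasks and then enable the Azure Service Bus message listener so that it starts listening on the topic or queue.

How can I achieve this?

application.yml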

spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
        
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********

AzureConfiguration.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;


@Configuration
public class AzureConfiguration{

    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";


    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {

        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }


    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        
        return adapter;
    }


    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {

        return new DirectChannel();
    }


    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {

        return new DirectChannel();
    }


    @Bean
    public IntegrationFlow serviceBusMessageFlow() {

        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                .<byte[], String>transform(String::new)
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}

AppEventListenerService.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{

   
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // By default the Azure Service Bus message listener will be disabled
        // do some task
        // Enable Azure Bus Message Listener
        log.debug("Exit OnApplicationStarted");
    }
}

In the AppEventListenerService.java code above,

// Enable Azure Bus Message Listener - this is where I want to start the ServiceBusConsumer so that it receives messages from the topic/queue.


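Taken literally, if you just want to stop the listener and then start it on ApplicationReadyEvent, you can autowire the ServiceBusInboundChannelAdapter (or the ServiceBusMessageListenerContainer) in your AppEventListenerService.java and simply call its stop() and start() APIs in the AppEventListenerService#OnApplicationStarted method.

However, both ServiceBusMessageListenerContainer and ServiceBusInboundChannelAdapter implement the SmartLifecycle interface and have auto-startup enabled by default. So with the solution above, the listener (and the adapter) will already have been started before ApplicationReadyEvent is published, which means the listener will still consume messages for a period of time.

So I assume you want the listener to stay off until your own business logic has finished. If so, ServiceBusMessageListenerContainer does not currently provide a way to disable auto-startup; we will put your feature request in our backlog.

In the meantime, you can use either of the following workarounds.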
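To make the ordering problem concrete, here is a minimal plain-Java sketch. It is not Spring's actual startup code: LifecycleComponent, RecordingListener, and simulateStartup are hypothetical stand-ins that only model the idea that auto-startup components are started before the ready-event handler runs.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoStartupOrderDemo {

    /** Hypothetical stand-in for the parts of Spring's SmartLifecycle used here. */
    interface LifecycleComponent {
        boolean isAutoStartup();
        void start();
        boolean isRunning();
    }

    /** Hypothetical listener that records the moment it is started. */
    static class RecordingListener implements LifecycleComponent {
        private final boolean autoStartup;
        private final List<String> events;
        private boolean running;

        RecordingListener(boolean autoStartup, List<String> events) {
            this.autoStartup = autoStartup;
            this.events = events;
        }

        @Override
        public boolean isAutoStartup() {
            return autoStartup;
        }

        @Override
        public void start() {
            running = true;
            events.add("listener started");
        }

        @Override
        public boolean isRunning() {
            return running;
        }
    }

    /** Simplified model: lifecycle startup first, then the ready-event handler. */
    static List<String> simulateStartup(boolean autoStartup) {
        List<String> events = new ArrayList<>();
        RecordingListener listener = new RecordingListener(autoStartup, events);

        // Context startup: only components with auto-startup enabled are started.
        if (listener.isAutoStartup()) {
            listener.start();
        }

        // Ready-event handler: run the startup task, then start the listener if needed.
        events.add("startup task");
        if (!listener.isRunning()) {
            listener.start();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulateStartup(true));  // [listener started, startup task]
        System.out.println(simulateStartup(false)); // [startup task, listener started]
    }
}
```

With auto-startup enabled the listener is already running while the startup task executes, which is the window in which messages would still be consumed. With auto-startup disabled, the task finishes first.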

Workaround-1

  1. You can extend ServiceBusMessageListenerContainer to override the auto-startup behavior:
public class CustomServiceBusMessageListenerContainer extends ServiceBusMessageListenerContainer {

    private boolean autoStartUp = true;
    /**
     * Create an instance using the supplied processor factory and container properties.
     *  @param processorFactory the processor factory.
     * @param containerProperties the container properties.
     */
    public CustomServiceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory, ServiceBusContainerProperties containerProperties) {
        super(processorFactory, containerProperties);
    }

    public void setAutoStartUp(boolean autoStartUp) {
        this.autoStartUp = autoStartUp;
    }

    @Override
    public final boolean isAutoStartup() {
        return this.autoStartUp;
    }
}
  2. When declaring the ServiceBusMessageListenerContainer and ServiceBusInboundChannelAdapter beans, disable their auto-startup.
    @Bean(SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setEntityName(QUEUE_NAME);
        ...
        CustomServiceBusMessageListenerContainer listenerContainer = new CustomServiceBusMessageListenerContainer(processorFactory, containerProperties);
        listenerContainer.setAutoStartUp(false);
        return listenerContainer;
    }

    @Bean
    public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
        @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        adapter.setAutoStartup(false);
        return adapter;
    }
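  3. Start the ServiceBusInboundChannelAdapter after your business logic in AppEventListenerService#OnApplicationStarted.

Workaround-2

This may be a bit of a hack, since we do not expose an API to disable auto-startup on ServiceBusMessageListenerContainer, but you can on ServiceBusInboundChannelAdapter. So you can choose not to declare a ServiceBusMessageListenerContainer bean and instead make it a local variable used to build the adapter: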

    @Bean
    public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorFactory processorFactory) {
        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setEntityName(QUEUE_NAME);
        ...
        ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(processorFactory, containerProperties);

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        adapter.setAutoStartup(false);
        return adapter;
    }

Then start the ServiceBusInboundChannelAdapter after your business logic in AppEventListenerService#OnApplicationStarted.
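The final step of both workarounds might look roughly like the sketch below. To keep it runnable without Spring or the Azure libraries, it uses hypothetical stand-ins: Startable models the start(), stop(), and isRunning() methods that the adapter is assumed to expose through its Spring lifecycle support, FakeAdapter replaces the real adapter, and AppEventListener mirrors AppEventListenerService. In a real application the adapter would be injected and the method annotated with @EventListener(ApplicationReadyEvent.class).

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerGateDemo {

    /** Hypothetical stand-in for the lifecycle methods of the inbound channel adapter. */
    interface Startable {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Fake adapter that records lifecycle calls instead of connecting to Service Bus. */
    static class FakeAdapter implements Startable {
        private final List<String> log;
        private boolean running;

        FakeAdapter(List<String> log) {
            this.log = log;
        }

        @Override
        public void start() {
            running = true;
            log.add("adapter.start");
        }

        @Override
        public void stop() {
            running = false;
            log.add("adapter.stop");
        }

        @Override
        public boolean isRunning() {
            return running;
        }
    }

    /** Mirrors AppEventListenerService: run the startup task, then enable the listener. */
    static class AppEventListener {
        private final Startable adapter;
        private final Runnable startupTask;

        AppEventListener(Startable adapter, Runnable startupTask) {
            this.adapter = adapter;
            this.startupTask = startupTask;
        }

        // In a Spring application: @EventListener(ApplicationReadyEvent.class)
        void onApplicationStarted() {
            startupTask.run();          // business logic that must finish first
            if (!adapter.isRunning()) { // the adapter was created with auto-startup disabled
                adapter.start();
            }
        }

        // Optional counterpart for pausing consumption later.
        void pauseListening() {
            if (adapter.isRunning()) {
                adapter.stop();
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        FakeAdapter adapter = new FakeAdapter(log);
        AppEventListener listener = new AppEventListener(adapter, () -> log.add("startup task"));
        listener.onApplicationStarted();
        listener.pauseListening();
        listener.pauseListening(); // second call is a no-op because the adapter is already stopped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [startup task, adapter.start, adapter.stop]
    }
}
```

Guarding start() and stop() with isRunning() keeps both calls idempotent, so triggering them more than once does no harm.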
