从源码角度分析RabbitMQ重启后,消费者停止消费怎么解决

2023-10-28

        前段时间的RabbitMQ broker服务端由于某个队列一直积压消息,运维在凌晨对mq服务端机器pod进行了扩容,重启了RabbitMQ,然后早上发现自己的服务在mq重启之后一直报异常,停止消费了,导致影响了业务的运行,虽然mq重启成功了但是消费者却没有重连成功。本节会通过分析spring-rabbit的源码,来分析问题出现的原因以及解决办法。

目录

一、出现的问题

二、spring-rabbit消费源码分析         

三、解决消费者停止消费问题


一、出现的问题

        先看下报了什么异常,这里挑了一些主要的异常堆栈贴出来

o.s.a.r.l.BlockingQueueConsumer - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException
Failed to declare queue(s):[work_queue]

...............................................

Consumer received fatal=false exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.handleDeclarationException(BlockingQueueConsumer.java:661)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:601)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[work_queue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594)
    ... 4 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
    at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110)
    at com.sun.proxy.$Proxy285.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689)
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 13 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
    ... 1 common frames omitted

...........................

o.s.a.r.l.SimpleMessageListenerContainer  message:  Stopping container from aborted consumer

        我们挑里面的几个主要的错误信息: 

  1. Failed to declare queue(s):[work_queue]
  2. Consumer received fatal=false exception on startup org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
  3. Stopping container from aborted consumer

二、spring-rabbit消费源码分析         

        打开spring-rabbit的源码发现Consumer received fatal=false exception on startup异常在SimpleMessageListenerContainer类中的子类AsyncMessageProcessingConsumer的run()方法中。

         下面简单分析一下spring-rabbit的消费者源码流程:

        spring-rabbit会为我们的每个消费者(可能消费一个或者多个队列)创建一个SimpleMessageListenerContainer对象,SimpleMessageListenerContainer继承了AsyncMessageProcessingConsumer,而AsyncMessageProcessingConsumer又实现了Lifecycle接口的start()方法,启动时会调用start()方法。

         AsyncMessageProcessingConsumer的start()方法如下:

	/**
	 * Start this container.
	 * @see #doStart
	 */
	@Override
	public void start() {
		if (isRunning()) {
			return;
		}
		if (!this.initialized) {
			synchronized (this.lifecycleMonitor) {
				if (!this.initialized) {
					afterPropertiesSet();
				}
			}
		}
		try {
			if (logger.isDebugEnabled()) {
				logger.debug("Starting Rabbit listener container.");
			}
			configureAdminIfNeeded();
			checkMismatchedQueues();
			doStart();
		}
		catch (Exception ex) {
			throw convertRabbitAccessException(ex);
		}
	}

        start()方法会调用doStart()方法

/**
 * Start this container, and notify all invoker tasks.
 */
protected void doStart() {
	// Reschedule paused tasks, if any.
	synchronized (this.lifecycleMonitor) {
		this.active = true;
		this.running = true;
		this.lifecycleMonitor.notifyAll();
	}
}

        实际上调用的是子类SimpleMessageListenerContainer中的doStart方法,

 /**
 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
 * to this container's task executor.
 */
@Override
protected void doStart() {
	checkListenerContainerAware();
	super.doStart();
	synchronized (this.consumersMonitor) {
		if (this.consumers != null) {
			throw new IllegalStateException("A stopped container should not have consumers");
		}
		int newConsumers = initializeConsumers();
		if (this.consumers == null) {
			logger.info("Consumers were initialized and then cleared " +
					"(presumably the container was stopped concurrently)");
			return;
		}
		if (newConsumers <= 0) {
			if (logger.isInfoEnabled()) {
				logger.info("Consumers are already running");
			}
			return;
		}
		Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
		for (BlockingQueueConsumer consumer : this.consumers) {
			AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
			processors.add(processor);
			getTaskExecutor().execute(processor);
			if (getApplicationEventPublisher() != null) {
				getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
			}
		}
		waitForConsumersToStart(processors);
	}
}

        里面的initializeConsumers方法根据我们配置的消费者线程数量concurrentConsumers创建对应数量的消费者,实际的消费逻辑都在BlockingQueueConsumer中。

        然后循环遍历BlockingQueueConsumer集合,将每个BlockingQueueConsumer包装创建一个AsyncMessageProcessingConsumer(实现了Runnable接口)。        

        getTaskExecutor().execute(processor)获取线程池执行创建的线程任务,然后发布了一个AsyncConsumerStartedEvent事件。      

protected int initializeConsumers() {
	int count = 0;
	synchronized (this.consumersMonitor) {
		if (this.consumers == null) {
			this.cancellationLock.reset();
			this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
			for (int i = 0; i < this.concurrentConsumers; i++) {
				BlockingQueueConsumer consumer = createBlockingQueueConsumer();
				this.consumers.add(consumer);
				count++;
			}
		}
	}
	return count;
}

        下面看下消费的最核心逻辑,也就是AsyncMessageProcessingConsumer实现的run方法:

         我们看到try catch代码块中有一个while循环中有一个mainLoop()循环,mainLoop方法中就是拉取消息的逻辑,可以看到catch了很多的异常。在上面定义了一个boolean类型的变量aborted默认false,在catch到的有些异常当中将aborted改为了true,aborted这个变量的值直接决定了下面在killOrRestart()方法中的处理逻辑。

        看下前面抛出的QueuesNotAvailableException异常的catch逻辑,会判断missingQueuesFatal属性是否为true,如果missingQueuesFatal为true会将aborted改成了true,然后调用了publishConsumerFailedEvent方法。

        missingQueuesFatal属性默认为true,表示队列出现异常(队列不可用/被删除)的时候是否失败,如果失败的话消费者容器(MessageListenerContainer)不会自动进行重启,如果为false则表示队列出现异常时会自动重启消费容器。将missingQueuesFatal改成false也是一种解决消费者不消费的解决办法。

spring.rabbitmq.listener.simple.missing-queues-fatal=true

protected boolean isMissingQueuesFatal() {
	return this.missingQueuesFatal;
}
......
catch (QueuesNotAvailableException ex) {
	logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
	if (isMissingQueuesFatal()) {
		this.startupException = ex;
		// Fatal, but no point re-throwing, so just abort.
		aborted = true;
	}
	publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}

         publishConsumerFailedEvent方法中传入了aborted(此时为true),即fatal的值为true,abortEvents(BlockingQueue)添加了一个ListenerContainerConsumerFailedEvent事件(fatal此时为true)。

private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue<>();
......
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
	if (!fatal || !isRunning()) {
		super.publishConsumerFailedEvent(reason, fatal, t);
	}
	else {
		try {
			this.abortEvents.put(new ListenerContainerConsumerFailedEvent(this, reason, t, fatal));
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}

        killOrRestart()方法:关键的地方都写了注释,可以看到aborted的值是true的时候,会从阻塞队列abortEvents中获取ListenerContainerConsumerFailedEvent事件,并且广播该事件;如果aborted为false的时候会调用restart方法重启消费者容器。

        注意:这里当前面的catch的异常中将aborted改成了true或者消费者已经关闭的状态下,消费者容器不会自动重启,仅仅是发布了一个ListenerContainerConsumerFailedEvent广播事件,其他情况下消费者会自动重启。

        我们针对消费者停止消费的处理逻辑也就可以从ListenerContainerConsumerFailedEvent广播事件入手。

private void killOrRestart(boolean aborted) {
	//判断consumer是关闭状态 || aborted==true
	if (!isActive(this.consumer) || aborted) {
		logger.debug("Cancelling " + this.consumer);
		try {
			this.consumer.stop();
			SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
			if (getApplicationEventPublisher() != null) {
				getApplicationEventPublisher().publishEvent(
						new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
			}
		}
		catch (AmqpException e) {
			logger.info("Could not cancel message consumer", e);
		}
		if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
				.compareAndSet(null, Thread.currentThread())) {
			logger.error("Stopping container from aborted consumer");
			stop();
			SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
			ListenerContainerConsumerFailedEvent event = null;
			do {
				try {
					//从阻塞队列abortEvents中获取ListenerContainerConsumerFailedEvent事件
					event = SimpleMessageListenerContainer.this.abortEvents.poll(ABORT_EVENT_WAIT_SECONDS,
							TimeUnit.SECONDS);
					if (event != null) {
						//如果ListenerContainerConsumerFailedEvent不为空,发布广播该事件
						SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
								event.getReason(), event.isFatal(), event.getThrowable());
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
			while (event != null);
		}
	}
	else {
		logger.info("Restarting " + this.consumer);
		//调用restart方法重启消费者容器
		restart(this.consumer);
	}
}

         restart方法会重新创建消费者,发布消费者重启事件AsyncConsumerRestartedEvent,通过线程池执行AsyncMessageProcessingConsumer任务。

private void restart(BlockingQueueConsumer oldConsumer) {
	BlockingQueueConsumer consumer = oldConsumer;
	synchronized (this.consumersMonitor) {
		if (this.consumers != null) {
			try {
				// Need to recycle the channel in this consumer
				consumer.stop();
				// Ensure consumer counts are correct (another is going
				// to start because of the exception, but
				// we haven't counted down yet)
				this.cancellationLock.release(consumer);
				this.consumers.remove(consumer);
				if (!isActive()) {
					// Do not restart - container is stopping
					return;
				}
				//重新创建消费者BlockingQueueConsumer
				BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
				newConsumer.setBackOffExecution(consumer.getBackOffExecution());
				consumer = newConsumer;
				this.consumers.add(consumer);
				if (getApplicationEventPublisher() != null) {
					//发布消费者重启事件AsyncConsumerRestartedEvent
					getApplicationEventPublisher()
							.publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
				}
			}
			catch (RuntimeException e) {
				logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
				// Re-throw and have it logged properly by the caller.
				throw e;
			}
			//通过线程池异步执行任务AsyncMessageProcessingConsumer
			getTaskExecutor()
					.execute(new AsyncMessageProcessingConsumer(consumer));
		}
	}
}

三、解决消费者停止消费问题

         通过上面的源码分析,我们得知消费出现某些异常(例如QueuesNotAvailableException)的时候会发布一个ListenerContainerConsumerFailedEvent事件,我们可以监听这个事件重启消费者容器。

        spring中跟RabbitMQ相关的事件是AmqpEvent的子类

      spring通过发布事件的方式,可以通知观察者(即事件监听器)消费者的一些行为,消费者相关的事件如下所示:

  • AsyncConsumerStartedEvent:一个新的消费者启动事件
  • AsyncConsumerStoppedEvent:一个消费者停止事件
  • AsyncConsumerRestartedEvent:一个消费者重启事件
  • ListenerContainerConsumerFailedEvent:一个消息监听器消费失败的事件

        我们可以监听ListenerContainerConsumerFailedEvent事件,其定义如下所示:其中有一个属性fatal,我们上面也提到过,fatal为true时表示消费者出现了致命的错误,此时消费者不会自动重试进行重新启动,需要我们在事件处理逻辑中进行重启。当fatal为false时,我们可以忽略该事件,消费者容器中会自动重试启动。

public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
	private final String reason;
	private final boolean fatal;
	private final Throwable throwable;
}

        处理逻辑代码:判断event的fatal是true时,先判断container是否在运行,如果没有在运行则调用start进行启动,然后发送告警信息。

import java.util.Arrays;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
 
    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("消费者失败事件发生:{}", event);
        if (event.isFatal()) {
            log.error("Stopping container from aborted consumer. Reason::{}", event.getReason(), event.getThrowable());
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            try {
                try {
                    Thread.sleep(30000);
                } catch (Exception e) {
                    log.error(e.getMessage());
                }
                //判断此时消费者容器是否正在运行
                Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
                //消费者容器没有在运行时,进行启动
                container.start();
                log.info("重启队列{}的监听成功", queueNames);
            } catch (Exception e) {
                log.error("重启队列{}的监听失败", queueNames, e);
            }
            // TODO 短信/邮件/钉钉...告警,包含队列信息,监听断开原因,断开时异常信息,重启是否成功等...
        }
    }
}

        另外,也可以将missingQueuesFatal改成false,可以在抛出QueuesNotAvailableException异常时不改变aborted的值,这样在killOrRestart方法中就会自动自动调用重启的方法,但是这种处理方式仅限于QueuesNotAvailableException异常,不像上面的处理方式具有通用性。

spring.rabbitmq.listener.simple.missing-queues-fatal=false

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从源码角度分析RabbitMQ重启后,消费者停止消费怎么解决 的相关文章

  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • Airflow Worker 没有监听默认的 RabbitMQ 队列

    我已经使用rabbitmq代理配置了Airflow 服务 airflow worker airflow scheduler airflow webserver 正在运行 没有任何错误 调度程序正在推动任务执行default兔子MQ队列 即使
  • Camel 中的无限循环 - Rabbitmq

    我有一个小型服务器路由 它从queue in 获取消息并放入queue out 当我输入一条消息时queue in 服务器无限循环运行 我想知道我在配置方面缺少什么 这看起来是一条非常简单的路线 小服务器路由
  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 在点网核心应用程序中使用 RabbitMQ 跳过 MassTransit 中的队列

    我有三个项目 一个是Dot net core MVC 两个是API项目 MVC 正在调用一个 API 来获取用户详细信息 当询问用户详细信息时 我通过 MassTransit 向队列发送消息 我看到跳过队列 第三个项目中有消费者 即API项
  • 如何在 celery task.apply_async 中使用优先级

    我有一个testcelery 中的队列 我为它定义了一个任务 celery app task queue test ignore result True def priority test priority print priority 它
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 为什么需要消息队列来与 Web 套接字聊天?

    我在互联网上看到了很多使用 Web 套接字和 RabbitMQ 进行聊天的示例 https github com videlalvaro rabbitmq chat https github com videlalvaro rabbitmq
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt

随机推荐

  • java里面获取map的key和value的方法

    获取map的key和value的方法分为两种形式 map keySet 先获取map的key 然后根据key获取对应的value map entrySet 同时查询map的key和value 只需要查询一次 两者的性能比较可以查看map k
  • python中的np.empty_python中numpy.empty()函数实例讲解

    在使用python编程的过程中 想要快速的创建ndarray数组 可以使用numpy empty 函数 numpy empty 函数所创建的数组内所有元素均为空 没有实际意义 所以它也是创建数组最快的方法 本文介绍python中numpy
  • 整理的apollo 入门课程

    转自 https blog csdn net weixin 36662031 article details 81081744 转载自 https mp csdn net postedit 81081744 自动驾驶系统主要包含三个部分 感
  • MATLAB:运行出现 “ 未定义函数或变量 ”

    出现的错误页面 第一次接触MATLAB 出现错误还以为函数写错了 检查之后发现不是 上网找了找方法 如果编写的文件没有放在当前运行目录下 也会出现这个错误 具体可点击参考 检查目录后 发现路径没有错 最后 发现 编写的M文件 命名需要注意
  • SpringBoot 配置 Redis 连接池

    前言 SpringBoot2 0默认采用 Lettuce 客户端来连接 Redis 服务 默认是不使用连接池的 只有配置 redis lettuce pool下的属性的时候才可以使用到redis连接池 版本说明 spring boot st
  • C++模板特化

    模板特化 在学习模板的时候我们用模板来解决了一个add模板函数 实现不同类型的传参相加 实践证明 模板函数比普通函数好用 那么现在如果我们要新增一个需求 就是如果传入的是两个string类型的参数 我们不要简单的拼接 我们要在两个字符串之间
  • LeetCode-18-四数之和

    18 四数之和 说明 给定一个包含 n 个整数的数组 nums 和一个目标值 target 判断 nums 中是否存在四个元素 a b c 和 d 使得 a b c d 的值与 target 相等 找出所有满足条件且不重复的四元组 注意 答
  • java代码实现百度网盘文件上传返回下载链接-已封装工具类!可用于maven,spring-boot,spring-boot-cloud等项目,以及思路全解!

    阿丹 查找晚上很多案例都出现各种问题所以专门出一篇文章 因为业务涉及到需要较大的内存空间 使用oss以及fastdfs来说一个对金钱需求太大 fastdfs对服务器的损耗太大 于是寻找第三方 百度网盘相对来说就不错 本文章集合百度官方文档
  • Mask Rcnn目标分割-项目搭建及跑通测试代码

    本文介绍了Mask Rcnn目标分割项目的搭建及运行过程 并对搭建过程中可能出现的问题进行了解答 环境 Cuda10 2 tensorflow gpu1 13 2 Mask R CNN是一个实例分割算法 可以用来做 目标检测 目标实例分割
  • ipad能不能写python_如何在ipad上写python

    ipad或者手机支持python代码编写 让我们再也不用一本正经待在办公室或者家里正襟危坐了 让我们编写代码的方式更加随意 当我们写一些轻量级的代码可以随时随地的进行 那么如何在ipad上写python代码呢 这里给大家推荐两款应用于不同平
  • 区块链技术的本质是分布式数据库

    当微服务撞上区块链 系列微课分为 1 区块链的业务价值是通过数据共享降低信任成本 2 区块链的本质是分布式数据库 本文 3 区块链与微服务是天生的一对 转载本文需注明出处 微信公众号EAWorld 违者必究 区块链技术是基于比特币应用提出的
  • anaconda使用系列教程--4)环境迁移

    概述 跨平台尽量避免 比如windows和linux就不要跨平台 就在linux之间跨还是可以的 直接copy整体环境文件 适合于无法联网或网速不佳的新环境 anaconda最好是同版本的 迁移方法 使用requirement文件 A机器
  • pytorch 目标检测数据处理(二)提取困难样本,低ap样本

    摘要 比赛当中数据处理有很多种 对图像数据的分析 和分析之后该如何加强比较低的ap类别 今天就讲解我最近使用的几种困难样本学习和专注低ap的数据增强后的处理 困难样本就是loss比较大的 在每一个批次训练当中都占有很大部分的loss 导致l
  • 蓝牙之五-bludroid协议栈和厂商代码的交互

    协议栈和厂商代码交互 完整的蓝牙调用图 协议栈所在的目录是 system bt 厂商代码所在的目录是hardware broadcom libbt 这两个不同的目录反应的是协议栈和厂商固件的交互流程 它们通过hci层进行交互 在bluez时
  • SCWS分词库自定义

    最近因为要进行搜索功能的实现 而实现搜索给用户一个更好的体验就需要对输入的内容进行分词 所以静下心来 好好看看分词的知识 并记录下来 还是很有必要的 今天主要做了写关于SCWS的分词的词库的一些了解学习 首先就是需要知道SCWS这个分词的词
  • XSSLabs Less1-10

    Xsslabs下载地址 https github com do0dl3 xss labs Less 1 常规插入语句 把test的值改成跨站脚本语句即可name Less 2 gt 闭合前面的语句
  • 代理IP和Socks5代理:跨界电商与全球爬虫的关键技术

    跨界电商在全球化市场中崭露头角 而代理IP和Socks5代理则成为实现全球市场洞察和数据采集的不可或缺的工具 本文将深入探讨这两种代理技术在跨界电商 爬虫技术和出海战略中的关键作用 引言 介绍跨界电商的崛起和全球市场的机遇与挑战 引出代理I
  • 说句“圣诞快乐”不容易!

    1 圣诞快乐 嘿 别以为我们这里都是基督教徒 2 哦 好吧 光明节快乐 犹太教节日 宽扎节快乐 非裔美国人的节日 3 那些节日到了吗 4 好吧 只能说节日快乐了 别说 快乐 抑郁的人伤不起
  • Python对接LDAP/AD的过程详解

    不同公司的 LDAP AD 服务配置各不相同 很难封装一个通用的方法 所以我们在对接 LDAP AD 的过程中 需要了解自己公司的 LDAP AD 服务配置是怎么样的 才能写出正确的对接代码 因此下面将拆解过程并提供相关的文档地址 首先需要
  • 从源码角度分析RabbitMQ重启后,消费者停止消费怎么解决

    前段时间的RabbitMQ broker服务端由于某个队列一直积压消息 运维在凌晨对mq服务端机器pod进行了扩容 重启了RabbitMQ 然后早上发现自己的服务在mq重启之后一直报异常 停止消费了 导致影响了业务的运行 虽然mq重启成功了