记一次kafka Consumer线程停止消费过程分析

2023-10-31

前言

kafka消息队列在项目开发中经常被使用,尤其是在大数据领域经常见到它的身影。spring集成了kafka方便我们使用,只要引入spring-kafka即可。

问题描述

有一天我们后台版本发布,虽然改动很大,但是大家还是自信满满,因为经过了好几轮测试验证都没有问题,但是很不幸,结果还是出现问题了,上线后发现kafka消费线程只拉取了一次就莫名其妙停止,重启后会重新拉,但是也就一次就停止(理论上消费者是不停的从服务端(broke)拉取(poll)消息的)最奇怪的是没有任何异常堆栈信息打出来,于是大家都陷入了沉思,尝试各种无效措施都无法解决,最后已只能回滚。

项目使用的是spring-kafka这个组件来和kafka服务端交互的,如果大家没有用它,那就可能不是同一个问题哈。

分析过程

spring-kafka版本是1.3.5版本,它依赖kafka-client版本是0.11.0.2

业务中使用@KafkaListener注解来启动消费者线程

@KafkaListener(topics = {"my.topic"}, groupId = "mygroup")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // do something
    ack.acknowledge();
}

有了这个注解,spring-kafka就会帮我们启动消费者线程,看以下处理类
ConcurrentMessageListenerContainer.java

@Override
	protected void doStart() {
		if (!isRunning()) {
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);

			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
				}
				else {
					container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
							partitionSubset(containerProperties, i));
				}
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.start();
				this.containers.add(container);
			}
		}
	}

按设置的并发度(concurrency默认1)开启创建消费者线程,
进入KafkaMessageListenerContainer.java

@Override
	protected void doStart() {
		if (isRunning()) {
			return;
		}
		// 省略掉部分代码
		
		this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
		setRunning(true);
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
	}

仍然是doStart方法,可以看到最终是通过
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
创建一个Executor,并且提交了任务(其中ListenerConsumer是一个Runnable)
Executor默认是SimpleAsyncTaskExecutor,进入它的方法

@Override
	public ListenableFuture<?> submitListenable(Runnable task) {
		ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

我们看到了ListenableFutureTask,是不是有点熟悉的味道,没错,它就是JDK中FutureTask的衍生类,到了这里我们也应该能猜到,它最终会调用FutureTask的run方法,最终会回到KafkaMessageListenerContainer的run方法,如下

@Override
		public void run() {
			this.consumerThread = Thread.currentThread();
			if (this.theListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
			}
			if (this.transactionManager != null) {
				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			if (isRunning() && this.definedPartitions != null) {
				initPartitionsIfNeeded();
			}
			long lastReceive = System.currentTimeMillis();
			long lastAlertAt = lastReceive;
			while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();

					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}
					else {
						if (this.containerProperties.getIdleEventInterval() != null) {
							long now = System.currentTimeMillis();
							if (now > lastReceive + this.containerProperties.getIdleEventInterval()
									&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
								publishIdleContainerEvent(now - lastReceive);
								lastAlertAt = now;
								if (this.theListener instanceof ConsumerSeekAware) {
									seekPartitions(getAssignedPartitions(), true);
								}
							}
						}
					}
				}
				catch (WakeupException e) {
					// Ignore, we're stopping
				}
				catch (NoOffsetForPartitionException nofpe) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
					break;
				}
				catch (Exception e) {
					if (this.containerProperties.getGenericErrorHandler() != null) {
						this.containerProperties.getGenericErrorHandler().handle(e, null);
					}
					else {
						this.logger.error("Container exception", e);
					}
				}
			}

ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());

这句就是从broke服务端拉取消息的,它里面用了while循环来实现线程不退出拉取(poll) 消息,拉取到消息的话,就会触发invokeListener(records); 最终就调到我们开头使用@KafkaListener这个方法的地方来执行我们的业务代码,好了,按道理这里应该会不断拉取消息消费的,可为什么偏偏停止了呢?

我们接着分析,跟进去invokeListener

private void invokeListener(final ConsumerRecords<K, V> records) {
			if (this.isBatchListener) {
				invokeBatchListener(records);
			}
			else {
				invokeRecordListener(records);
			}
		}
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
			if (this.transactionTemplate != null) {
				innvokeRecordListenerInTx(records);
			}
			else {
				doInvokeWithRecords(records);
			}
		}

接着doInvokeWithRecords

private void doInvokeWithRecords(final ConsumerRecords<K, V> records) throws Error {
			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
			while (iterator.hasNext()) {
				final ConsumerRecord<K, V> record = iterator.next();
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("Processing " + record);
				}
				doInvokeRecordListener(record, null);
			}
		}

上面这段是遍历前面拉取到的消息,一个个去调用处理它,接着往下

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
				@SuppressWarnings("rawtypes") Producer producer) throws Error {
			try {
				if (this.acknowledgingMessageListener != null) {
					this.acknowledgingMessageListener.onMessage(record,
							this.isAnyManualAck
									? new ConsumerAcknowledgment(record)
									: null);
				}
				else {
					this.listener.onMessage(record);
				}
				ackCurrent(record, producer);
			}

this.acknowledgingMessageListener.onMessage然后最终会调用到以下部分:
InvocableHandlerMethod.java

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
		Object[] args = getMethodArgumentValues(message, providedArgs);
		if (logger.isTraceEnabled()) {
			logger.trace("Invoking '" + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) +
					"' with arguments " + Arrays.toString(args));
		}
		Object returnValue = doInvoke(args);
		if (logger.isTraceEnabled()) {
			logger.trace("Method [" + ClassUtils.getQualifiedMethodName(getMethod(), getBeanType()) +
					"] returned [" + returnValue + "]");
		}
		return returnValue;
	}

接着doInvoke方法

protected Object doInvoke(Object... args) throws Exception {
		ReflectionUtils.makeAccessible(getBridgedMethod());
		try {
			return getBridgedMethod().invoke(getBean(), args);
		}
		catch (IllegalArgumentException ex) {
			assertTargetBean(getBridgedMethod(), getBean(), args);
			String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
			throw new IllegalStateException(getInvocationErrorMessage(text, args), ex);
		}
		catch (InvocationTargetException ex) {
			// Unwrap for HandlerExceptionResolvers ...
			Throwable targetException = ex.getTargetException();
			if (targetException instanceof RuntimeException) {
				throw (RuntimeException) targetException;
			}
			else if (targetException instanceof Error) {
				throw (Error) targetException;
			}
			else if (targetException instanceof Exception) {
				throw (Exception) targetException;
			}
			else {
				String text = getInvocationErrorMessage("Failed to invoke handler method", args);
				throw new IllegalStateException(text, targetException);
			}
		}
	}

这里看到了很多对异常处理的地方,也就是如果我们业务异常了,这里就能全部捕捉到,包括了Error这种非检查型异常。好了,我们再回到之前方法doInvokeRecordListener
在KafkaMessageListenerContainer类中

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
				@SuppressWarnings("rawtypes") Producer producer) throws Error {
			try {
				if (this.acknowledgingMessageListener != null) {
					this.acknowledgingMessageListener.onMessage(record,
							this.isAnyManualAck
									? new ConsumerAcknowledgment(record)
									: null);
				}
				else {
					this.listener.onMessage(record);
				}
				ackCurrent(record, producer);
			}
			catch (RuntimeException e) {
				if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
					ackCurrent(record, producer);
				}
				if (this.errorHandler == null) {
					throw e;
				}
				try {
					this.errorHandler.handle(e, record);
					// 省略部分代码
				}
				catch (RuntimeException ee) {
				// 省略部分代码
			}
			return null;
		}

如果我们业务异常了this.errorHandler.handle会帮我们处理,errorHandler默认是LoggingErrorHandler类,它里面很简单,就是抛出堆栈信息。

但是注意到没有,它这里只是catch了RuntimeException这种类型的异常,对于其他异常,如Error这种它是不管的,也就是会往前面继续抛,好,我们回到最开始的run方法,就是while循环拉取消息那个地方,最终它会抛到这里去

catch (WakeupException e) {
					// Ignore, we're stopping
}catch (NoOffsetForPartitionException nofpe) {
	this.fatalError = true;
	ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
	break;
}
catch (Exception e) {
			if (this.containerProperties.getGenericErrorHandler() != null) {
						this.containerProperties.getGenericErrorHandler().handle(e, null);
					}
		else {
		this.logger.error("Container exception", e);
}
}

这里会再次catch,保证run方法不退出,线程保持住继续拉取,但是发现没有,如果抛出的是Error呢?是不是线程就退出了,好了,真相大白了,就是因为我们业务中有代码抛出了这种Error类型的异常(我们业务中确实是触发了Error),导致消费者线程退出了,也就是run方法结束,好,既然消费线程都退出了,还怎么拉取消息对吧,到此真相大白。

只是心跳线程还在,后续因为kafka会有检测消费者两次拉取间隔时长来判断消费者是否还活着,如果超过最大时长没有拉取(poll)就被踢掉,所以最后心跳线程也结束了,一切都结束了…

等等,这就完了吗?

还有个问题,线程是将Error往外面抛了,理论上JVM会帮我们打印出来堆栈,可是怎么没有看到异常堆栈信息呢? 为了弄清楚这个问题,又得回到上面提到的FutureTask这哥们,如果它run方法里面异常,不管你什么异常,如果往外抛就被捕捉到,并且最终将异常setException,也就是被吞掉了,熟悉JDK线程池的应该都知道,OK,分析到此可以收尾了。

最后补充下我为什么知道业务存在Error异常

异常没有抛出来,我是怎么知道业务存在Error异常的,因为重启应用就触发了doStop方法,会重新抛出来异常信息,这个跟JDK线程池将异常先吞掉,只有通过Future.get()才抛出来思想是类似的
org.springframework.kafka.listener.KafkaMessageListenerContainer#doStop

@Override
	protected void doStop(final Runnable callback) {
		if (isRunning()) {
			this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {

				@Override
				public void onFailure(Throwable e) {
					KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
					if (callback != null) {
						callback.run();
					}
				}
				// 省略
			});
			setRunning(false);
			this.listenerConsumer.consumer.wakeup();
		}
	}

总结

消费者线程停止消费罪魁祸首其实是我们在业务中抛了Error类型的异常导致线程退出,异常被吞掉所以看不到异常堆栈,所以我们在开发业务过程要警惕这种错误异常的抛出,即使是有也要在业务代码中catch它,以免造成这种情况发生。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

记一次kafka Consumer线程停止消费过程分析 的相关文章

  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 使用 GWT 读取非常大的本地 XML 文件

    我正在使用 GWT 构建我的第一个 Java 应用程序 它必须从一个非常大的 XML 文件中读取数据 当我尝试发送对文件中信息的请求时遇到问题 并且我不太确定它是否与文件的大小或我的语义有关 在我的程序中 我有以下内容 static fin
  • Spring RestTemplate 使用 cookie 遵循重定向

    最近我遇到了一个问题 我需要做一个GET请求远程服务 我假设使用一个简单的 servlet 并且 RestTemplate 返回Too many redirects 经过一番调查 似乎对指定远程服务发出的第一个请求实际上只是一个 302 重
  • Spring Boot自动装配存储库始终为空[重复]

    这个问题在这里已经有答案了 每次我进入我的服务类时 存储库似乎都没有自动连接 因为它不断抛出 NullPointerException 谁能帮我检查一下我缺少什么吗 这是我的代码 演示应用程序 java package com exampl
  • 通往楼梯顶部的可能路径

    这是一个非常经典的问题 我听说谷歌在他们的面试中使用过这个问题 问题 制定一个递归方法 打印从楼梯底部到楼梯顶部的所有可能的独特路径 有 n 个楼梯 您一次只能走 1 步或 2 步 示例输出 如果它是一个有 3 级楼梯的楼梯 1 1 1 2
  • Integer.parseInt("0x1F60A") 以 NumberformatException 结束

    我尝试从数据库中获取长字符串内的表情符号代码 格式如下 0x1F60A 所以我可以访问代码 但它将是String 起初 我尝试通过执行以下操作来转换变量tv setText beforeEmo getEmijoByUnicode int e
  • 当客户端关闭连接时,Spring StreamingResponseBody 请求线程未清理

    我在控制器中有一个端点 它返回一个StreamingResponseBody 用于向客户端发送文件 其代码大致如下 RestController RequestMapping value api public class Controlle
  • 是否可以通过编程方式查找 logback 日志文件?

    自动附加日志文件以支持电子邮件会很有用 我可以以编程方式设置路径 如以编程方式设置 Logback Appender 路径 https stackoverflow com questions 3803184 setting logback
  • 从 GitHub 上托管的 Spring Cloud Config Server 访问存储库的身份验证问题

    我在 GitHub 上的存储库中托管配置 如果我将回购公开 一切都好 但如果我将其设为私有 我将面临 org eclipse jgit errors TransportException https github com my user m
  • Java Swing For mac 中的 DJ Native Swing 浏览器

    我有一个用 Swing 制作的 Java 应用程序 并且使用了一个 DJ Native Swing 浏览器 当我尝试在 OS X 上使用它时 它抛出了一个NoClassDefFoundError尽管我添加了 swt jar 但始终如此 有人
  • 在另一个模块中使用自定义 gradle 插件模块

    我正在开发一个自定义插件 我希望能够在稍后阶段将其部署到存储库 因此我为其创建了一个独立的模块 在对其进行任何正式的 TDD 之前 我想手动进行某些探索性测试 因此 我创建了一个使用给定插件的演示模块 到目前为止 我发现执行此操作的唯一方法
  • GWT 2.3 开发模式 - 托管模式 JSP 编译似乎不使用 java 1.5 兼容性

    无法编译 JSP 类 生成的 servlet 错误 DefaultMessage 上次更新 0 日期 中 0 时间 HH mm ss z 语法 错误 注释仅在源级别为 1 5 时可用 在尝试以开发模式在 Web 浏览器中打开我的 gwt 模
  • 使用架构注册表对 avro 消息进行 Spring 云合约测试

    我正在查看 spring 文档和 spring github 我可以看到一些非常基本的内容examples https github com spring cloud samples spring cloud contract sample
  • Docker 和 Eureka 与 Spring Boot 无法注册客户端

    我有一个使用 Spring Boot Docker Compose Eureka 的非常简单的演示 我的服务器在端口 8671 上运行 具有以下应用程序属性 server port 8761 eureka instance prefer i
  • java库维护数据库结构

    我的应用程序一直在开发 所以偶尔 当版本升级时 需要创建 更改 删除一些表 修改一些数据等 通常需要执行一些sql代码 是否有一个 Java 库可用于使我的数据库结构保持最新 通过分析类似 db structure version 信息并执
  • 返回 Java 8 中的通用函数接口

    我想写一种函数工厂 它应该是一个函数 以不同的策略作为参数调用一次 它应该返回一个函数 该函数根据参数选择其中一种策略 该参数将由谓词实现 嗯 最好看看condition3为了更好的理解 问题是 它没有编译 我认为因为编译器无法弄清楚函数式
  • Java Swing:需要一个高质量的带有复选框的开发 JTree

    我一直在寻找一个 Tree 实现 其中包含复选框 其中 当您选择一个节点时 树中的所有后继节点都会被自动选择 当您取消选择一个节点时 树中其所有后继节点都会自动取消选择 当已经选择了父节点 并且从其后继之一中删除了选择时 节点颜色将发生变化
  • 在 Google App-Engine JAVA 中将文本转换为字符串,反之亦然

    如何从字符串转换为文本 java lang String to com google appengine api datastore Text 反之亦然 Check Javadoc http code google com appengin
  • Resteasy 可以查看 JAX-RS 方法的参数类型吗?

    我们使用 Resteasy 3 0 9 作为 JAX RS Web 服务 最近切换到 3 0 19 我们开始看到很多RESTEASY002142 Multiple resource methods match request警告 例如 我们
  • 在浏览器刷新中刷新检票面板

    我正在开发一个付费角色系统 一旦用户刷新浏览器 我就需要刷新该页面中可用的统计信息 统计信息应该从数据库中获取并显示 但现在它不能正常工作 因为在页面刷新中 java代码不会被调用 而是使用以前的数据加载缓存的页面 我尝试添加以下代码来修复

随机推荐

  • 期望、方差、协方差与相关系数

    1 利用切比雪夫不等式可以证明方差为0意味着随机变量的取值集中在一点上 2 从协方差可以得到两个变量增减的趋势 称为相关性 3 不相关 比 独立 更弱的概念 独立 必导致 不相关 不相关 不一定导致 独立 4 相关系数是相应标准化变量的协方
  • 使用Termux在安卓手机上运行tomcat服务器

    使用Termux在安卓手机上安装运行tomcat服务器 简单背景 探索尝试 尝试一 使用limbo虚拟机 失败 想念二 使用Linux Deploy安装 直接放弃 尝试三 使用Aid Learning 成功但搁置 尝试四 使用Termux直
  • Django之数据库并发处理

    1 数据库并发处理问题 在多个用户同时发起对同一个数据提交修改操作时 先查询 再修改 会出现资源竞争的问题 导致最终修改的数据结果出现异常 比如限量商品在热销时 当多个用户同时请求购买商品时 最终修改的数据就会出现异常 下面我们来写点代码还
  • getline()的使用详解

    一 getline int main string line while getline cin line cout lt
  • 图像去噪的OPenCV添加噪声和去噪

    添加噪声 添加高斯噪声 IplImage AddGuassianNoise IplImage src 添加高斯噪声 IplImage dst cvCreateImage cvGetSize src src gt depth src gt n
  • sass的日常使用

    sass跟css的简单对比 css 由于CSS的语法不够强大 没有变量和合理的样式复用机制 使得逻辑上相关的属性值必须以字面的形式反复出现 导致难以维护 而动态演示语言为CSS赋予了动态语言的特性 极大的提高了样式语言的可维护性 sass
  • github.com访问慢解决

    修改hosts HOSTS文件路径 C Windows System32 drivers etc hosts 1 打开Dns查询 站长工具 http tool chinaz com dns 2 搜索http github com 3 把TT
  • 使用Linux内核里的spi屏驱动-fbtft

    Linux内核里已经提供spi接口小屏的设备驱动 在内核的配置选项 make menuconfig ARCH arm CROSS COMPILE arm linux gnueabihf Device Drivers gt Graphics
  • 多线程练习之:生产电脑

    生产电脑 题目 设计一个生产电脑和搬运电脑类 要求生产出一台电脑就搬走一台电脑 如果没有新的电脑生产出来 则搬运工要等待新电脑产出 如果生产出的电脑没有搬走 则要等待电脑搬走之后再生产 并统计出生产的电脑数量 public class Co
  • python乘法出现小数位很多_js小数运算出现多位小数如何解决

    小数相乘出现很多位小数的问题 这个问题自己以前也遇到过 现在特意来总结一下 Number类型 Number类型是ECMAScript中最常用和最令人关注的类型了 这种类型使用IEEE754格式来表示整数和浮点数值 浮点数值在某些语言中也被成
  • 字节跳动前端面经

    面试经历 我入职字节大概一个多月 目前准大四 实习生 现在写面经不仅仅是牛客网 YYSD 真tm灵 的还愿 还是就是想给大家推荐推荐我们部门 技术中台 可能大家对技术中台不太了解 但是大家肯定听说过掘金吧 我们组主要负责ByteTech 字
  • 相机的信噪比

    在图像传感器的成像过程中 真实的信号是无法探测到的理想值 在成像过程中理想值被引入了一系列的不确定性 最终形成读出信号也即图像 此过程中的不确定性被统一称为噪声 而信号与噪声的比值被定义为信噪比 Signal to NoiseRatio S
  • hibernate: Duplicate class/entity; Could not parse mapping document from resource

    近日在学习Hibernate时 总是遇到以下异常 org hibernate InvalidMappingException Could not parse mapping document from resource kpy db Cus
  • ModelScope-Agent: Building Your Customizable Agent System with Open-source Large Language Models

    本文是LLM系列文章 针对 ModelScope Agent Building Your Customizable Agent System with Open source Large Language Models 的翻译 ModelS
  • 1072. 开学寄语(20)

    下图是上海某校的新学期开学寄语 天将降大任于斯人也 必先删其微博 卸其QQ 封其电脑 夺其手机 收其ipad 断其wifi 使其百无聊赖 然后 净面 理发 整衣 然后思过 读书 锻炼 明智 开悟 精进 而后必成大器也 本题要求你写个程序帮助
  • Webpack 基础配置介绍(二)

    今天继续分享webpack的有关内容 我还是接着从上篇文章的项目来给大家分享后续内容 如果还有小伙伴没有阅读之前的文章 请关注博主进行阅读 今日分享 1 webpack的规范配置 2 webpack config js基础配置 3 单页开发
  • JSP+ssm计算机毕业设计考研资源共享平台设计与实现399xv【源码、数据库、LW、部署】

    项目运行 项目含有源码 文档 程序 数据库 配套开发软件 软件安装教程 环境配置 Jdk1 8 Tomcat7 0 Mysql HBuilderX Webstorm也行 Eclispe IntelliJ IDEA Eclispe MyEcl
  • 环境变量路径中有空格该怎么办?

    本机环境变量的设置 java home C Program Files Java jdk1 5 0 09 bat中的命令格式 java home bin java 或者 C Program Files Java jdk1 5 0 09 bi
  • Python、Matplot的subplot实现一行3列的子图绘制,并添加背景色

    Python Matplot的subplot实现一行3列的子图绘制 并添加背景色 1 可能遇到的问题 2 示例 1 绘制2 2 俩行俩列 的子图 并设置背景色 2 绘制1 3 一行三列 的子图 并设置横轴纵轴值 3 绘制1 3 一行三列 的
  • 记一次kafka Consumer线程停止消费过程分析

    前言 kafka消息队列在项目开发中经常被使用 尤其是在大数据领域经常见到它的身影 spring集成了kafka方便我们使用 只要引入spring kafka即可 问题描述 有一天我们后台版本发布 虽然改动很大 但是大家还是自信满满 因为经