我在用ExecutorService
为了方便并发多线程程序。采取以下代码:
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
Future<..> ... = exService.submit(..);
...
}
就我而言,问题是submit()
如果全部都没有阻塞NUMBER_THREADS
被占用。结果是任务队列被许多任务淹没。这样做的结果是,关闭执行服务ExecutorService.shutdown()
需要很长时间(ExecutorService.isTerminated()
长时间内将是错误的)。原因是任务队列仍然很满。
现在我的解决方法是使用信号量来禁止任务队列中有太多条目ExecutorService
:
...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
semaphore.aquire();
// internally the task calls a finish callback, which invokes semaphore.release()
// -> now another task is added to queue
Future<..> ... = exService.submit(..);
...
}
我确信有更好的封装解决方案吗?
诀窍是使用固定的队列大小并且:
new ThreadPoolExecutor.CallerRunsPolicy()
我还推荐使用番石榴监听执行服务 https://github.com/google/guava/wiki/ListenableFutureExplained。
这是消费者/生产者队列的示例。
private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
如果有更好的,您可能需要考虑像 RabbitMQ 或 ActiveMQ 这样的 MQ,因为它们具有 QoS 技术。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)