最近在阅读《阿里巴巴Java开发手册》的时候,书中有这么一段话:
线程池这块理解不是很深,今天就抽时间重新学习一遍。对于书中的问题分析完成后答案便一目了然。
创建线程池的一个方式:
ExecutorService e = Executors.newFixedThreadPool(5);
Executors 相当于一个工厂类,它应该是提供了一下几种类型的线程池:
1.newFixedThreadPool:创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
2.newWorkStealingPool:创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。
3.newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
4.newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
5.newSingleThreadScheduledExecutor:创建一个单例线程池,定期或延时执行任务。
6.newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
以 FixedThreadPool 为例分析一下源码:
创建:
//Executors类中:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//注意阻塞队列为 new LinkedBlockingQueue<Runnable>();↑
//ThreadPoolExecutor 构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以看出以 Executors.newFixedThreadPool(int nThreads) 这种方式创建出的线程池,阻塞队列为 LinkedBlockingQueue,线程工厂为默认工厂,拒绝策略为默认拒绝策略,等待销毁时间为0s。
LinkedBlockingQueue构造方法:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
所以开篇的问题就迎刃而解,new LinkedBlockingQueue(),默认容量为 Integer.MAX_VALUE,也就是说,Executors.newFixedThreadPool() 这种方式创建出的线程池并没有指定阻塞队列的容量,这样是可以不断添加线程直到达到 Integer.MAX_VALUE,这就会造成内存问题。
创建好线程池后,接下来就是往里面添加任务了。而添加任务有两种方式:
1.execute();
2.submit();
submit() 可以获取该任务执行的Future。下文再说,我们先看 execute():
在这之前先看一下 ThreadPoolExecutor的属性:
//1.ctl就比较厉害了,记录了线程池的状态(高3位 & 运算) and 任务数目(低29位 & 运算)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1是32位,左移29位得到:高3位为001 低29位为0的数;之后再减1得到:高三位为000,低29位全为1的数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
//下面几个状态都是:左移29位,用就是为了
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// ~COUNT_MASK取反,高3位都为1,低29位为0
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
知道这些,再看源码就没什么难的了:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//1.获取线程池的当前状态
int c = ctl.get();
//2.如果工作的线程小于核心线程数,执行addWorker()创建新的线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();//addWorker() 是个耗时操作需要重新获取线程池状态
}
//3.如果线程池是 Running 状态并且成功加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//重新获取 value
//如果此时发现线程池已经不是 Running并且从 workQueue中成功移除此线程,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) { //自旋
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {//再一个自旋
//通过 core 判断:是否工作线程 >= corePoolSize 或 >= maximumPoolSize
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//1.尝试 cas 增加
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
搞完这个源码,我可算是理解多态了.....
惯例沙雕图: