自定义线程池
0.概念与模型
- 主线程不断地生产任务
- 直接交付线程执行(当前线程池数量 < 核心数)
- 否则:加入阻塞任务队列,等到线程池中空闲的线程获取执行
- 否则:阻塞队列已满,开发接口(拒绝策略)
- 等待/超时等待队列不满时加入队列
- 放弃任务
- 抛出异常
- 调用者/生产者/主线程执行任务
- 阻塞队列是双向队列,一端加入任务,一端移除任务
- 保证线程安全
- 设置多条件
- 超时等待和唤醒
- 生产者阻塞条件:队列已满
- 消费者阻塞条件:队列已空
- 线程池
- 高效利用多核CPU,核心数 <= 线程数量
- 线程复用
- 有任务执行
- 没有任务超时获取阻塞队列中未执行的任务
- 空闲移除线程(保证线程安全)
1.定义任务队列
/**
* 自定义任务队列
*
* @author xzx
* @date 2022/10/18
*/
@Slf4j
public class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();
//2.锁
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件
private Condition emptyWaitSet = lock.newCondition();
//5.队列容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
/**
* 超时等待获取
* @param timeout
* @param unit
* @return
*/
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
//阻塞获取
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//阻塞原因:请求队列为空
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.info("从任务队列获取任务成功:{}",t);
//唤醒因队列已满阻塞的线程
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
/**
* 阻塞获取
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.info("从任务队列获取任务成功:{}",t);
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
/**
* 存入
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.info("等待加入任务队列: {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列成功: {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
/**
* 超时存入
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
log.info("等待加入任务队列: {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列成功: {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
/**
* 开发获取失败策略
* @param rejectPolicy 策略
* @param task 任务
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
log.info("任务队列已满... 执行策略:{}",rejectPolicy);
rejectPolicy.reject(this, task);
} else {
log.info("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
2.定义线程池
/**
* 自定义线程池
* @author xzx
* @date 2022/10/18
*/
@Slf4j
public class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心数量
private int coreSize;
//超时时间
private long timeout;
//单位
private TimeUnit timeUnit;
//拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
/**
* 执行任务
* 1.线程数量 < 核心数量 :新增线程,直接交付任务
* 2.线程数量 >= 核心数量 : 将任务加入队列,等待线程获取
* @param task
*/
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
/**
* 直接交付:新建线程执行任务
*/
Worker worker = new Worker(task);
log.info("直接交付--新增线程 worker:{}, 任务 task:{}", worker, task);
workers.add(worker);
worker.start();
} else {
/**
* 暂缓:将任务加入队列
*/
// taskQueue.put(task);
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
log.info("暂缓交付--加入任务队列 任务 task:{}", task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
/**
*
* @param coreSize 核心数目
* @param timeout 超时时间
* @param timeUnit 单位
* @param queueCapacity 任务队列容量
* @param rejectPolicy 拒绝策略
*/
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 工作线程
*/
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//有任务或者从阻塞队列中接取到任务
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
//该线程执行完任务就移除
synchronized (workers) {
log.info("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
3.定义拒绝策略接口
/**
* 自定义线程池
* @author xzx
* @date 2022/10/18
*/
@Slf4j
public class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心数量
private int coreSize;
//超时时间
private long timeout;
//单位
private TimeUnit timeUnit;
//拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
/**
* 执行任务
* 1.线程数量 < 核心数量 :新增线程,直接交付任务
* 2.线程数量 >= 核心数量 : 将任务加入队列,等待线程获取
* @param task
*/
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
/**
* 直接交付:新建线程执行任务
*/
Worker worker = new Worker(task);
log.info("直接交付--新增线程 worker:{}, 任务 task:{}", worker, task);
workers.add(worker);
worker.start();
} else {
/**
* 暂缓:将任务加入队列
*/
// taskQueue.put(task);
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
log.info("暂缓交付--加入任务队列 任务 task:{}", task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
/**
*
* @param coreSize 核心数目
* @param timeout 超时时间
* @param timeUnit 单位
* @param queueCapacity 任务队列容量
* @param rejectPolicy 拒绝策略
*/
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 工作线程
*/
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//有任务或者从阻塞队列中接取到任务
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
//该线程执行完任务就移除
synchronized (workers) {
log.info("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
4.测试
/**
* @author xzx
* @date 2022/10/18
*/
@Slf4j
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(4,
1000, TimeUnit.MILLISECONDS, 2, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.info("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
log.info("让调用者自己执行任务");
task.run();
});
for (int i = 0; i < 20; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("任务执行成功: {}", j);
});
}
}
}
控制台打印日志
15:11:41.887 [main] INFO com.example.juc.threadpool.ThreadPool - 直接交付--新增线程 worker:Thread[Thread-0,5,main], 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@66cd51c3
15:11:41.890 [main] INFO com.example.juc.threadpool.ThreadPool - 直接交付--新增线程 worker:Thread[Thread-1,5,main], 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@5fcfe4b2
15:11:41.890 [main] INFO com.example.juc.threadpool.ThreadPool - 直接交付--新增线程 worker:Thread[Thread-2,5,main], 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@5eb5c224
15:11:41.890 [main] INFO com.example.juc.threadpool.ThreadPool - 直接交付--新增线程 worker:Thread[Thread-3,5,main], 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@73a8dfcc
15:11:41.891 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@7e774085
15:11:41.891 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@7e774085
15:11:41.891 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@3f8f9dd6
15:11:41.891 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@3f8f9dd6
15:11:41.891 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@aec6354
15:11:41.891 [main] INFO com.example.juc.threadpool.BlockingQueue - 任务队列已满... 执行策略:com.example.juc.threadpool.Test$$Lambda$1/683287027@1c655221
15:11:41.891 [main] INFO com.example.juc.threadpool.Test - 让调用者自己执行任务
15:11:42.391 [Thread-1] INFO com.example.juc.threadpool.Test - 任务执行成功: 1
15:11:42.391 [Thread-0] INFO com.example.juc.threadpool.Test - 任务执行成功: 0
15:11:42.391 [Thread-2] INFO com.example.juc.threadpool.Test - 任务执行成功: 2
15:11:42.406 [main] INFO com.example.juc.threadpool.Test - 任务执行成功: 6
15:11:42.406 [Thread-3] INFO com.example.juc.threadpool.Test - 任务执行成功: 3
15:11:42.406 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@58d25a40
15:11:42.406 [Thread-1] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@7e774085
15:11:42.406 [Thread-0] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@3f8f9dd6
15:11:42.406 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@58d25a40
15:11:42.406 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@1b701da1
15:11:42.406 [Thread-2] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@58d25a40
15:11:42.406 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@1b701da1
15:11:42.406 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@726f3b58
15:11:42.406 [Thread-3] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@1b701da1
15:11:42.406 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@726f3b58
15:11:42.406 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@442d9b6e
15:11:42.406 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@442d9b6e
15:11:42.406 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@ee7d9f1
15:11:42.406 [main] INFO com.example.juc.threadpool.BlockingQueue - 任务队列已满... 执行策略:com.example.juc.threadpool.Test$$Lambda$1/683287027@1c655221
15:11:42.406 [main] INFO com.example.juc.threadpool.Test - 让调用者自己执行任务
15:11:42.920 [Thread-2] INFO com.example.juc.threadpool.Test - 任务执行成功: 7
15:11:42.920 [Thread-0] INFO com.example.juc.threadpool.Test - 任务执行成功: 5
15:11:42.920 [main] INFO com.example.juc.threadpool.Test - 任务执行成功: 11
15:11:42.920 [Thread-1] INFO com.example.juc.threadpool.Test - 任务执行成功: 4
15:11:42.920 [Thread-3] INFO com.example.juc.threadpool.Test - 任务执行成功: 8
15:11:42.920 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@15615099
15:11:42.920 [Thread-2] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@726f3b58
15:11:42.920 [Thread-0] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@442d9b6e
15:11:42.920 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@15615099
15:11:42.920 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@1edf1c96
15:11:42.920 [Thread-1] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@15615099
15:11:42.920 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@1edf1c96
15:11:42.920 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@368102c8
15:11:42.920 [Thread-3] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@1edf1c96
15:11:42.920 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@368102c8
15:11:42.920 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@6996db8
15:11:42.920 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@6996db8
15:11:42.920 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@1963006a
15:11:42.920 [main] INFO com.example.juc.threadpool.BlockingQueue - 任务队列已满... 执行策略:com.example.juc.threadpool.Test$$Lambda$1/683287027@1c655221
15:11:42.920 [main] INFO com.example.juc.threadpool.Test - 让调用者自己执行任务
15:11:43.434 [main] INFO com.example.juc.threadpool.Test - 任务执行成功: 16
15:11:43.434 [Thread-3] INFO com.example.juc.threadpool.Test - 任务执行成功: 13
15:11:43.434 [Thread-0] INFO com.example.juc.threadpool.Test - 任务执行成功: 10
15:11:43.434 [Thread-1] INFO com.example.juc.threadpool.Test - 任务执行成功: 12
15:11:43.434 [Thread-3] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@368102c8
15:11:43.434 [Thread-2] INFO com.example.juc.threadpool.Test - 任务执行成功: 9
15:11:43.434 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@7fbe847c
15:11:43.434 [Thread-0] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@6996db8
15:11:43.434 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@7fbe847c
15:11:43.434 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@41975e01
15:11:43.434 [Thread-1] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@7fbe847c
15:11:43.434 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@41975e01
15:11:43.434 [main] INFO com.example.juc.threadpool.ThreadPool - 暂缓交付--加入任务队列 任务 task:com.example.juc.threadpool.Test$$Lambda$2/553264065@c2e1f26
15:11:43.434 [Thread-2] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@41975e01
15:11:43.434 [main] INFO com.example.juc.threadpool.BlockingQueue - 加入任务队列 com.example.juc.threadpool.Test$$Lambda$2/553264065@c2e1f26
15:11:43.946 [Thread-1] INFO com.example.juc.threadpool.Test - 任务执行成功: 17
15:11:43.946 [Thread-0] INFO com.example.juc.threadpool.Test - 任务执行成功: 15
15:11:43.946 [Thread-3] INFO com.example.juc.threadpool.Test - 任务执行成功: 14
15:11:43.946 [Thread-2] INFO com.example.juc.threadpool.Test - 任务执行成功: 18
15:11:43.946 [Thread-1] INFO com.example.juc.threadpool.BlockingQueue - 从任务队列获取任务成功:com.example.juc.threadpool.Test$$Lambda$2/553264065@c2e1f26
15:11:44.458 [Thread-1] INFO com.example.juc.threadpool.Test - 任务执行成功: 19
15:11:44.953 [Thread-0] INFO com.example.juc.threadpool.ThreadPool - worker 被移除Thread[Thread-0,5,main]
15:11:44.953 [Thread-2] INFO com.example.juc.threadpool.ThreadPool - worker 被移除Thread[Thread-2,5,main]
15:11:44.953 [Thread-3] INFO com.example.juc.threadpool.ThreadPool - worker 被移除Thread[Thread-3,5,main]
15:11:45.463 [Thread-1] INFO com.example.juc.threadpool.ThreadPool - worker 被移除Thread[Thread-1,5,main]
Process finished with exit code 0