对于这三者首先我们看下源码,之后在分别写几个demo讲解下用法:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
V get() throws InterruptedException, ExecutionException;
}
关系一目了然,我们首先使用下FutureTask ,下面简单写一个demo
FutureTask<Boolean> futureTask = new FutureTask<>(() -> false);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = (Future<Boolean>) executor.submit(futureTask);
try {
System.out.println("future:"+future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
try {
boolean rs = futureTask.get(1, TimeUnit.MINUTES);
System.out.println(rs);
} catch (InterruptedException e) {
System.out.println("执行被中断");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("执行过程出错");
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("时间超时");
e.printStackTrace();
}finally {
executor.shutdown();
}
}
输出结果:
future:null
false
为什么 Future<Boolean> future = (Future<Boolean>) executor.submit(futureTask); 拿不到执行结果呢, 我们知道 executor.submit()如果执行的是runable接口方法返回值是null,下面改写下代码:
FutureTask<Boolean> futureTask = new FutureTask<>(() -> false);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = executor.submit(() -> true);
executor.execute(futureTask);
executor.submit(futureTask);
try {
System.out.println("future:"+future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
try {
boolean rs = futureTask.get(3, TimeUnit.SECONDS);
System.out.println(rs);
} catch (InterruptedException e) {
System.out.println("执行被中断");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("执行过程出错");
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("时间超时");
e.printStackTrace();
}finally {
executor.shutdown();
}
}
输出结果:
future:true
false
结合FutureTask源码,我们通过调式方法开逐步探讨执行步骤:
1:执行executor.submit(new runTask(run), run) 首选会执行Executors类的submit方法
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
2:在Executors类的内部静态类DelegatedExecutorService中,存在ExecutorService类,并在构造函数中初始化了 DelegatedExecutorService
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
}
3:然后调用AbstractExecutorService类的 submit方法:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
4:在submit方法中调用了newTaskFor内部方法,newTaskFor方法内创建了FutureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
FutureTask类:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
在构造方法里,有调用了Executors类的callable方法
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
再回到AbstractExecutorService类的submit这个方法:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
在这里调用了execute方法,这个方法在ThreadPoolExecutor 中有实现:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
判断过程如下:
1:如果command为空不存在,直接抛出异常
2:如果正在运行的线程少于 corePoolSize,请尝试使用给定命令作为其第一个任务启动一个新线程。
对 addWorker 的调用以原子方式检查 runState 和 workerCount,从而通过返回 false 来防止在不应该添加线程时出现误报。
3:如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查以来现有线程已死亡)或进入此方法后关闭。
因此,我们重新检查状态,并在必要时在停止时回滚入队,如果没有则启动一个新线程。
4:如果我们无法排队任务,那么我们尝试添加一个新线程。 如果它失败了,我们知道我们已经关闭或饱和,因此拒绝该任务。
上面就是调用execute 或submit方法执行线程的过程。