JUC并发编程学习
1. 什么是JUC
1.1 JUC简介
- JUC就是java.util.concurrent工具包的简称,是一个处理线程的工具包,JDK1.5开始出现的。
1.2 进程与线程
-
进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体,是计算机中程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单温,是操作系统结构的基础。
-
线程(thread) 是操作系统能够运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每个线程并行执行不同的任务。
总的来说:
- 进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程是资源分配的最小单位
- 线程:系统分配处理器时间资源的基本单位,或者说进程之内独立执行的一个单元执行流,线程是程序执行的最小单位
1.3 线程的状态
1.3.1 线程状态Thread.State(枚举类)
public enum State {
// 新建
NEW,
// 准备就绪
RUNNABLE,
// 阻塞
BLOCKED,
// 不见不散
WAITING,
// 过时不候
TIMED_WAITING,
// 终结
TERMINATED;
}
1.3.2 wait/sleep 区别
- sleep是Thread的静态方法,wait是Object的方法,任何对象实例化都能调用。
- sleep不会释放锁,它也不需要占用锁。wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
- 它们都可以被interrupted方法中断
1.4 并发与并行
1.4.1 串行模式
- 串行表示所有任务都是一一按照先后顺序进行。串行是一次只能取一个任务,并执行这个任务。
1.4.2 并行模式
- 并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成多个短队列,所以并行缩短了任务队列的长度,并行的效率从代码层次上依赖于多进程、多线程代码,从硬件角度上依赖于多核CPU。
1.4.3 并发
- 并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多个进程可以同时运行或多指令可以同时运行。
1.5 管程(Monitor)
- 监视器(锁),是一种同步机制,保证同一个时间,只有一个线程访问被保护数据或代码
- JVM同步基于进入和退出,使用管程对象实现的
1.6 用户线程&守护线程
- 用户线程:自定义线程,主线程结束了,用户线程还在运行,JVM存活
- 守护线程:比如垃圾回收,没有用户线程了,都是守护线程,JVM结束
2. Lock 接口
2.1 Synchronized
2.1.1 Synchronized关键字回顾
synchronoized 是Java中的关键字,是一种同步锁。它修饰的对象有一下几种:
- 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码的对象。
- 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象。
- 虽然可以使用synchronized来定义方法,但synchronized并不属于方法定义的一部分。因此,synchronized关键字不能被继承。如果在父类中的某个方法使用了synchronized关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显示在子类这个方法加上synchronized关键字才可以。当然还可以在子类方法中调用父类中相应的方法,这个虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。
- 修饰一个静态方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象
- 修饰一个类,其作用范围是synchronized后面括号括起来的部分,作用主要的对象是这个类的所有对象
2.1.2 售票案例(3个售票员,卖30张票)
package study.sync;
/**
* @Author: xujinshan361@163.com
* 1.创建资源类,定义属性和操作方法
*/
class Ticket{
// 票数
private int number =30;
// 操作方法:卖票
public synchronized void sale(){
// 判断:是否有票
if (number>0) {
// 加入延迟模拟效果
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":卖出:"+(number--)+"剩下:"+number);
}
}
}
public class SaleTicket {
// 2. 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
// 创建Ticket对象
Ticket ticket = new Ticket();
// 创建三个线程 -1
new Thread(new Runnable() {
@Override
public void run() {
// 调用买票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"AA").start();
// 线程2
new Thread(()->{
// 调用买票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"BB").start();
// 线程3
new Thread(()->{
// 调用买票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"BB").start();
}
}
2.2 什么是Lock
Lock锁提供了比使用同步方法和语句更广泛的锁操作。它允许许多更灵活的结果,可具有非常多不同的属性,并且可支持多个关联条件的对象。Lock提供了比synchronized更多的功能。
Lock与Synchronized的区别
- Lock不是Java语言内置的,synchronized是Java语言的关键字,是内置的。Lock是一个类,通过这个类实现同步访问
- Lock和synchronized最大的区别,采用synchronized不需要用户去手动释放锁,当synchronized方法或synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则需要用户去手动释放锁,如果没有主动释放锁,就可能导致死锁的现象
2.2.1 Lock 接口
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
下面逐个讲述Lock接口中的每个方法使用
2.2.2 lock
- lock()方法是平常使用最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待
- 采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{}块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用 Lock来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
2.2.3 newCondition
- 关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类也可以实现等待/通知模式
- 用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:
- await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重新获得锁并继续执行
- signal()用于唤醒一个等待的线程
注意:
- 在调用 Condition 的 await()/signal()方法前,也需要线程持有相关的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行
2.3 RentrantLock
ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用
public class Test {
private ArrayList<Integer> arrayList = new ArrayList<Integer>();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.insert(Thread.currentThread());
}
;
}.start();
new Thread() {
public void run() {
test.insert(Thread.currentThread());
}
;
}.start();
}
public void insert(Thread thread) {
Lock lock = new ReentrantLock(); //注意这个地方
lock.lock();
try {
System.out.println(thread.getName() + "得到了锁");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
} finally {
System.out.println(thread.getName() + "释放了锁");
lock.unlock();
}
}
}
使用ReentrantLock实现卖票问题
// 第一步 创建资源类,定义属性和操作方法
class LTicket{
// 票数
private int number =30;
// 创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 卖票方法
public void sale(){
// 上锁
lock.lock();
try {
// 判断是否有票可卖
if (number>0) {
System.out.println(Thread.currentThread().getName()+":卖出"+(number--)+"剩余:"+number);
}
}finally {
// 解锁
lock.unlock();
}
}
}
public class LSaleTicket {
public static void main(String[] args) {
// 第二步,创建多个线程,调用资源类的操作方法
LTicket ticket = new LTicket();
// 创建三个线程
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"AA").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"BB").start();
new Thread(()->{
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"CC").start();
}
}
2.4 ReadWriteLock
- ReadWriteLock 也是一个接口,里面只定义了两个方法
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
- 一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开, 分成两个锁来分配给线程,从而使得线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口
- ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个放个readLock()和writeLock用来获取读锁和写锁
- 下面通过几个例子来看下ReetrantReadWriteLock具体用法
- 加入有多个线程要同时进行读操作的话, 先看下synchronized达到的效果:
public class Test {
private ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
}
}
改成读写锁
public class Test {
private ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
}
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
} finally {
rwl.readLock().unlock();
}
}
}
-说明:thread1和thread2 在同时进行读操作,这样就大大提升了读操作效率
注意:
- 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁
- 如果有一个线程已经占用了写锁,则其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁
2.5 小结:
Lock 和 synchronized 有以下几点不同:
- Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现
- synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁
- Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断
- 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到
- Lock 可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于synchronized。
3 线程间通信
线程间通信的模型由两种:共享内存和消息传递,以下方式都是基于这两种模式来实现的
场景–两个线程,一个线程对当前数加一,另一个线程对当前数减一,要求用线程间通信
3.1 synchronized方案
//第一步 创建资源类,定义属性和操作方法
class Share {
//初始值
private int number = 0;
//+1的方法
public synchronized void incr() throws InterruptedException {
//第二步 判断 干活 通知
while(number != 0) { //判断number值是否是0,如果不是0,等待
this.wait(); //在哪里睡,就在哪里醒
}
//如果number值是0,就+1操作
number++;
System.out.println(Thread.currentThread().getName()+" :: "+number);
//通知其他线程
this.notifyAll();
}
//-1的方法
public synchronized void decr() throws InterruptedException {
//判断
while(number != 1) {
this.wait();
}
//干活
number--;
System.out.println(Thread.currentThread().getName()+" :: "+number);
//通知其他线程
this.notifyAll();
}
}
public class ThreadDemo1 {
//第三步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Share share = new Share();
//创建线程
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"DD").start();
}
}
虚假唤醒问题
wait()方法API解释
3.2 Lock 方案
// 第一步,创建资源类,定义属性和操作方法
class Share {
private int number = 0;
// 创建Lock
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// +1
public void incr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 0) {
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 通知
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
// -1
public void decr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 1) {
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 通知
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"DD").start();
}
}
3.4 线程间定制化通信
问题:AA线程打印5次AA,BB线程打印10次BB,CC线程打印15次CC,此顺序循环10次
实现流程
// 第一步,创建资源类
class ShareResource {
// 定义标志位
private int flag = 1; // 1:AA 2:BB 3:CC
// 创建Lock锁
Lock lock = new ReentrantLock();
// 创建三个condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
// 打印5次,参数第几轮
public void print5(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 1) {
// 等待
c1.await();
}
// 干活
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + "::轮数:" + loop);
}
// 通知
flag = 2; // 修改标志位
c2.signal(); // 通知BB线程
} finally {
// 释放锁
lock.unlock();
}
}
// 打印10次,参数第几轮
public void print10(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 2) {
// 等待
c2.await();
}
// 干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + "::轮数:" + loop);
}
// 通知
flag = 3; // 修改标志位
c3.signal(); // 通知CC线程
} finally {
// 释放锁
lock.unlock();
}
}
// 打印15次,参数第几轮
public void print15(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 3) {
// 等待
c3.await();
}
// 干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + "::轮数:" + loop);
}
// 通知
flag = 1; // 修改标志位
c1.signal(); // 通知CC线程
} finally {
// 释放锁
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
}
}
4 集合的线程安全
4.1 集合的线程不安全演示
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加类容
list.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中获取类容
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
- 多运行几次就会出现
出现异常原因分析
查看ArrayList 的add方法源码
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
modCount++;
add(e, elementData, size);
return true;
}
4.2 解决方案-Vector
- Vector 是矢量队列,是JDK1.0 版本添加的类。继承了AbstractList,实现了List,RandomAccess,Cloneable这些接口。Vector继承了AbstractList,实现了List;所以,它是一个队列,支持相关的添加,删除,修改,遍历等功能。Vector实现了RandomAccess接口,提供了随机访问功能。RandomAccess是Java中用来被List实现,为List提供快速访问功能的。在Vector 中可以通过元素的需要快速获取元素对象;这就是快速随机访问。Vector 实现了Cloneable接口,即实现了clone()函数。
和ArrayList不同,Vector中操作都是线程安全的
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList
// List<String> list = new ArrayList<>();
// 创建Vector- 线程安全
List<String> list = new Vector<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加类容
list.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中获取类容
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
没有运行时出现异常,查看Vector的add方法
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
add(e, elementData, elementCount);
return true;
}
add方法被synchronized 同步修饰,线程安全,因此没有并发异常
4.3 Collections
Collections提供了方法synchronizedList保证list是同步线程安全的
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList
// List<String> list = new ArrayList<>();
// 创建Vector- 线程安全
// List<String> list = new Vector<>();
// Collections工具类
List<String> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加类容
list.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中获取类容
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
没有异常,查看方法源码
/**
* Returns a synchronized (thread-safe) list backed by the specified
* list. In order to guarantee serial access, it is critical that
* <strong>all</strong> access to the backing list is accomplished
* through the returned list.<p>
*
* It is imperative that the user manually synchronize on the returned
* list when traversing it via {@link Iterator}, {@link Spliterator}
* or {@link Stream}:
* <pre>
* List list = Collections.synchronizedList(new ArrayList());
* ...
* synchronized (list) {
* Iterator i = list.iterator(); // Must be in synchronized block
* while (i.hasNext())
* foo(i.next());
* }
* </pre>
* Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned list will be serializable if the specified list is
* serializable.
*
* @param <T> the class of the objects in the list
* @param list the list to be "wrapped" in a synchronized list.
* @return a synchronized view of the specified list.
*/
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
4.4 CopyOnWriteArrayList
相当于线程安全的ArrayList。和ArrayList一样,是个可变数组;但是和ArrayList不同的是,具有以下特性:
- 最适合具有以下特征的应用程序:List大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突
- 是线程安全的
- 因为通常需要复制整个基础数组,所以可变操作(add()、set()和remove()等)的开销很大
- 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作
- 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照
- 独占锁效率低:采用读写分离思想解决
- 写线程获取到锁,其他写线程阻塞
- 复制思想:
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来得及写会内存,其他的线程就会读到了脏数据
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList
// List<String> list = new ArrayList<>();
// 创建Vector- 线程安全
// List<String> list = new Vector<>();
// Collections工具类
// List<String> list = Collections.synchronizedList(new ArrayList<>());
// CopyOnWriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加类容
list.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中获取类容
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
下面从“动态数组”和“线程安全”两个方面进一步对CopyOnWriteArrayList 的原理进行说明。
“动态数组”机制
- 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
- 由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,效率比较高。
“线程安全”机制
- 通过 volatile 和互斥锁来实现的。
- 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的保证。
- 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”,就达到了保护数据的目的。
add方法源码
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
5 多线程锁
5.1 锁的八个问题演示
class Phone {
public static synchronized void sendSMS() throws Exception {
//停留 4 秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
1 标准访问,先打印短信还是邮件
------sendSMS
------sendEmail
2 停 4 秒在短信方法内,先打印短信还是邮件
------sendSMS
------sendEmail
3 新增普通的 hello 方法,是先打短信还是 hello
------getHello
------sendSMS
4 现在有两部手机,先打印短信还是邮件
------sendEmail
------sendSMS
5 两个静态同步方法,1 部手机,先打印短信还是邮件
------sendSMS
------sendEmail
6 两个静态同步方法,2 部手机,先打印短信还是邮件
------sendSMS
------sendEmail
7 1 个静态同步方法,1 个普通同步方法,1 部手机,先打印短信还是邮件
------sendEmail
------sendSMS
8 1 个静态同步方法,1 个普通同步方法,2 部手机,先打印短信还是邮件
------sendEmail
------sendSMS
结论:
- 一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的一个 synchronized 方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些
- synchronized 方法锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized 方法
- 加个普通方法后发现和同步锁无关换成两个对象后,不是同一把锁了,情况立刻变化。
- synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁
具体表现为以下 3 种形式。
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的 Class 对象。
- 对于同步方法块,锁是 Synchonized 括号里配置的对象
- 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
- 所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
- 但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
5.2 公平锁和非公平锁
非公平锁:线程饿死,效率高
公平锁:效率相对低
5.3 可重入锁
synchronized 和lock都是可重入锁
synchronized 关键字
// 可重入锁
public class SyncLockDemo {
// 栈溢出,递归调用,可重入锁
public synchronized void add(){
add();
}
public static void main(String[] args) {
// synchronized
Object o = new Object();
new Thread(()->{
synchronized (o){
System.out.println(Thread.currentThread().getName()+":外层");
synchronized (o){
System.out.println(Thread.currentThread().getName()+":中层");
synchronized (o){
System.out.println(Thread.currentThread().getName()+":内层");
}
}
}
},"t1").start();
}
}
Lock
// 可重入锁
public class SyncLockDemo {
public static void main(String[] args) {
// Lock演示可重入锁
Lock lock = new ReentrantLock();
// 创建线程
new Thread(() -> {
try {
// 上锁
lock.lock();
System.out.println(Thread.currentThread().getName() + ":外层");
try {
// 上锁
lock.lock();
System.out.println(Thread.currentThread().getName() + ":内层");
} finally {
// 释放锁
lock.unlock();
}
} finally {
// 释放锁
lock.unlock();
}
}, "t1").start();
}
}
5.4 死锁
/**
* 演示死锁
*/
public class DeadLock {
// 创建两个对象
static Object a = new Object();
static Object b = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "持有锁a,视图获取锁b");
// 增加延迟,模拟效果
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "获取锁b");
}
}
}, "AA").start();
new Thread(() -> {
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "持有锁b,视图获取锁a");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "获取锁a");
}
}
}, "BB").start();
}
}
6 Callable &Future 接口
6.1 Callable接口
创建线程的方法:一种是通过创建Thread类,另一种是使用Runnable创建线程,但是,Runnable缺少的一项功能是,当线程终止是(即run()完成时),无法使线程返回结果,为了支持此功能,提供了Callable接口
Callable接口特点
- 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于的Callable,需要实现在完成时返回结果的call()方法
- call()方法可以引发异常,而run()则不能
- 实现Callable必须重写call方法
- 不能直接替换Runnable,因为Thread类的构造方法根本没有Callable
class MyThread01 implements Runnable{
@Override
public void run() {
}
}
class MyThread02 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}
6.2 Future 接口
当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象
将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下:
- public boolean cancel(boolean mayInterrupt):用于停止任务,如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true时才会中断任务
- public Object get()抛出 InterruptedException,ExecutionException:用于获取任务的结果。如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。
- public boolean isDone():如果任务完成,则返回 true,否则返回 false可以看到 Callable 和 Future 做两件事-Callable 与 Runnable 类似,因为它封装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结果。实际上,future 也可以与 Runnable 一起使用
要创建线程,需要 Runnable。为了获得结果,需要 future。
6.3 FutureTask
Java库具有具体的FutureTask类型,该类型实现了Runable了和Future,并方便地将两种功能组合在一起。可以通过为其构造函数提供Callable来创建FutureTask。然后,将FutureTask对象提供给Thread的构造函数以创建Thread对象。因此间接地使用Callable创建线程
核心原理
在主线程需要执行比较耗时的操作时,但是由不想阻塞主线程时,可以把这些作业交给Future对象在后台完成
- 当主线程将要需要时,可以通过Future对象获得后台作业的计算结果或执行状态
- 一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果
- 仅在计算完成时才能检索结果,如果计算尚未完成,则阻塞get方法
- 一旦计算完成,就不能重新开始或取消计算
- get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或抛出异常
- get只计算一次,因此get方法放在最后
6.4 使用Callable和Future
class MyThread01 implements Runnable{
@Override
public void run() {
}
}
class MyThread02 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Runnable接口创建方法
new Thread(new MyThread01(),"AA").start();
// FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread02());
// 简化lam表达式
FutureTask<Integer> task = new FutureTask<>(()->{
System.out.println(Thread.currentThread().getName()+" come in callable");
return 200;
});
// 创建一个线程
new Thread(task,"AA").start();
while (!task.isDone()){
System.out.println("wait......");
}
// 调用FutureTask的get方法
System.out.println(task.get());
System.out.println(task.get()); // 第二次调用直接返回
System.out.println(Thread.currentThread().getName()+"come over");
// FutureTask 原理,未来任务
}
}
6.5 小结
- 在主线程中需要执行比较耗时的操作时,但是又不想阻塞主线程,可以把这些作业交给Future对象在后台完成,当主线程将来需要是,通过Future对象获得后台作业的计算结果或执行状态
- 一般FutureTask多用于耗时计算,主线程可以在完成自己的任务后,再去获取结果
- 仅在计算完成时才能检索结果,如果尚未完成,则阻塞get方法。一旦计算完成,就不能再重新开始或取消计算。get方法获取结果只有在计算完成时,否则会一直阻塞直到任务转入完成状态,然后会返回结果或抛出异常
- 只计算一次
7 JUC三大辅助类
JUC 提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多的Lock锁的频繁操作,这三种辅助类为:
- CountDownLatch:减少计数
- CyclicBarrier:循环栅栏
- Semaphore:信号灯
7.1 减少计数 CountDownLatch
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await方法法之后的语句。
- CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞
- 其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程不会阻塞)
- 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行
场景:6个同学陆续离开教室后值班同学才可以关门
//演示 CountDownLatch
public class CountDownLatchDemo {
//6个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {
//创建CountDownLatch对象,设置初始值
CountDownLatch countDownLatch = new CountDownLatch(6);
//6个同学陆续离开教室之后
for (int i = 1; i <=6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");
//计数 -1
countDownLatch.countDown();
},String.valueOf(i)).start();
}
//等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
}
}
7.2 循环栅栏 CyclicBarrier
CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句。可以将 CyclicBarrier 理解为加 1 操作
** 场景:集齐7颗龙珠可以召唤神龙**
//集齐7颗龙珠就可以召唤神龙
public class CyclicBarrierDemo {
//创建固定值
private static final int NUMBER = 7;
public static void main(String[] args) {
//创建CyclicBarrier
CyclicBarrier cyclicBarrier =
new CyclicBarrier(NUMBER,()->{
System.out.println("*****集齐7颗龙珠就可以召唤神龙");
});
//集齐七颗龙珠过程
for (int i = 1; i <=7; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");
//等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
7.3 信号灯Seamphore
Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方法获得许可证,release 方法释放许可
场景:抢车位,6部汽车3个停车位
//6辆汽车,停3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
//创建Semaphore,设置许可数量
Semaphore semaphore = new Semaphore(3);
//模拟6辆汽车
for (int i = 1; i <=6; i++) {
new Thread(()->{
try {
//抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 抢到了车位");
//设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+" ------离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
8 读写锁
8.1 介绍
现实中有一种场景:对共享资源的读和写的操作,且写操作没有读操作那么频繁,在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了
针对这种场景,Java的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁
- 线程进入读锁的前提条件:
- 没有其他线程的写锁
- 没有写请求,或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)
- 线程进入写锁的前提条件:
读写锁有以下三个重要的特性:
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
- 重进入:读锁和写锁都支持线程重进入
- 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁
8.2 ReentrantReadWriteLock 类结构
public class ReentrantReadWriteLock implements ReadWriteLock,
java.io.Serializable {
/**
* 读锁
*/
private final ReentrantReadWriteLock.ReadLock readerLock;
/**
* 写锁
*/
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/**
* 使用默认(非公平)的排序属性创建一个新的
* ReentrantReadWriteLock
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 使用给定的公平策略创建一个新的 ReentrantReadWriteLock
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/**
* 返回用于写入操作的锁
*/
public ReentrantReadWriteLock.WriteLock writeLock() {
return
writerLock;
}
/**
* 返回用于读取操作的锁
*/
public ReentrantReadWriteLock.ReadLock readLock() {
return
readerLock;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
}
static final class NonfairSync extends Sync {
}
static final class FairSync extends Sync {
}
public static class ReadLock implements Lock, java.io.Serializable {
}
public static class WriteLock implements Lock, java.io.Serializable {
}
}
可以看到,ReentrantReadWriteLock 实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑
8.3 入门案例
场景:使用ReentrantReadWriteLock对一个HashMap进行读和写操作
实案例:
//资源类
class MyCache {
//创建map集合
private volatile Map<String,Object> map = new HashMap<>();
//创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//放数据
public void put(String key,Object value) {
//添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" 正在写操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
//放数据
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" 写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放写锁
rwLock.writeLock().unlock();
}
}
//取数据
public Object get(String key) {
//添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName()+" 取完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放读锁
rwLock.readLock().unlock();
}
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
TimeUnit.MICROSECONDS.sleep(300);
//创建线程取数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
8.4 锁降级
示例:
//演示读写锁降级
public class Demo1 {
public static void main(String[] args) {
//可重入读写锁对象
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁
//锁降级
//1 获取写锁
writeLock.lock();
System.out.println("----write");
//2 获取读锁
readLock.lock();
System.out.println("---read");
//3 释放写锁
//writeLock.unlock();
//4 释放读锁
//readLock.unlock();
}
}
8.5 小结
- 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
- 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
9 阻塞队列
9.1 BlockingQueue简介
Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来了极大的便利。
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,另一端输出。
- 当队列是空,从队列中获取元素的操作将被阻塞
- 当队列为满,从队列中添加元素的操作将被阻塞
- 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
常用的队列主要有以下两种:
- 先进先出(FIFO):先插入的队列元素也是最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
- 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
- 好处是不需要关心什么时候需要阻塞线程,什么什么需要唤醒线程,因为这一切BlockingQueue都给包办了
- 在Concurrent包发布之前,在多线程环境下,程序员必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这种给我们的程序带来了不小的复杂度。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和 “消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然
9.2 BlockingQueue核心方法
放入数据
- offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当前执行方法的线程)
- offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
- put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续
获取数据
- poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等time 参数规定的时间,取不到时返回 null
- poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败
- take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入
- drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁
9.3 入门案例
//阻塞队列
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//创建阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//第一组
// System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//System.out.println(blockingQueue.element());
//System.out.println(blockingQueue.add("w"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//第二组
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("www"));
//
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
//第三组
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// //blockingQueue.put("w");
//
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
//第四组
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS));
}
}
9.4 常见的BlockingQueue
9.4.1 ArrayBlockingQueue(常用)
- 基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置
- ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁
总结:由数组结构组成的有界阻塞队列
9.4.2 LinkedBlockingQueue(常用)
- 基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个
类足以。
总结: 由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列
9.4.3 DelayQueue
- DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列汇总获取到该元素,DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只是获取数据操作(消费者)才会被阻塞
10 ThreadPool线程池
- 线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
- 例子: 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。
- 线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
主要特点:
- 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗
- 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
- Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,
ExecutorService,ThreadPoolExecutor 这几个类
10.2 线程池参数说明
10.2.1 常用参数
- corePoolSize线程池的核心线程数
- maximumPoolSize 能容纳的最大线程数
- keepAliveTime 空闲线程存活时间
- unit 存活的时间单位
- workQueue 存放提交但未执行任务的队列
- threadFactory 创建线程的工厂类
- handler 等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize ,核心线程数,也即最小线程数。workQueue,阻塞队列。maximumPoolSize,最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
10.2.2 拒接策略
-
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
-
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
-
DiscardPolicy: 直接丢弃,其他啥都没有DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
10.3 线程池的种类与创建
10.3.1 newCachedThreadPool(常用)
作用: 创建一个可缓存线程池,如果线程池长度超过处理长度,可灵活回收空闲线程,若无可回收,则新建线程
特点:
- 线程池中数量没有固定,可达到最大值(Integer.MAX_VALUE)
- 线程池中的线程可以进行缓存重复利用和回收(回收默认时间为1分钟)
- 当线程中,没有可用线程,会重新创建一个线程
创建方式
/**
* 可缓存线程池
* @return
*/
public static ExecutorService newCachedThreadPool(){
/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
**场景:**适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景
10.3.2 newFixedThreadPool(常用)
**作用:**创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在
特征:
• 线程池中的线程处于一定的量,可以很好的控制线程的并发量
• 线程可以重复被使用,在显示关闭之前,都将一直存在
• 超出一定量的线程被提交时候需在队列中等待
创建方式:
/**
* 固定长度线程池
* @return
*/
public static ExecutorService newFixedThreadPool(){
/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(10,
10,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
**场景:**适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
10.3.3 newSingleThreadExecutor(常用)
**作用:**创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
创建方式:
/**
* 单一线程池
* @return
*/
public static ExecutorService newSingleThreadExecutor(){
/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(1, 1,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个
线程的场景
10.4 线程池工作原理
- 在创建了线程池后,线程池中的线程数为零
- 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
10.5 注意事项
- 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池
- 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建
- corePoolSize 线程池的核心线程数
- maximumPoolSize 能容纳的最大线程数
- keepAliveTime 空闲线程存活时间
- unit 存活的时间单位
- workQueue 存放提交但未执行任务的队列
- threadFactory 创建线程的工厂类
- handler 等待队列满后的拒绝策略
- 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图
11 Fork/Join
Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:
- Fork:把一个复杂的任务进行拆分,大事化小
- Join:把拆分任务的结果进行合并
**任务分割:**首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
在 Java 的 Fork/Join 框架中,使用两个类完成上述操作
-
ForkJoinTask: Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
- a.RecursiveAction:用于没有返回结果的任务
- b.RecursiveTask:用于有返回结果的任务
-
ForkJoinPool: ForkJoinTask 需要通过 ForkJoinPool 来执行
-
RecursiveTask: 继承后可以实现递归(自己调自己)调用的任务
Fork/Join 框架的实现原理
- ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,
- ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而
- ForkJoinWorkerThread 负责执行这些任务
11.2 Fork 方法
Fork方法的实现原理: 当调用ForkJoinTask的fork方法时,程序会把任务放到ForkJoinWorkerThread的pushTask的workQueue中,异步地执行这个任务,然后立即返回结果
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask方法把当前任务存放到ForkJoinTask数组队列里,然后再调用ForkJoinPool的singleWork()方法唤醒或创建一个工作线程来执行任务。代码如下:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
int s = top, d, cap, m;
ForkJoinPool p = pool;
if ((a = array) != null && (cap = a.length) > 0) {
QA.setRelease(a, (m = cap - 1) & s, task);
top = s + 1;
if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
p != null) { // size 0 or 1
VarHandle.fullFence();
p.signalWork();
}
else if (d == m)
growArray(false);
}
}
11.3 join方法
Join方法的主要作用是阻塞当前线程并等待获取结果。ForkJoinTask的join方法实现,代码如下:
public final V join() {
int s;
if (((s = doJoin()) & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
首先调用doJoin方法,通过doJoin方法得到当前任务的状态来判断返回什么结果,任务状态有四种:
- 已完成 NO RMAL
- 被取消 CANCELLED
- 信号 SIGNAL
- 出现异常EXECPTIONAL
如果任务状态是已完成,则直接返回任务结果
如果任务状态是被取消,则直接抛出CancellactionExecption
如果任务状态是抛出异常,则直接抛出对应的异常
doJoin方法实现:
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
在 doJoin()方法流程如下:
- 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接
返回任务状态;
- 如果没有执行完,则从任务数组里取出任务并执行。
- 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记
录异常,并将任务状态设置为 EXCEPTIONAL。
11.4 Fork/Join框架的异常处理
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的getException 方法获取异常。
getException 方法返回 Throwable 对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
11.5 案例
场景:生成一个计算任务,计算1+2+3+……+1000,每100个数切分一个子任务
/**
* 递归累加
*/
class TaskExample extends RecursiveTask<Long> {
private int start;
private int end;
private long sum;
/**
* 构造函数
*
* @param start
* @param end
*/
public TaskExample(int start, int end) {
this.start = start;
this.end = end;
}
/**
* The main computation performed by this task.
*
* @return the result of the computation
*/
@Override
protected Long compute() {
System.out.println("任务" + start + "=========" + end + "累加开始");
//大于 100 个数相加切分,小于直接加
if (end - start <= 100) {
for (int i = start; i <= end; i++) {
//累加
sum += i;
}
} else {
//切分为 2 块
int middle = start + 100;
//递归调用,切分为 2 个小任务
TaskExample taskExample1 = new TaskExample(start, middle);
TaskExample taskExample2 = new TaskExample(middle + 1, end);
//执行:异步
taskExample1.fork();
taskExample2.fork();
//同步阻塞获取执行结果
sum = taskExample1.join() + taskExample2.join();
}
//加完返回
return sum;
}
}
/**
* 分支合并案例
*/
public class ForkJoinPoolDemo {
/**
* 生成一个计算任务,计算 1+2+3.........+1000
*
* @param args
*/
public static void main(String[] args) {
//定义任务
TaskExample taskExample = new TaskExample(1, 1000);
//定义执行对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任务执行
ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
//输出结果
try {
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
forkJoinPool.shutdown();
}
}
}
12 CompleteableFutrue
12.1 CompleteabelFutrue简介
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
12.2 Future和CompleteableFuture
Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点:
不支持手动完成
- 提交一个任务,但是执行太慢了,通过其它路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所有必须主动取消或已知等待他执行完成
不支持进一步的非阻塞调用
- 通过Future的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能
不支持链式调用
- 对于Future的执行结果,想要继续传到下一个Future处理使用,从而形成一个链式的pipline调用,在Future中是无法实现的
不支持多个Future合并
- 比如由10个Future并行执行,想在所有的Future运行完成之后,执行某些函数,是没法通过Future实现的
不支持异常处理
- Future的API没有任何的异常处理的api,所以在异步运行时,如果出现了问题是不好定位的
12.3 CompleteableFuture 入门
12.3.1 使用CompleteableFuture
场景:主线程里面创建一个CompleteableFuture,然后主线程调用get方法会阻塞,最后在一个子线程中使其终止
/**
* 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
* 在一个子线程中使其终止
*
* @param args
*/
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "子线程开始干活");
//子线程睡 5 秒
Thread.sleep(5000);
//在子线程中完成主线程
future.complete("success");
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
//主线程调用 get 方法阻塞
System.out.println("主线程调用 get 方法获取结果为: " + future.get());
System.out.println("主线程完成,阻塞结束!!!!!!");
}
12.3.2 没有返回值的异步任务&有返回值的异步任务
//异步调用和同步调用
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
//同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " : CompletableFuture1");
});
completableFuture1.get();
//mq消息队列
//异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " : CompletableFuture2");
//模拟异常
int i = 10 / 0;
return 1024;
});
completableFuture2.whenComplete((t, u) -> {
System.out.println("------t=" + t);
System.out.println("------u=" + u);
}).get();
}
}
12.3 线程依赖
当一个线程依赖另一个线程时,可以使用thenApply方法来把这两个线程串行化
public class Test {
private static Integer num = 10;
/**
* 先对一个数加 10,然后取平方
*
* @param args
*/
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture<Integer> future =
CompletableFuture.supplyAsync(() -> {
try {
System.out.println("加 10 任务开始");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply(integer -> {
return num * num;
});
Integer integer = future.get();
System.out.println("主线程结束, 子线程的结果为:" + integer);
}
}
12.3.4 消费处理结果
thenAccept消费处理结果,接收任务的处理结果,并消费处理,无返回结果
public class Test {
private static int num =10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture.supplyAsync(() -> {
try {
System.out.println("加 10 任务开始");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply(integer -> {
return num * num;
// thenAccept消费
}).thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" +
integer);
}
});
}
}
12.3.5 异常处理
exceptionally异常处理,出现异常时触发
public class Test {
private static int num =10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 添加exception
int i= 1/0;
System.out.println("加 10 任务开始");
num += 10;
return num;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return -1;
});
System.out.println(future.get());
}
}
handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常
public class Test {
private static int num =10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
}).handle((i,ex) ->{
System.out.println("进入 handle 方法");
if(ex != null){
System.out.println("发生了异常,内容为:" + ex.getMessage());
return -1; }else{
System.out.println("正常完成,内容为: " + i);
return i; }
});
System.out.println(future.get());
}
}
12.3.6 结果合并
thenCompose 合并两个有依赖关系的CompleteableFuture的执行结果
public class Test {
private static int num =10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
//合并
CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再来一个 CompletableFuture
CompletableFuture.supplyAsync(() -> {
return i + 1;
}));
System.out.println(future.get());
System.out.println(future1.get());
}
}
thenCombine合并两个没有依赖的CompleteableFuture任务
// 执行任务调度不一样,导致不一样的结果,纯属测试合并
public class Test {
private static int num =10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
});
//合并两个结果
CompletableFuture<Object> future = job1.thenCombine(job2, new
BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer a, Integer b) {
List<Integer> list = new ArrayList<>();
list.add(a);
list.add(b);
return list; }
});
System.out.println("合并结果为:" + future.get());
}
}