JUC并发编程
要想学习JUC就必须了解 java.util concurrent 包的工具类,其中包含:
- java.util concurrent (并发包)
- java.util concurrent.atomic (并发原子包)
- java.util concurrent.locks (并发lock包)
1. volatile关键字——内存可见性
添加volatile关键字后,当多个线程同时操作共享线程时,可以保证内存中的数据可见。相较于synchronized是一种较为轻量级的同步策略。
【注意点:①volatile 不具备 “互斥性” ,即程序中如果有一个线程访问共享数据拿到锁时,另一个线程依然可以访问该共享数据。
②volatile 不能保证变量的 “原子性” 】
解决 volatile 中的不能保证原子性问题,可以用jdk5之后的 java.util concurrent.atomic 包下提供的常用原子变量:
1,volatile 保证内存可见性
2,CAS(Compare-And-Swap)算法保证数据的原子性 (乐观锁的实现方式之一)
CAS 算法是硬件对于并发操作共享数据的支持,包含了三个操作数:内存值V、预估值A、更新值B;当且仅当 V == A 时,才会执行 V = B ,否则将不做任何操作,即当多个线程并发的对共享数据进行修改时,有且只有一个会成功,就保证了变量的原子性。
(CAS算法的效率要高于synchronized锁,因为CAS算法在本次线程执行操作不成功后,依然可以立即对数据进行修改操作,不会阻塞,一般情况下是一个自旋操作,即不断的重试)
public class TestIcon {
public static void main(String[] args){
AtomicDemo atomicDemo = new AtomicDemo();
for (int x = 0;x < 10; x++){
new Thread(atomicDemo).start();
}
}
}
class AtomicDemo implements Runnable{
//创建原子变量对象
private AtomicInteger serialNum = new AtomicInteger();
public int getSerialNumber(){
return serialNum.getAndIncrement();//调用原子变量的方法:原子上增加一个当前值
}
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getI());
}
}
2.Lock锁实现线程精准通知唤醒机制(Condition接口)
在多线程基础篇已经详细讲解Lock锁的作用和用法,这里补充详细讲解一下Condition接口。
一个Lock对象中可以创建多个Condition实例(即对象监视器),一个Condition
实例本质上绑定到一个锁。通过 newCondition()获取特定Condition实例。方法 await() 使当前线程等或interrupted;方法 signal() 唤醒一个等待的线程,signalAll() 唤醒所有等待的线程。
//Condition实现线程精准通知和唤醒,让线程A,B,C依次轮流执行
public class Demo {
public static void main(String[] args) {
Date date = new Date();
new Thread(() -> {
for (int i = 0; i < 10; i++) date.mythreadA();
},"线程A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) date.mythreadB();
},"线程B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) date.mythreadC();
},"线程C").start();
}
}
class Date{
private int number = 0;//标记正在执行的线程
private final Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
public void mythreadA(){
lock.lock();
try {
while (number != 0) {//判断执行的线程
condition.await();
}
System.out.println(Thread.currentThread().getName()+"---A线程正在执行");
//唤醒指定的线程
number = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void mythreadB(){
lock.lock();
try {
while (number != 1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"---B线程正在执行");
number = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void mythreadC(){
lock.lock();
try {
while (number != 2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"---C线程正在执行");
number = 0;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
【注意:在线程并发执行问题中有一个典型的线程虚假唤醒的情况,解决虚假唤醒问题的关键就在于判断线程是否等待或唤醒时应使用while循环判断,而不是if判断。原因是if只判断一次就继续向下执行,while会循环判断条件。】
3.同步容器类/并发容器类(JUC线程安全的集合)
1)ConcurrentHashMap
我们都知道HashMap是线程不安全的,简单说一下原因;而HashTable是线程安全的,因为其内部用到了synchronized锁,而且是锁整个hash表,所以当执行多线程操作时,效率非常低。jdk5后提供了ConcurrentHashMap同步容器类增加的一个线程安全的哈希表,对于多线程的操作介于hashMap和hashTable之间。
jdk7 中采用的是锁分段机制,将整个数据结构分段(默认为16段)进行存储,然后给每一段数据(segment)配一把锁(继承ReentrantLock),当一个线程占用锁访问其中一个段的数据的时候,其他段的数据仍然能被其他线程访问,能够实现真正的并发访问。结构如下图所示:
在jdk8中concurrentHashMap已经优化,其数据结构已经接近对应版本的HashMap。取消了Segment分段锁的数据结构,取而代之的是Node数组+链表+红黑树的结构,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率【Node类成员变量Node的元素val和指针next都标注volatile,目的是在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的】;JDK8采用CAS(读)+Synchronized(写)保证线程安全;对每个数组元素(Node)加锁;在链表节点数量大于8且当前数组长度大于64时,会将链表转化为红黑树进行存储;JDK8推荐使用mappingCount方法而不是size方法获取当前map表的大小,因为这个方法的返回值是long类型,size方法是返回值类型是int。下图是jdk8存储结构:
//线程安全的map集合
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
2)CopyOnWriteArrayList/CopyOnWriteArraySet:写入并复制
当期望许多线程访问一个给定的collection时,ConcurrentHashMap 通常优于同步的hashMap,ConcurrentSkinListMap 通常优于同步的treeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的ArrayList。
【注意:CopyOnWriteArrayList 在添加修改操作多时,效率低,因为每次添加修改时都会进行复制,开销很大;当并发迭代操作多时选择更好。】
/*
* 并发情况下,ArrayList线程不安全,报错java.util.ConcurrentModificationException并发修改异常
* 解决方法①:List<String> list = new Vector<>();
* ②:List<String> list = Collections.synchronizedList(new ArrayList<>());
* ③:List<String> list = new CopyOnWriteArrayList<>();
*/
// List<String> list = new ArrayList<>();
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
3)CountDownLatch:闭锁
允许一个或多个线程,等待其他N个线程(一组线程)完成某个事情之后才能继续执行的同步辅助类,通俗的讲就是在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行,类似于倒计时器(锁存器)。
一个CountDownLatch
为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await()
在门口等待,直到被调用countDown()
的线程打开。
//构建一个给定的计数,总数是10,必须要执行任务的时候,再使用
CountDownLatch cdl = new CountDownLatch(10);
System.out.println("open door");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"Go out");
//减少锁存器(计数器)的计数,如果计数达到零,释放所有等待的线程
cdl.countDown();//每执行完一个就递减一个
},String.valueOf(i)).start();
}
//等待锁存器(计数器)归零,然后再向下执行
cdl.await();
System.out.println("close door");
4)CyclicBarrier
N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
两种构建方法:CyclicBarrier(int parties);CyclicBarrier(int parties,Runnable barrierAction)
public class CyclicBarrieDemo {
public static void main(String[] args) {
CyclicBarrier cbr = new CyclicBarrier(3);
for (int i = 1; i <= cbr.getParties(); i++) {
int tmp = i;
new Thread(() -> {
try {
System.out.println("学员1号:翻越第"+tmp+"号障碍");
//等待 让cbr中的所有线程都完成后才会继续下一步行动
cbr.await();//即在所有人翻越完第一个障碍之前都不能继续翻越其他障碍
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("学员2号:翻越第"+tmp+"号障碍");
cbr.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("学员3号:翻越第"+tmp+"号障碍");
cbr.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
System.out.println("main主线程finish");
}
}
CountDownLatch和CyclicBarrier的区别:
①CountDownLatch计算为0时释放所有等待的线程;CyclicBarrier计数达到指定值时释放所有等待线程。
②CountDownLatch计数为0时,无法重置;CyclicBarrier计数达到指定值时,计数置为0重新开始。
③CountDownLatch不可重复利用;CyclicBarrier可重复利用。
【总的来说:CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的。
CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流。】
5)Semaphore:计数信号量(信号灯)
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。每个acquire()都会
阻塞,直到许可证可用,然后才能使用它。 每个release()
添加许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore
只保留可用数量的计数,并相应地执行。
构造器构建方式:Semaphore(int permits)---
创建一个 Semaphore
与给定数量的许可证和非公平设置
Semaphore(int permits,boolean fair)----
创建一个 Semaphore
与给定数量的许可证和给定的公平设置
【是否公平,获得锁的顺序与线程启动顺序有关,就是公平,先启动的线程,先获得锁。fair 不能100% 保证公平,只能是大概率公平。fair 为 true,则表示公平,先启动的线程先获得锁.】
public class SemaphoreDemo {
public static void main(String[] args) {
//模拟资源类,限流,假设有三个空车位
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//获得一个资源,会将当前信号量-1,假设如果已经满了,就等待到被释放为止
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢占到了车位");
Thread.sleep(2000);//抢占车位后停2秒
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放此资源,会将当前信号量+1,然后唤醒等待的线程
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
【小扩展:若将构建的Semaphore限流个数设置为1,等价于synchronized的用法,即资源类被synchronized锁住,哪个线程抢到就可以得到资源数据。那么可以变相实现多线程并发抢占资源类(锁),抢到的线程可以任意设置其抢占资源的时间。】
6)ReadWriteLock:读写锁
ReentrantLock(排他锁)具有完全互斥排他的效果,即同一时刻只允许一个线程访问,这样做虽然虽然保证了实例变量的线程安全性,但效率非常低下。ReentrantReadWriteLock实现了ReadWriteLock接口。ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。读写锁维护了两个锁,一个是读操作相关的锁称为共享锁,一个是写操作相关的锁称为独占锁(排他锁)。通过分离读锁和写锁,其并发性比一般排他锁有了很大提升。
【多个读锁之间不互斥,读锁与写锁互斥,写锁与写锁互斥(只要出现写操作的过程就是互斥的)】
public class ReadWriteLockDemo {
public static void main(String[] args) {
ReadWriteLockThread rwl = new ReadWriteLockThread();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
rwl.write();
}
},"write").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
rwl.read();
}
},"read1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
rwl.read();
}
},"read2").start();
}
}
class ReadWriteLockThread{
private int number = 0;
ReadWriteLock lock = new ReentrantReadWriteLock();
//读
public void read(){
lock.readLock().lock();//上锁
try {
System.out.println(Thread.currentThread().getName()+":"+number);
} finally {
lock.readLock().unlock();//释放锁
}
}
//写
public void write(){
lock.writeLock().lock();//上锁
try {
number = (int)( Math.random()*100+1);
System.out.println(Thread.currentThread().getName()+":"+number);
} finally {
lock.writeLock().unlock();//释放锁
}
}
}
7)BlockingQueue :阻塞队列
核心用法:
方法类型 |
抛出异常 |
特殊值 |
阻塞 |
超时 |
插入 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
移除 |
remove() |
poll() |
take() |
poll(time,unit) |
检查 |
element() |
peek() |
不可用 |
不可用 |
四组不同操作解释:
抛出异常 |
当阻塞队列满时,再往队列中add插入元素会抛异常:Queue full 当阻塞队列空时,再往队列中remove移除元素会抛NOSuchElementException |
特殊值 |
插入方法,成功true,失败false 移除方法,成功返回出队列的元素,队列里没有就返回null |
一直阻塞 |
当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 当阻塞队列空时,消费者线程试图从队列中take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 |
当阻塞对内满时,队列会阻塞生产者线程所设定的时间,超过显示后生产者线程会退出 |
BlockingQueue接口还有多种常用实现类:ArrayBlockingQueue,LinkBlockingQueue,SychronousQueue等,本篇文章没有介绍,可以参考https://www.cnblogs.com/KingIceMou/p/8075343.html
4.各种锁及其含义
公平锁,非公平锁,可重入锁(递归锁),自旋锁,独占锁(互斥锁,排他锁),共享锁,以及乐观锁和悲观锁
1)公平锁:是指多个线程按照申请锁的顺序来获取锁。在并发情况下,每个线程在获取锁时会查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。
2)非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,开始就直接尝试占有锁,如果尝试失败,就再采取类似公平锁那种方式,有可能后申请的线程先申请的线程优先获取锁。在高并发的情况下,有可能会造成优先级反转或者饥饿现象。
【ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认是非公平锁。
非公平锁创建:ReentrantLock lock = new ReentrantLock(); 或 ReentrantLock lock = new ReentrantLock(false);
公平锁创建:ReentrantLock lock = new ReentrantLock(true); 非公平锁的优点在于吞吐量比公平锁大】
3)可重入锁(递归锁):指的是同一个线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,在同一线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。也就是说,线程可以进入任何一个它已经拥有的锁中所有同步着的代码块或方法。可重入锁最大的作用是避免死锁。
4)自旋锁:是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下切换的消耗,缺点是循环会消耗CPU。底层是由CAS算法实现。
5)独占锁(写锁):指该锁一次只能被一个线程所持有。写操作:原子 + 独占
6)共享锁(读锁):指该锁可被多个线程所持有。共享锁可保证并发读是非常高效的。
7)乐观锁:总是认为不会产生并发问题,每次去取数据的时候总认为不会有其他线程对数据进行修改,因此不会上锁,但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用版本号机制或CAS操作实现。
【version方式:一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。】
8)悲观锁:总是假设最坏的情况,每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,都需要阻塞挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁。
【synchronized是非公平、可重入、独占的悲观锁;ReentrantLock是默认非公平、可重入,独占的悲观锁】
扩展:
/* 八锁问题:
* 1.两个普通同步方法,同一对象,分别用两个线程去调用---> one two
* 2.在getOne方法中加入Threa.sleep(),同一对象,再分别用两个线程去调用---> one two
* 3.新增普通静态方法getThree(),同一对象,三个线程分别调用--->three one two
* 4.两个普通同步方法,两个number对象,两个线程--->two one
* 5.修改getOne()方法为静态同步方法,同一对象,两线程调--->two one
* 6.修改两个方法均为静态同步方法,同一对象,两线程调--->one two
* 7.getOne()为静态同步,getTwo()为非静态同步,两个对象,两线程--->two one
* 8.两个均为静态同步方法,两个对象,两线程--->one two
*
* 线程8锁问题:①非静态方法的锁默认为this,静态方法的锁为Class实例
* ②某一时刻,只能有一个线程有锁,无论几个方法。
*/
public class EightLocksDemo {
public static void main(String[] args) {
GetNumberThread number = new GetNumberThread();
GetNumberThread number2 = new GetNumberThread();
new Thread(()-> number.getOne()).start();
new Thread(()->{
// number.getTwo();
number2.getTwo();
}).start();
// new Thread(()-> number.getThree()).start();
}
}
class GetNumberThread{
//同步方法1
// public synchronized void getOne(){
// try {
// Thread.sleep(200);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("one");
// }
public static synchronized void getOne(){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("one");
}
//同步方法2
// public synchronized void getTwo(){
// System.out.println("two");
// }
public static synchronized void getTwo(){
System.out.println("two");
}
//普通静态方法
public static void getThree(){
System.out.println("three");
}
}