java晋级赛 深入并发编程

2023-11-10

根据黑马java并发编程学习做的笔记 传送门 https://www.bilibili.com/video/BV16J411h7Rd?p=15

java晋级赛 深入并发编程

一.多线程基础

进程与线程

进程
程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的。
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
线程
一个进程之内可以分为一到多个线程。
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行 。
Java 中,线程作为小调度单位,进程作为资源分配的小单位。 在 windows 中进程是不活动的,只是作 为线程的容器

并发与并行
并发是一个CPU在不同的时间去不同线程中执行指令。
并行是多个CPU同时处理不同的线程。:
并发是同一时间应对多件事情的能力
并行是同一时间动手做多件事情的能力

创建线程的方式及运行原理

创建线程的方式

a.通过继承Thread创建线程
请添加图片描述

2.实现Runnable接口 配合Thread使用
请添加图片描述

3.使用FutureTask (有返回值 通过get方法回放)

public class Test {
	public static void main(String[] args) throws ExecutionException, InterruptedException {
        //需要传入一个Callable对象
		FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				System.out.println("线程执行!");
				Thread.sleep(1000);
				return 100;
			}
		});

		Thread r1 = new Thread(task, "t2");
		r1.start();
		//获取线程中方法执行后的返回结果
		System.out.println(task.get());
	}
}
线程运行原理

栈与栈帧
Java Virtual Machine Stacks (Java 虚拟机栈) 我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?

其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法

如图所示 主线程每次方法的调用对应一次栈帧;
请添加图片描述
线程上下文切换 (原因)
a 因为以下原因导致cpu不再执行当前的线程,转而执行另外一个线程的代码
b 线程的cpu时间片用完
c 垃圾回收
d 更高优先级的线程需要运行
e 线程自己调用了Sleep ,yield wait join park synchronize lock 等方法

当 线程上下文切换 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的

状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
Context Switch 频繁发生会影响性能

查看进程线程的命令
请添加图片描述

线程的常见方法

1 .start与run 方法

thread.run()方法 直接调用实际上还是主线程调用的run(),必须要用start方法调用才是异步线程;

2.sleep (使线程阻塞)

调用sleep 会让线程从 running 状态 改为timed_waiting(阻塞)
可以使用interrupted方法打断真在睡眠的线程,这时 sleep 方法会抛出IntreruptedException异常
睡眠结束后的线程未必会立刻得到执行
建议用TimeUnit 的sleep替换Thread的sleep来获得更好的可读性

当线程中使用while(true)时,可以使用yield或sleep;来让出cpu的使用权给其他程序

//休眠一秒
TimeUnit.SECONDS.sleep(1);
//休眠一分钟
TimeUnit.MINUTES.sleep(1);
3.yield(让出当前线程)

调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态(仍然有可能被执行),然后调度执行其他线程
具体的实现依赖于操作系统的任务调度器

线程优先级
如果cup比较忙,那么优先级搞的线程会获得更多的时间片.当cpu闲时,没啥用

4.join方法 同步等待 等待线程运行结束

t1.join(long n) 调用该方法的线程 等t1待线程运行结束 最多等待n秒
用于等待某个线程结束。哪个线程内调用join()方法,就等待哪个线程结束,然后再去执行其他线程。

如在主线程中调用ti.join(),则是主线程等待t1线程结束

join应用 同步
需要等待结果返回,才能继续运行的就是同步
不需要等待结果返回,就能继续运行就是异步

5.interrupt 方法
打断 sleep wait join的线程 可以用来较为优雅的停止线程

线程两阶段终止模式 见图 写代码
请添加图片描述

使用线程对象的stop() 方法停止线程 : 如果线程锁住了共享资源 ,那么当他被杀死后就再也没有机会释放锁,其他
线程也没有办法获取锁

主线程与守护线程
当除了守护线程外的线程 都停止了, 守护线程也会 停止了

//设置该线程为守护线程
Thread.setDaemon(true)

五种状态 操作系统层面 如图
请添加图片描述

六种状态 这是从Java api来描述的 如图

请添加图片描述

二.共享模型之管程

1.共享带来的问题

共享会带来什么问题?
上下文切换 在多线程 的环境中,会导致JVM字节码指令的交错 会出现问题 由此带出临界区

临界区

单个程序运行多个线程本身是没有问题的 有问题出现是在多个线程访问共享资源
多个线程 读共享资源 没有问题
在多个线程对同一共享资源读写操作时 发生指令交错,就会出现问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区

  static int monitor = 0;
        //临界区
        {
            monitor++;
        }

        //临界区
        {
            monitor--;
        }
竞态条件

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

2.synchronize解决方案

解决方案

为了避免临界区的竞态条件发生,有多种手段可以达到目的。
阻塞式的解决方案:synchronized,Lock
非阻塞式的解决方案:原子变量

语法
        synchronized (对象) //线程1,线程2(blcoked 阻塞状态)
        {
            临界区
        }

synchronize 实际上用了对象锁保证了临界区内代码的原子性;临界区内的代码对外是不可分割的,不会被线程切换所打断

synchronized加在方法上

加在成员方法上

public class Demo {
	//在方法上加上synchronized关键字
	public synchronized void test() {
	}
	//等价于
	public void test() {
		synchronized(this) {

		}
	}
}

加在静态方法上

public class Demo {
	//在静态方法上加上synchronized关键字
	public synchronized static void test() {
	}
	//等价于
	public void test() {
		synchronized(Demo.class) {
		}
	}
}
变量的线程安全分析

成员变量和静态变量是否线程安全?

如果它们没有共享,则线程安全

如果它们被共享了,根据它们的状态是否能够改变,又分两种情况

如果只有读操作,则线程安全

如果有读写操作,则这段代码是临界区,需要考虑线程安全

局部变量是否线程安全?

局部变量是线程安全的
但局部变量引用的对象则未必 (要看该对象是否被共享且被执行了读写操作)
如果该对象没有逃离方法的作用范围,它是线程安全的
如果该对象逃离方法的作用范围,需要考虑线程安全

局部变量是线程安全的——每个方法都在对应线程的栈中创建栈帧,不会被其他线程共享

在这里插入图片描述
如果调用的对象被共享,且执行了读写操作,则线程不安全
在这里插入图片描述

3.常见线程安全类

String
Integer
StringBuffer
Random
Vector
Hashtable
java.util.concurrent 包下的类

说他们是线程安全是指:多个线程调用他们同一个实例的某个方法时,是线程安全的。
它们的每个方法是原子的
包括他们多个方法的组合不是原子的,不是线程安全的 例如
在这里插入图片描述
不可变类线程安全性
String Integer 等都是不可变类(重新创建了一个新的对象),因为其内部的状态不可以改变,因此他们的方法都是线程安全的

4.Monitor

Java对象头

这里说到对象头 ,可以先了解一下 new Object()对象的结构 图解

请添加图片描述
new 出来一个新的Object对象的大小,跟虚拟机 与是否开启指针压缩有关 分为以下两种区别:
在开启指针压缩的情况下,markword 占用 8 字节,classpoint 占用 4 字节,Interface data 无数据,总共是 12 字节,由于对象需要为 8 的整数倍,Padding 会补充 4 个字节,总共占用 16 字节的存储空间。

在没有指针的情况下,markword 占用 8 字节,classpoint 占用 8 字节,Interface data 无数据,总共是 16 字节。

以32位虚拟机位为例 java 对象头结构图解
请添加图片描述
MarkWord请添加图片描述

以下为中文的MarkWord结构图:
请添加图片描述

Monitor(锁)

Monitor 为监视器 或者管程 结构图解请添加图片描述
当线程执行到临界区代码时,如果使用了synchronized,会先查询synchronized中所指定的对象(obj)是否绑定了Monitor。

如果没有绑定,则会先去去与Monitor绑定,并且将Owner设为当前线程。
如果已经绑定,则会去查询该Monitor是否已经有了Owner
如果没有,则Owner与将当前线程绑定
如果有,则放入EntryList,进入阻塞状态(blocked)
当Monitor的Owner将临界区中代码执行完毕后,Owner便会被清空,此时EntryList中处于阻塞状态的线程会被叫醒并竞争,此时的竞争是非公平的

注意:
对象在使用了synchronized后与Monitor绑定时,会将对象头中的Mark Word置为Monitor指针。
每个对象都会绑定一个唯一的Monitor,如果synchronized中所指定的对象(obj)不同,则会绑定不同的Monito

synchronize 原理
a.轻量级锁

使用场景: 如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(没有竞争),那么可以使用轻量级锁来优化
轻量级锁对使用者是透明的,即语法还是synchronize

创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录对象,内部可以存储锁定对象的mark word(不再一开始就使用Monitor)
在这里插入图片描述

让锁记录中的Object reference指向锁对象(Object),并尝试用cas去替换Object中的mark word,将此mark word放入lock record中保存
在这里插入图片描述

如果cas替换成功,则将Object的对象头替换为锁记录的地址和状态 00(轻量级锁状态),并由该线程给对象加锁
在这里插入图片描述

b.锁膨胀

如果一个线程在给一个对象加轻量级锁时,cas替换操作失败(因为此时其他线程已经给对象加了轻量级锁),此时该线程就会进入锁膨胀过程
在这里插入图片描述
然后便会给对象加上重量级锁(使用Monitor)将对象头的Mark Word改为Monitor的地址,并且状态改为01(重量级锁)
并且该线程放入入EntryList中,并进入阻塞状态(blocked)
在这里插入图片描述

c.自旋优化

重量级锁竞争时,还可以使用自选来优化,如果当前线程在自旋成功(使用锁的线程退出了同步块,释放了锁),这时就可以避免线程进入阻塞状态。
第一种
在这里插入图片描述
第二种
在这里插入图片描述

d.偏向锁 (用于优化轻量级锁重入)

在轻量级锁在没有竞争时,每次重入(该线程执行的方法中再次锁住该对象)操作仍需要cas替换操作,这样是会使性能降低的。因此引入了偏向锁对性能进行优化:在第一次cas时会将线程的ID写入对象的Mark Word中。此后发现这个线程ID就是自己的,就表示没有竞争,就不需要再次cas,以后只要不发生竞争,这个对象就归该线程所有。
在这里插入图片描述
在这里在补充一下 偏向状态 以及 mark Word 的组成部分图解
在这里插入图片描述
Normal:一般状态,没有加任何锁,前面62位保存的是对象的信息,最后2位为状态(01),倒数第三位表示是否使用偏向锁(未使用:0) 最后三位 001
Biased:偏向状态,使用偏向锁,前面54位保存的当前线程的ID,最后2位为状态(01),倒数第三位表示是否使用偏向锁(使用:1) 最后三位 101
Lightweight:使用轻量级锁,前62位保存的是锁记录的指针,最后两位为状态(00
Heavyweight:使用重量级锁,前62位保存的是Monitor的地址指针,后两位为状态(10)

需要注意
如果开启了偏向锁(默认开启),在创建对象时,对象的Mark Word后三位应该是101
但是偏向锁默认是有延迟的,不会再程序一启动就生效,而是会在程序运行一段时间(几秒之后),才会对创建的对象设置为偏向状态

撤销偏向
调用对象的hashCode方法
多个线程使用该对象
调用了wait/notify方法(调用wait方法会导致锁膨胀而使用重量级锁)

批量重偏向
如果对象虽然被多个线程访问,但是线程间不存在竞争,这时偏向T1的对象仍有机会重新偏向T2
重偏向会重置Thread ID
当撤销超过20次后(超过阈值),JVM会觉得是不是偏向错了,这时会在给对象加锁时,重新偏向至加锁线程。

批量撤销
当撤销偏向锁的阈值超过40以后,就会将整个类的对象都改为不可偏向的

锁消除
JIT 即时编译器 会将没有用的锁优化,可以使用命令 不使用锁消除

5.Wait 与 Notify

a 原理图

在这里插入图片描述
锁对象调用wait方法(obj.wait),就会使当前线程进入WaitSet中,变为WAITING状态。

处于BLOCKED和WAITING状态的线程都为阻塞状态,CPU都不会分给他们时间片。但是有所区别:
BLOCKED状态的线程是在竞争对象时,发现Monitor的Owner已经是别的线程了,此时就会进入EntryList中,并处于BLOCKED状态

WAITING状态的线程是获得了对象的锁,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了wait方法而进入了WaitSet中,处于WAITING状态

BLOCKED状态的线程会在锁被释放的时候被唤醒,但是处于WAITING状态的线程只有被锁对象调用了notify方法(obj.notify/obj.notifyAll),才会被唤醒。

在这里插入图片描述

注:只有当对象被锁以后,才能调用wait和notify方法

public class Test02 {
	final static Object LOCK = new Object();
	public static void main(String[] args) throws InterruptedException {
        //只有在对象被锁住后才能调用wait方法
		synchronized (LOCK) {
			LOCK.wait();
		}
	}
}
b、Wait与Sleep的区别

不同点

Sleep是Thread类的静态方法,Wait是Object的方法,Object又是所有类的父类,所以所有类都有Wait方法。
Sleep在阻塞的时候不会释放锁,而Wait在阻塞的时候会释放锁
Sleep不需要与synchronized一起使用,而Wait需要与synchronized一起使用(对象被锁以后才能使用)
相同点

阻塞状态都为TIMED_WAITING

c、优雅地使用wait/notify

什么时候适合使用wait

当线程不满足某些条件,需要暂停运行时,可以使用wait。这样会将对象的锁释放,让其他线程能够继续运行。如果此时使用sleep,会导致所有线程都进入阻塞,导致所有线程都没法运行,直到当前线程sleep结束后,运行完毕,才能得到执行。
使用wait/notify需要注意什么

当有多个线程在运行时,对象调用了wait方法,此时这些线程都会进入WaitSet中等待。如果这时使用了notify方法,可能会造成虚假唤醒(唤醒的不是满足条件的等待线程),这时就需要使用notifyAll方法
synchronized (LOCK) {
while(//不满足条件,一直等待,避免虚假唤醒) {
LOCK.wait();
}
//满足条件后再运行
}

synchronized (LOCK) {
//唤醒所有等待线程
LOCK.notifyAll();
}

6.模式之保护性暂停

在这里插入图片描述
join源码——使用保护性暂停模式 --也可以自己模拟写一个 需要注意避免虚假唤醒

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
}

7.异步模式之生产者与消费者

在这里插入图片描述

8.park/unpark

a 基本使用

park/unpark都是LockSupport类中的的方法

//暂停线程运行
LockSupport.park;

//恢复线程运行
LockSupport.unpark(thread);
b 特点

与wait/notify的区别

wait,notify 和 notifyAll 必须配合Object Monitor一起使用,而park,unpark不必
park ,unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么精确
park & unpark 可以先 unpark,而 wait & notify 不能先 notify
park不会释放锁,而wait会释放锁

c 原理

每个线程都有一个自己的Park对象,并且该对象_counter, _cond,__mutex组成

先调用park再调用unpark时

先调用park

线程运行时,会将Park对象中的_counter的值设为0;
调用park时,会先查看counter的值是否为0,如果为0,则将线程放入阻塞队列cond中
放入阻塞队列中后,会再次将counter设置为0
然后调用unpark

调用unpark方法后,会将counter的值设置为1

去唤醒阻塞队列cond中的线程

线程继续运行并将counter的值设为0
在这里插入图片描述

在这里插入图片描述
先调用unpark,再调用park
调用unpark
会将counter设置为1(运行时0)
调用park方法
查看counter是否为0
因为unpark已经把counter设置为1,所以此时将counter设置为0,但不放入阻塞队列cond中
在这里插入图片描述

9.线程中的状态转换

在这里插入图片描述

10 多把锁

class BigRoom {
    //额外创建对象来作为锁
	private final Object studyRoom = new Object();
	private final Object bedRoom = new Object();
}

将锁的粒度细分 需要注意多个线程不会相互关联
坏处:如果一个线程获取到多把锁 容易死锁
好处:可以争强并发

11 活跃性

定义

因为某种原因,使得代码一直无法执行完毕,这样的现象叫做活跃性

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁

如:t1线程获得A对象 锁,接下来想获取B对象的锁t2线程获得B对象锁,接下来想获取A对象的锁

 public static void main(String[] args) {
        final Object A = new Object();
        final Object B = new Object();
        new Thread(()->{
            synchronized (A) {
                System.out.println("lock A");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (B) {
                    System.out.println("lock B");
                    System.out.println("操作");
                }
            }
        }).start();

        new Thread(()->{
            synchronized (B) {
                System.out.println("lock B");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (A) {
                    System.out.println("lock A");
                    System.out.println("操作");

                }
            }
        }).start();
    }

发生死锁的必要条件
互斥条件
在一段时间内,一种资源只能被一个进程所使用
请求和保持条件
进程已经拥有了至少一种资源,同时又去申请其他资源。因为其他资源被别的进程所使用,该进程进入阻塞状态,并且不释放自己已有的资源
不可抢占条件
进程对已获得的资源在未使用完成前不能被强占,只能在进程使用完后自己释放
循环等待条件
发生死锁时,必然存在一个进程——资源的循环链

定位死锁的方法
jps+jstack ThreadID

在JAVA控制台中的Terminal中输入jps指令可以查看运行中的线程ID,使用jstack ThreadID可以查看线程状态。

在这里插入图片描述

F:\Thread_study>jps
20672 RemoteMavenServer36
22880 Jps
4432 Launcher
5316 Test5
20184 KotlinCompileDaemon
11132

F:\Thread_study>jstack 5316

jconsole检测死锁
在这里插入图片描述
在这里插入图片描述
哲学家就餐问题

在这里插入图片描述
避免死锁的方法
在线程使用锁对象时,顺序加锁即可避免死锁
在这里插入图片描述

活锁

活锁出现在两个线程互相改变对方的结束条件,后谁也无法结束。

避免活锁的方法
在线程执行时,中途给予不同的间隔时间即可。

死锁与活锁的区别
死锁是因为线程互相持有对象想要的锁,并且都不释放,最后到时线程阻塞,停止运行的现象。
活锁是因为线程间修改了对方的结束条件,而导致代码一直在运行,却一直运行不完的现象。

饥饿

某些线程因为优先级太低,导致一直无法获得资源的现象。

在使用顺序加锁时,可能会出现饥饿现象

12 ReentrantLock

与synchronized相比具有的的特点
可中断
可以设置超时时间
可以设置为公平锁 (先到先得)
支持多个条件变量( 具有多个waitset)

基本语法

public class Test05 {
   private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        //获取锁
        lock.lock();
        try {
        //需要执行的代码

        } finally {
            //释放锁
            lock.unlock();
        }
        
    }
}

可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡

可打断
如果某个线程处于阻塞状态,可以调用其interrupt方法让其停止阻塞,获得锁失败

简而言之就是 :处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行

 public static void main(String[] args) throws InterruptedException {
       Thread t1 = new Thread(() -> {
            try {
                //如果没有竞争那么此方法就会获取 lock 对象锁
                //如果有竞争就进入阻塞对象,可以被其他线程用 Interrup 打断
                System.out.println("尝试获取锁");
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("没有获取锁,返回");
                return;
            }
            try {
                System.out.println("获取到锁");
            } finally {
                //释放锁
                lock.unlock();
            }

        },"t1");
        lock.lock();
        t1.start();
        sleep(1);
        System.out.println("打断 t1");
        t1.interrupt();
        }

锁超时
使用lock.tryLock方法会返回获取锁是否成功。如果成功则返回true,反之则返回false。

并且tryLock方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中timeout为最长等待时间,TimeUnit为时间单位

简而言之就是:获取失败了、获取超时了或者被打断了,不再阻塞,直接停止运行

不设置等待时间 代码

public static void main(String[] args) throws InterruptedException {
   ReentrantLock lock = new ReentrantLock();
   Thread t1 = new Thread(() -> {
       //未设置等待时间,一旦获取失败,直接返回false
       if (!lock.tryLock()){
           System.out.println("获取失败");
           return;
       }
       System.out.println("有了锁");
       lock.unlock();

    },"t1");
    lock.lock();

    try {
        t1.start();
        sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
      

  lock.unlock();
        }

设置等待时间

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {

            try {
                //未设置等待时间,一旦获取失败,直接返回false
                if (!lock.tryLock(5, TimeUnit.SECONDS)) {
                    System.out.println("获取失败");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("有了锁");
            lock.unlock();

        }, "t1");
        lock.lock();

        try {
            t1.start();
            sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

公平锁
在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的

//默认是不公平锁,需要在创建时指定为公平锁
ReentrantLock lock = new ReentrantLock(true);

条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
a、synchronize是 所以不满足条件的线程在一起休息
b、ReentrantLock支持多个地方休息

使用要点
await 前需要获取锁
await 执行后,会释放锁,进入 conditionObejct 等待
await 的线程被唤醒 (或打断、或超时)取重新竞争lock锁
竞争lock锁成功后。从await后执行

 static Boolean flag = false;

    public static void main(String[] args) throws InterruptedException {
         ReentrantLock lock = new ReentrantLock();
        //获取条件变量
        Condition condition = lock.newCondition();
        new Thread(() -> {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println("不满足条件 等...");
                    condition.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("执行完毕");
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try {
                Thread.sleep(3);
                flag = true;
                //释放
                condition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "t2").start();
    }

13 同步模式之顺序控制

Wait/Notify

static final Object Lock = new Object();
    //判断先执行的内容是否执行完毕
    static Boolean flag = false;
    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            synchronized (Lock) {
                while (!flag) {
                    try {
                        Lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("2");
            }
        }).start();
        new Thread(() -> {
            synchronized (Lock) {
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1");
                flag = true;
                //执行完毕,唤醒所有等待的线程
                Lock.notify();
            }
        }).start();
    }

交替输出
wait/notify 版本

 static Nub nub = new Nub();
    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {
            nub.run("a",1,2);
        }).start();

        new Thread(() -> {
            nub.run("b",2,3);

        }).start();
        new Thread(() -> {

            nub.run("c",3,1);
        }).start();

    }

    public static class Nub {
        public synchronized void run(String str, int flag, int nextFlag) {
            for(int i=0; i<loopNumber; i++) {
                while(flag != this.flag) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(str+flag);
                //设置下一个运行的线程标记
                this.flag = nextFlag;
                //唤醒所有线程
                this.notifyAll();
            }
        }

        /**
         * 线程的执行标记, 1->a 2->b 3->c
         */
        private int flag = 1;
        private int loopNumber = 5;

        public int getFlag() {
            return flag;
        }

        public void setFlag(int flag) {
            this.flag = flag;
        }

        public int getLoopNumber() {
            return loopNumber;
        }

        public void setLoopNumber(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    }

await/signal版本

public static void main(String[] args) throws InterruptedException {
           Nub nub = new Nub();
          Condition conditionA = nub.newCondition();
          Condition conditionB = nub.newCondition();
          Condition conditionC = nub.newCondition();
         ReentrantLock lock = new ReentrantLock();
         //涉及思路为 一个线程一个休息室
        new Thread(() -> {
            nub.run("a",conditionA,conditionB);
        }).start();

        new Thread(() -> {
            nub.run("b",conditionB,conditionC);

        }).start();
        new Thread(() -> {

            nub.run("c",conditionC,conditionA);
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        nub.lock();
        try {
            //唤醒一个等待的线程
            System.out.println("开始。。。");
            conditionA.signal();
        }finally {
            nub.unlock();
        }

    }

    public static class Nub  extends ReentrantLock{
        public  void run(String str, Condition thiscondition,  Condition nextcondition) {
            for(int i=0; i<loopNumber; i++) {
                lock();
                    try {
                        //全部进入等待状态
                        thiscondition.await();
                        System.out.println(str);
                        nextcondition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        unlock();
                    }
            }
        }

        /**
         * 线程的执行标记, 1->a 2->b 3->c
         */
        private int flag = 1;
        private int loopNumber = 5;

        public int getFlag() {
            return flag;
        }

        public void setFlag(int flag) {
            this.flag = flag;
        }

        public int getLoopNumber() {
            return loopNumber;
        }

        public void setLoopNumber(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    }

14 ThreadLocal 与 InheritableThreadLocal

ThreadLocal

作用
ThreadLocal是JDK包提供的,ThreadLocal的作用主要是做数据隔离填充的数据只属于当前线程,变量的数据对别的线程而言是相对隔离的,在多线程环境下,如何防止自己的变量被其它线程篡改。

使用场景
Spring采用Threadlocal的方式,来保证单个线程中的数据库操作使用的是同一个数据库连接,同时,采用这种方式可以使业务层使用事务时不需要感知并管理connection对象,通过传播级别,巧妙地管理多个事务配置之间的切换,挂起和恢复。

Spring框架里面就是用的ThreadLocal来实现这种隔离,主要是在TransactionSynchronizationManager这个类里面

private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

 private static final ThreadLocal<Map<Object, Object>> resources =
   new NamedThreadLocal<>("Transactional resources");

 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");

 private static final ThreadLocal<String> currentTransactionName =
   new NamedThreadLocal<>("Current transaction name");

Spring的事务主要是ThreadLocal和AOP去做实现的,这里提一下,大家知道每个线程自己的链接是靠ThreadLocal保存的就好了

public static void main(String[] args) throws 
InterruptedException {
        //创建ThreadLocal变量
        ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
        ThreadLocal<Nub> nubThreadLocal = new ThreadLocal<>();

        //创建两个线程 分别使用 两个Thr 变量
        Thread t1 = new Thread(() -> {
            stringThreadLocal.set("t1 start");
            stringThreadLocal.set("t1 end");
            stringThreadLocal.set("1111");
            nubThreadLocal.set(new Nub(1, 2));
            //取值
            System.out.println(stringThreadLocal.get());
            System.out.println(nubThreadLocal.get());
            //移除
            nubThreadLocal.remove();
            System.out.println(nubThreadLocal.get());

        });

        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            stringThreadLocal.set("t2 start");
            stringThreadLocal.set("t2 end");
            stringThreadLocal.set("t2 33333");
            nubThreadLocal.set(new Nub(3, 4));
            //取值
            System.out.println(stringThreadLocal.get());
            System.out.println(nubThreadLocal.get().toString());

        });

        //启动线程
        t1.start();
        t2.start();

    }

    static class Nub {

        private int flag = 1;
        private int loopNumber = 5;

        public Nub(int flag, int loopNumber) {
            this.flag = flag;
            this.loopNumber = loopNumber;
        }

        public int getFlag() {
            return flag;
        }

        public void setFlag(int flag) {
            this.flag = flag;
        }

        public int getLoopNumber() {
            return loopNumber;
        }

        public void setLoopNumber(int loopNumber) {
            this.loopNumber = loopNumber;
        }

        @Override
        public String toString() {
            return "Nub{" +
                    "flag=" + flag +
                    ", loopNumber=" + loopNumber +
                    '}';
        }
    }
1111
Nub{flag=1, loopNumber=2}
null
t2 33333
Nub{flag=3, loopNumber=4}

由此可知:
每个线程中的ThreadLocal变量是每个线程私有的,而不是共享的
ThreadLocal其实就相当于其泛型类型的一个变量,只不过是每个线程私有的
stringThreadLocal被赋值了多次,保存最后一次赋值的结果
ThreadLocal可以进行以下几个操作
set 设置值
get 取出值
remove 移除值

原理
Thread中的threadLocals

public class Thread implements Runnable {
 ...

 ThreadLocal.ThreadLocalMap threadLocals = null;
 ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
 ...
}
static class ThreadLocalMap {
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

Thread类中有一个threadLocals和一个inheritableThreadLocals,它们都是ThreadLocalMap类型的变量,而ThreadLocalMap是一个定制化的Hashmap。在默认情况下,每个线程中的这两个变量都为null。此处先讨论threadLocals,inheritableThreadLocals放在后面讨论

ThreadLocal中的方法
set方法

public void set(T value) {
    // 获取当前线程
    Thread t = Thread.currentThread();
    
    // 获得ThreadLocalMap对象 
    // 这里的get会返回Thread类中的threadLocals
    ThreadLocalMap map = getMap(t);
    
    // 判断map是否已经创建,没创建就创建并放入值,创建了就直接放入
    if (map != null)
        // ThreadLocal自生的引用作为key,传入的值作为value
        map.set(this, value);
    else
        createMap(t, value);
}
如果未创建

void createMap(Thread t, T firstValue) {
    // 创建的同时设置想放入的值
    // hreadLocal自生的引用作为key,传入的值作为value
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

get方法

public T get() {
    // 获取当前线程
    Thread t = Thread.currentThread();
	// 获取当前线程的threadLocals变量
    ThreadLocalMap map = getMap(t);
    
    // 判断threadLocals是否被初始化了
    if (map != null) {
        // 已经初始化则直接返回
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 否则就创建threadLocals
    return setInitialValue();
}
private T setInitialValue() {
    // 这个方法返回是null
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    
    // 无论map创建与否,最终value的值都为null
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}
protected T initialValue() {
    return null;
}

remove方法

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        // 如果threadLocals已经被初始化,则移除
        m.remove(this);
}

在每个线程内部都有一个名为threadLocals的成员变量,该变量的类型为HashMap,其中key为我们定义的ThreadLocal变量的this引用,value则为我们使用set方法设置的值。每个线程的本地变量存放在线程自己的内存变量threadLocals中

只有当前线程第一次调用ThreadLocal的set或者get方法时才会创建threadLocals(inheritableThreadLocals也是一样)。其实每个线程的本地变量不是存放在ThreadLocal实例里面,而是存放在调用线程的threadLocals变量里面

底层结构

static class ThreadLocalMap {

        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        ……
    }

通过Map可知他的数据结构其实是很像HashMap的,但是看源码可以发现,它并未实现Map接口,而且他的Entry是继承WeakReference(弱引用)的,也没有看到HashMap中的next,所以不存在链表了。
请添加图片描述

InheritableThreadLocal

在ThreadLocal的源码可以看出,无论是set、get、还是remove,都是相对于当前线程操作的

由此ThreadLocal无法从父线程传向子线程,所以InheritableThreadLocal出现了,它能够让父线程中ThreadLocal的值传给子线程。

废话不多说上码

      public static void main(String[] args) throws InterruptedException {
        //创建ThreadLocal变量
        ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
        InheritableThreadLocal<String> InThreadLocal = new InheritableThreadLocal<>();
        stringThreadLocal.set("stringThreadLocal");
        InThreadLocal.set("InheritableThreadLocal");

        //创建两个线程 分别使用 两个Thr 变量
        new Thread(() -> {
            //取值
            System.out.println(stringThreadLocal.get());
            System.out.println(InThreadLocal.get());

        }).start();
    }

结论

null
InheritableThreadLocal

InheritableThreadLocal的值成功从主线程传入了子线程,而ThreadLocal则没有

原理

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    // 传入父线程中的一个值,然后直接返回
    protected T childValue(T parentValue) {
        return parentValue;
    }

  	// 返回传入线程的inheritableThreadLocals
    // Thread中有一个inheritableThreadLocals变量
    // ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

 	// 创建一个inheritableThreadLocals
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

由如上代码可知,InheritableThreadLocal继承了ThreadLocal,并重写了三个方法。InheritableThreadLocal重写了createMap方法,那么现在当第一次调用set方法时,创建的是当前线程的inheritableThreadLocals变量的实例而不再是threadLocals。当调用getMap方法获取当前线程内部的map变量时,获取的是inheritableThreadLocals而不再是threadLocals

childValue(T parentValue)方法的调用
在主函数运行时,会调用Thread的默认构造函数(创建主线程,也就是父线程),所以我们先看看Thread的默认构造函数

public Thread() {
    init(null, null, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize, AccessControlContext acc,
                  boolean inheritThreadLocals) {
   	...
        
	// 获得当前线程的,在这里是主线程
    Thread parent = currentThread();
   
    ...
    
    // 如果父线程的inheritableThreadLocals存在
    // 我们在主线程中调用set和get时,会创建inheritableThreadLocals
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        // 设置子线程的inheritableThreadLocals
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    
    /* Stash the specified stack size in case the VM cares */
    this.stackSize = stackSize;

    /* Set thread ID */
    tid = nextThreadID();
}
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
}

在createInheritedMap内部使用父线程的inheritableThreadLocals变量作为构造函数创建了一个新的ThreadLocalMap变量,然后赋值给了子线程的inheritableThreadLocals变量

private ThreadLocalMap(ThreadLocalMap parentMap) {
    Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];

    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                // 这里调用了 childValue 方法
                // 该方法会返回parent的值
                Object value = key.childValue(e.value);
                
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

在该构造函数内部把父线程的inheritableThreadLocals成员变量的值复制到新的ThreadLocalMap对象中

结论
InheritableThreadLocal类通过重写getMap和createMap,让本地变量保存到了具体线程的inheritableThreadLocals变量里面,那么线程在通过InheritableThreadLocal类实例的set或者get方法设置变量时,就会创建当前线程的inheritableThreadLocals变量。

当父线程创建子线程时,构造函数会把父线程中inheritableThreadLocals变量里面的本地变量复制一份保存到子线程的inheritableThreadLocals变量里面。

三.共享模型之内存

1.Java内存模型

JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。

JMM体现在以下几个方面

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

2.可见性

退不出的循环
代码:

static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
   Thread t1 = new Thread(()->{
       while (flag){
           //.. 如果为 run为真,则一直执行
           System.out.println();
       }
   });
    t1.start();
    Thread.sleep(100);
    System.out.println("改变run的值为false");
    flag = false;
}

为什么无法退出该循环

  • 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。
    在这里插入图片描述
  • 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中, 减少对主存中 run的访问,提高效率
    在这里插入图片描述
  • 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值
    在这里插入图片描述
    解决方法
  • 使用volatile易变关键字,它可以用来修饰成员变量和静态成员变量(放在主存中的变量),他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile 变量都是直接操作主存
volatile static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
   Thread t1 = new Thread(()->{
       while (flag){
           //.. 如果为 run为真,则一直执行
           System.out.println();
       }
   });
    t1.start();
    Thread.sleep(100);
    System.out.println("改变run的值为false");
    flag = false;
}

可见性与原子性

  • 前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对volatile变量的修改对另一个线程可见,不能保证原子性,仅用在一个写线程,多个读线程的情况
  • 注意 synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。
  • 但缺点是 synchronized 是属于重量级操作,性能相对更低。

如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?
因为使用了synchronized关键字

public void println(String y) {
		//使用了synchronized关键字
        synchronized (this) {
            print(y);
            newLine();
        }
    }

同步模式之犹豫模式
定义

Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回

  • 用一个标记来判断该任务是否已经被执行过了
  • 需要避免线程安全问题
  • 加锁的代码块要尽量的小,以保证性能
 
 public class Test15 {
    public static void main(String[] args) throws InterruptedException {
        Monitor monitor = new Monitor();
        monitor.start();
        monitor.start();
        Thread.sleep(3500);
        monitor.stop();
    }
}

class Monitor {

    Thread monitor;
    //设置标记,用于判断是否被终止了
    private volatile boolean stop = false;
    //设置标记,用于判断是否已经启动过了
    private boolean starting = false;
    /**
     * 启动监控器线程
     */
    public void start() {
        //上锁,避免多线程运行时出现线程安全问题
        synchronized (this) {
            if (starting) {
                //已被启动,直接返回
                return;
            }
            //启动监视器,改变标记
            starting = true;
        }
        //设置线控器线程,用于监控线程状态
        monitor = new Thread() {
            @Override
            public void run() {
                //开始不停的监控
                while (true) {
                    if(stop) {
                        System.out.println("处理后续任务");
                        break;
                    }
                    System.out.println("监控器运行中...");
                    try {
                        //线程休眠
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("被打断了");
                    }
                }
            }
        };
        monitor.start();
    }

    /**
     * 	用于停止监控器线程
     */
    public void stop() {
        //打断线程
        monitor.interrupt();
        stop = true;
    }
}

3、有序性

指令重排
  • JVM 会在不影响正确性的前提下,可以调整语句的执行顺序
    在这里插入图片描述

这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。

指令重排序优化

事实上,现代处理器会设计为一个时钟周期完成一条执行时间长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 这5 个阶段

在这里插入图片描述

不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行

指令重排的前提是,重排指令不能影响结果,例如

// 可以重排的例子 
int a = 10; 
int b = 20; 
System.out.println( a + b );

// 不能重排的例子 
int a = 10;
int b = a - 5;
支持流水线的处理器

现代 CPU 支持多级指令流水线,例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理器,就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(相当于一 条执行时间长的复杂指令),IPC = 1,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相地提高了指令地吞吐率。
在这里插入图片描述
在多线程环境下,指令重排序可能导致出现意料之外的结果

解决办法
volatile 修饰的变量,可以禁用指令重排
禁止的是加volatile关键字变量之前的代码被重排序

内存屏障

可见性

  • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
  • 读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中新数据

有序性

  • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
  • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
volatile 原理

volatile的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入写屏障
  • 对 volatile 变量的读指令前会加入读屏障
如何保证可见性

写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
在这里插入图片描述

读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中新数据
在这里插入图片描述

如何保证有序性

写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
在这里插入图片描述

读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
在这里插入图片描述
在这里插入图片描述
不能解决指令交错问题,也就是说不能保障原子性

  1. 写屏障仅仅是保证之后的读能够读到新的结果,但不能保证读跑到它前面去
  2. 而有序性的保证也只是保证了本线程内相关代码不被重排序
实现原理之Lock前缀

有volatile变量修饰的共享变量进行写操作的时候会多出第二行汇编代码,通过查IA-32架构软件开发者手册可知,Lock前缀的指令在多核处理器下会引发了两件事

Lock前缀指令会引起处理器缓存回写到内存

Lock前缀指令导致在执行指令期间,声言处理器的LOCK#信号。在多处理器环境中,LOCK#信号确保在声言该信号期间,处理器可以独占任何共享内存。但是,在最近的处理器里,LOCK #信号一般不锁总线,而是锁缓存,毕竟锁总线开销的比较大。使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据

一个处理器的缓存回写到内存会导致其他处理器的缓存无效

在多核处理器系统中进行操作的时候,IA-32和Intel 64处理器能嗅探其他处理器访问系统内存和它们的内部缓存。处理器使用嗅探技术保证它的内部缓存、系统内存和其他处理器的缓存的数据在总线上保持一致

有个练习题 觉得可以防着有空可以多看看就放着
请添加图片描述

四.共享模型之无锁

1、无锁解决线程安全问题

可以使用原子整数 AtomicInteger
AtomicInteger i = new AtomicInteger(0);

interface Account {
	Integer getBalance();

	void withdraw(Integer amount);

	/**
	 * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作     * 如果初始余额为 10000 那么正确的结果应当是 0
	 */
	static void demo(Account account) {
		List<Thread> ts = new ArrayList<>();
		long start = System.nanoTime();
		for (int i = 0; i < 1000; i++) {
			ts.add(new Thread(() -> {
				account.withdraw(10);
			}));
		}
		ts.forEach(Thread::start);
		ts.forEach(t -> {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});
		long end = System.nanoTime();
		System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
	}
}

//线程不安全的做法
class AccountUnsafe implements Account {
	private Integer balance;

	public AccountUnsafe(Integer balance) {
		this.balance = balance;
	}


	@Override
	public Integer getBalance() {
		return this.balance;
	}

	@Override
	public synchronized void withdraw(Integer amount) {
		balance -= amount;
	}

	public static void main(String[] args) {
		Account.demo(new AccountUnsafe(10000));
		Account.demo(new AccountCas(10000));
	}
}

//线程安全的做法
class AccountCas implements Account {
	//使用原子整数
	private AtomicInteger balance;

	public AccountCas(int balance) {
		this.balance = new AtomicInteger(balance);
	}

	@Override
	public Integer getBalance() {
		//得到原子整数的值
		return balance.get();
	}

	@Override
	public void withdraw(Integer amount) {
		while(true) {
			//获得修改前的值
			int prev = balance.get();
			//获得修改后的值
			int next = prev-amount;
			//比较并设值
			if(balance.compareAndSet(prev, next)) {
				break;
			}
		}
	}
}

2、CAS与volatile

使用 AtomicInteger 可以解决线程安全问题,它没有用锁来保护共享变量的线程安全,让我们来看看他的原理吧

原理是 compareAndSwap(比较并设置值),它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
在这里插入图片描述
代码逻辑
当一个线程要去修改Account对象中的值时,先获取值pre(调用get方法),然后再将其设置为新的值next(调用cas方法)。在调用cas方法时,会将pre与Account中的余额进行比较。
如果两者相等,就说明该值还未被其他线程修改,此时便可以进行修改操作。
如果两者不相等,就不设置值,重新获取值pre(调用get方法),然后再将其设置为新的值next(调用cas方法),直到修改成功为止。
重点

  • CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
  • 在多核状态下,某个核执行到带 lock 的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

volatile
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

重点
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到新值,但不能解决指令交错问题(不能保证原子性)
CAS 必须借助 volatile 才能读取到共享变量的新值来实现【比较并交换】的效果

效率问题
一般情况下,使用无锁比使用加锁的效率更高。

CAS特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。 synchronized是基于悲观锁的思想:悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思因为没有使用
    synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一,但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

3、原子整数

J.U.C 并发包里有
AtomicBoolean
AtomicInteger
AtomicLong

这里用AtomicInteger为例

AtomicInteger i = new AtomicInteger(0);
System.out.println(i.incrementAndGet());//++i 1
System.out.println(i.getAndIncrement());//i++ 1
System.out.println(i.decrementAndGet());//--i
System.out.println(i.getAndDecrement());//i--

    System.out.println(i.getAndAdd(5));//获取并加值 (i = 0, 结果 i = 5, 返回 0)
    System.out.println(i.getAndAdd(-5)); //加值并获取(i = 5, 结果 i = 0, 返回 0)
   // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
   // 函数中的操作能保证原子,但函数需要无副作用
    i.getAndUpdate(p -> p - 2);

    // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
    // 函数中的操作能保证原子,但函数需要无副作用
    System.out.println(i.updateAndGet(p -> p + 2));
    // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
    // 其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
    // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是
     System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

    // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
    // 其中函数中的操作能保证原子,但函数需要无副作用
    System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

4、原子引用

public interface DecimalAccount {
	BigDecimal getBalance();

	void withdraw(BigDecimal amount);

	/**
	 * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作    
     * 如果初始余额为 10000 那么正确的结果应当是 0
	 */
	static void demo(DecimalAccountImpl account) {
		List<Thread> ts = new ArrayList<>();
		long start = System.nanoTime();
		for (int i = 0; i < 1000; i++) {
			ts.add(new Thread(() -> {
				account.withdraw(BigDecimal.TEN);
			}));
		}
		ts.forEach(Thread::start);
		ts.forEach(t -> {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});
		long end = System.nanoTime();
		System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
	}
}

class DecimalAccountImpl implements DecimalAccount {
	//原子引用,泛型类型为小数类型
	AtomicReference<BigDecimal> balance;

	public DecimalAccountImpl(BigDecimal balance) {
		this.balance = new AtomicReference<BigDecimal>(balance);
	}

	@Override
	public BigDecimal getBalance() {
		return balance.get();
	}

	@Override
	public void withdraw(BigDecimal amount) {
		while(true) {
			BigDecimal pre = balance.get();
			BigDecimal next = pre.subtract(amount);
			if(balance.compareAndSet(pre, next)) {
				break;
			}
		}
	}

	public static void main(String[] args) {
		DecimalAccount.demo(new DecimalAccountImpl(new BigDecimal("10000")));
	}
}

5、ABA问题

public class Test18 {
	static AtomicReference<String> str = new AtomicReference<>("A");
	public static void main(String[] args) {
		new Thread(() -> {
			String pre = str.get();
			System.out.println("change");
			try {
				other();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			//把str中的A改为C
			System.out.println("change A->C " + str.compareAndSet(pre, "C"));
		}).start();
	}

	static void other() throws InterruptedException {
		new Thread(()-> {
			System.out.println("change A->B " + str.compareAndSet("A", "B"));
		}).start();
		Thread.sleep(500);
		new Thread(()-> {
			System.out.println("change B->A " + str.compareAndSet("B", "A"));
		}).start();
	}
}

在这里插入图片描述

主线程仅能判断出共享变量的值与初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

AtomicStampedReference
public class Test19 {
    //指定版本号
    static AtomicStampedReference<String> str = new AtomicStampedReference<>("A", 0);
    public static void main(String[] args) {
        new Thread(() -> {
            String pre = str.getReference();
            //获得版本号
            int stamp = str.getStamp();
            System.out.println("change");
            try {
                other();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //把str中的A改为C,并比对版本号,如果版本号相同,就执行替换,并让版本号+1
            System.out.println("change A->C stamp " + stamp + str.compareAndSet(pre, "C", stamp, stamp+1));
        }).start();
    }

    static void other() throws InterruptedException {
        new Thread(()-> {
            int stamp = str.getStamp();
            System.out.println("change A->B stamp " + stamp + str.compareAndSet("A", "B", stamp, stamp+1));
        }).start();
        Thread.sleep(500);
        new Thread(()-> {
            int stamp = str.getStamp();
            System.out.println("change B->A stamp " + stamp +  str.compareAndSet("B", "A", stamp, stamp+1));
        }).start();
    }
}
AtomicMarkableReference

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference

public class Test19{
	//指定版本号
	static AtomicMarkableReference<String> str = new AtomicMarkableReference<>("A", true);
	public static void main(String[] args) {
		new Thread(() -> {
			String pre = str.getReference();
			System.out.println("change");
			try {
				other();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			//把str中的A改为C,并比对版本号,如果版本号相同,就执行替换,并让版本号+1
			System.out.println("change A->C mark " +  str.compareAndSet(pre, "C", true, false));
		}).start();
	}

	static void other() throws InterruptedException {
		new Thread(() -> {
			System.out.println("change A->A mark " + str.compareAndSet("A", "A", true, false));
		}).start();
	}
}

两者的区别

  • AtomicStampedReference 需要我们传入整型变量作为版本号,来判定是否被更改过
  • AtomicMarkableReference需要我们传入布尔变量作为标记,来判断是否被更改过

6、原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

7、原子更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdate

原子更新器用于帮助我们改变某个对象中的某个属性

public class Test {
   public static void main(String[] args) {
      Student student = new Student();
       
      // 获得原子更新器
      // 泛型
      // 参数1 持有属性的类 参数2 被更新的属性的类
      // newUpdater中的参数:第三个为属性的名称
      AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
       
      // 修改
      updater.compareAndSet(student, null, "Nyima");
      System.out.println(student);
   }
}

class Student {
   volatile String name;

   @Override
   public String toString() {
      return "Student{" +
            "name='" + name + '\'' +
            '}';
   }
}

原子更新器初始化过程

从上面的例子可以看出,原子更新器是通过newUpdater来获取实例的。其中传入了三个参数

  • 拥有属性的类的Class
  • 属性的Class
  • 属性的名称
    大概可以猜出来,初始化过程用到了反射,让我们看看源码来验证一下这个猜测。

newUpdater方法

public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater(Class<U> tclass,
                                                                Class<W> vclass,
                                                                String fieldName) {
    // 返回了一个AtomicReferenceFieldUpdaterImpl实例
    return new AtomicReferenceFieldUpdaterImpl<U,W>
        (tclass, vclass, fieldName, Reflection.getCallerClass());
}

内部实现类

在这里插入图片描述
AtomicReferenceFieldUpdater为抽象类,该类内部有一个自己的实现类AtomicReferenceFieldUpdaterImpl
在这里插入图片描述
构造方法

tomicReferenceFieldUpdaterImpl(final Class<T> tclass,
                                final Class<V> vclass,
                                final String fieldName,
                                final Class<?> caller) {
    // 用于保存要被修改的属性
    final Field field;
    
    // 属性的Class
    final Class<?> fieldClass;
    
    // field的修饰符
    final int modifiers;
    try {
        // 反射获得属性
        field = AccessController.doPrivileged(
            new PrivilegedExceptionAction<Field>() {
                public Field run() throws NoSuchFieldException {
                    // tclass为传入的属性的Class,可以通过它来获得属性
                    return tclass.getDeclaredField(fieldName);
                }
            });
        
        // 获得属性的修饰符,主要用于判断
        // 1、vclass 与 属性确切的类型是否匹配
        // 2、是否为引用类型
        // 3、被修改的属性是否加了volatile关键字
        modifiers = field.getModifiers();
        sun.reflect.misc.ReflectUtil.ensureMemberAccess(
            caller, tclass, null, modifiers);
        ClassLoader cl = tclass.getClassLoader();
        ClassLoader ccl = caller.getClassLoader();
        if ((ccl != null) && (ccl != cl) &&
            ((cl == null) || !isAncestor(cl, ccl))) {
            sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
        }
        
        // 获得属性类的Class
        fieldClass = field.getType();
    } catch (PrivilegedActionException pae) {
        throw new RuntimeException(pae.getException());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }

    if (vclass != fieldClass)
        throw new ClassCastException();
    if (vclass.isPrimitive())
        throw new IllegalArgumentException("Must be reference type");

    if (!Modifier.isVolatile(modifiers))
        throw new IllegalArgumentException("Must be volatile type");

    // Access to protected field members is restricted to receivers only
    // of the accessing class, or one of its subclasses, and the
    // accessing class must in turn be a subclass (or package sibling)
    // of the protected member's defining class.
    // If the updater refers to a protected field of a declaring class
    // outside the current package, the receiver argument will be
    // narrowed to the type of the accessing class.
 	// 对类中的属性进行初始化
    this.cclass = (Modifier.isProtected(modifiers) &&
                   tclass.isAssignableFrom(caller) &&
                   !isSamePackage(tclass, caller))
                  ? caller : tclass;
    this.tclass = tclass;
    this.vclass = vclass;
    // 获得偏移量
    this.offset = U.objectFieldOffset(field);
}

原子引用更新器确实使用了反射

8、LongAdder原理

原理之伪共享

在这里插入图片描述

缓存行伪共享得从缓存说起
缓存与内存的速度比较
在这里插入图片描述
在这里插入图片描述
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效

在这里插入图片描述
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]
    无论谁修改成功,都会导致对方 Core 的缓存行失效,

比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding(空白),从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
在这里插入图片描述
累加主要调用以下方法

public void add(long x) {
       Cell[] as; long b, v; int m; Cell a;
       if ((as = cells) != null || !casBase(b = base, b + x)) {
           boolean uncontended = true;
           if (as == null || (m = as.length - 1) < 0 ||
               (a = as[getProbe() & m]) == null ||
               !(uncontended = a.cas(v = a.value, v + x)))
               longAccumulate(x, null, uncontended);
       }
   }

累加流程图
在这里插入图片描述

9、Unsaf

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

public class Test19 {
    //指定版本号
    static AtomicStampedReference<String> str = new AtomicStampedReference<>("A", 0);
    public static void main(String[] args) {
        new Thread(() -> {
            String pre = str.getReference();
            //获得版本号
            int stamp = str.getStamp();
            System.out.println("change");
            try {
                other();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //把str中的A改为C,并比对版本号,如果版本号相同,就执行替换,并让版本号+1
            System.out.println("change A->C stamp " + stamp + str.compareAndSet(pre, "C", stamp, stamp+1));
        }).start();
    }

    static void other() throws InterruptedException {
        new Thread(()-> {
            int stamp = str.getStamp();
            System.out.println("change A->B stamp " + stamp + str.compareAndSet("A", "B", stamp, stamp+1));
        }).start();
        Thread.sleep(500);
        new Thread(()-> {
            int stamp = str.getStamp();
            System.out.println("change B->A stamp " + stamp +  str.compareAndSet("B", "A", stamp, stamp+1));
        }).start();
    }
}

五、共享模型之不可变

1、不可变

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改。

2、不可变设计

String类中不可变的体现

public final class String
    implements java.io.Serializable, Comparable<String>, CharSequence {
    /** The value is used for character storage. */
    private final char value[];

    /** Cache the hash code for the string */
    private int hash; // Default to 0
    
   //....
  }
}
final 的使用

发现该类、类中所有属性都是 final

属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

保护性拷贝
使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是 如何实现的,就以 substring 为例

public String substring(int beginIndex) {
        if (beginIndex < 0) {
            throw new StringIndexOutOfBoundsException(beginIndex);
        }
        int subLen = value.length - beginIndex;
        if (subLen < 0) {
            throw new StringIndexOutOfBoundsException(subLen);
        }
    	//返回的是一个新的对象
        return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
    }

发现其内部是调用 String 的构造方法创建了一个新字符串

public String(char value[], int offset, int count) {
        if (offset < 0) {
            throw new StringIndexOutOfBoundsException(offset);
        }
        if (count <= 0) {
            if (count < 0) {
                throw new StringIndexOutOfBoundsException(count);
            }
            if (offset <= value.length) {
                this.value = "".value;
                return;
            }
        }
        // Note: offset or count might be near -1>>>1.
        if (offset > value.length - count) {
            throw new StringIndexOutOfBoundsException(offset + count);
        }
        this.value = Arrays.copyOfRange(value, offset, offset+count);
    }

构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为保护性拷贝(defensive copy)

六、共享模型之工具 (线程池与JUC并发包)

1.自定义线程池

在这里插入图片描述

  • 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
  • 主线程类似于生产者,产生任务并放入阻塞队列中
  • 线程池类似于消费者,得到阻塞队列中已有的任务并执
public class Test01 {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,  TimeUnit.SECONDS, 1, 4);
        for (int i = 0; i < 10; i++) {
            threadPool.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务正在执行!");
            });
        }
    }
}


/**
 * 自定义线程池
 */
class ThreadPool {
    /**
     * 自定义阻塞队列
     */
    private BlockingQueue<Runnable> blockingQueue;

    /**
     * 核心线程数
     */
    private int coreSize;

    private HashSet<Worker> workers = new HashSet<>();

    /**
     * 用于指定线程最大存活时间
     */
    private TimeUnit timeUnit;
    private long timeout;

    /**
     * 工作线程类
     * 内部封装了Thread类,并且添加了一些属性
     */
    private class Worker extends Thread {
        Runnable task;

        public Worker(Runnable task) {
            System.out.println("初始化任务");
            this.task = task;
        }

        @Override
        public void run() {
            // 如果有任务就执行
            // 如果阻塞队列中有任务,就继续执行
            while (task != null || (task = blockingQueue.take()) != null) {
                try {
                    System.out.println("执行任务");
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 任务执行完毕,设为空
                    System.out.println("任务执行完毕");
                    task = null;
                }
            }
            // 移除任务
            synchronized (workers) {
                System.out.println("移除任务");
                workers.remove(this);
            }
        }
    }

    public ThreadPool(int coreSize, TimeUnit timeUnit, long timeout, int capacity) {
        this.coreSize = coreSize;
        this.timeUnit = timeUnit;
        blockingQueue = new BlockingQueue<>(capacity);
        this.timeout = timeout;
    }

    public void execute(Runnable task) {
        synchronized (workers) {
            // 创建任务
            // 池中还有空余线程时,可以运行任务
            // 否则阻塞
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                System.out.println("线程池中线程已用完,请稍等");
                blockingQueue.put(task);
            }
        }
    }
}

/**
 * 阻塞队列
 * 用于存放主线程或其他线程产生的任务
 */
class BlockingQueue<T> {
    /**
     * 阻塞队列
     */
    private  Deque<T> blockingQueue;

    /**
     * 阻塞队列容量
     */
    private int capacity;

    /**
     * 锁
     */
    private ReentrantLock lock;

    /**
     * 条件队列
     */
    private Condition fullQueue;
    private Condition emptyQueue;


    public BlockingQueue(int capacity) {
        blockingQueue = new ArrayDeque<>(capacity);
        lock = new ReentrantLock();
        fullQueue = lock.newCondition();
        emptyQueue = lock.newCondition();
        this.capacity = capacity;
    }

    /**
     * 获取任务的方法
     */
    public T take() {
        // 加锁
        lock.lock();
        try {
            // 如果阻塞队列为空(没有任务),就一直等待
            while (blockingQueue.isEmpty()) {
                try {
                    emptyQueue.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 获取任务并唤醒生产者线程
            T task = blockingQueue.removeFirst();
            fullQueue.signalAll();
            return task;
        } finally {
            lock.unlock();
        }
    }

    public T takeNanos(long timeout, TimeUnit unit) {
        // 转换等待时间
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (blockingQueue.isEmpty()) {
                try {
                    // awaitNanos会返回剩下的等待时间
                    nanos = emptyQueue.awaitNanos(nanos);
                    if (nanos < 0) {
                        return null;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = blockingQueue.removeFirst();
            fullQueue.signalAll();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 放入任务的方法
     * @param task 放入阻塞队列的任务
     */
    public void put(T task) {
        lock.lock();
        try {
            while (blockingQueue.size() == capacity) {
                try {
                    System.out.println("阻塞队列已满");
                    fullQueue.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            blockingQueue.add(task);
            // 唤醒等待的消费者
            emptyQueue.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
2. ThreadPoolExecutor

继承图
请添加图片描述

a.线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
请添加图片描述
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作
进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
b.线程池属性
// 工作线程,内部封装了Thread
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    ...
}

// 阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;

// 锁
private final ReentrantLock mainLock = new ReentrantLock();

//  用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
private final HashSet<Worker> workers = new HashSet<Worker>();
c.构造方法及参数
构造方法
public ThreadPoolExecutor(int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler)
 
// 阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;

// 锁
private final ReentrantLock mainLock = new ReentrantLock();

//  用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
private final HashSet<Worker> workers = new HashSet<Worker>();
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
    有界阻塞队列 ArrayBlockingQueue
    无界阻塞队列 LinkedBlockingQueue
    最多只有一个同步元素的 SynchronousQueue
    优先队列 PriorityBlockingQueue
  • threadFactory 线程工厂 -可以为线程创建时起个好名字
  • handler 拒绝策略
工作方式

当一个任务传给线程池以后,可能有以下几种可能
将任务分配给一个核心线程来执行
核心线程都在执行任务,将任务放到阻塞队列workQueue中等待被执行
阻塞队列满了,使用救急线程来执行任务
救急线程用完以后,超过生存时间(keepAliveTime)后会被释放
任务总数大于了 最大线程数(maximumPoolSize)与阻塞队列容量的最大值(workQueue.capacity),使用拒接策略

拒绝策略

如果线程达到maximumPoolSize 仍然有新任务 这时会执行拒绝策略。拒绝策略JDK提供了4种实现,其他框架也有自己的现实
JDK:

  • AbortPolicy 让调用者抛出RejectedExecutioException 异常(默认策略)
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

其他:
Dubbo的实现,在抛出RejectedExecutioException 异常之前会记录日志。并dump 线程栈信息,方便定位问题

ActiveMQ的实现,带超时等待(60s)尝试放入队列。

FixedThreadPool(固定大小线程池)
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>());
}

特点

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价 适用于任务量已知,相对耗时的任务

代码如下

public class TestFixedThreadPool {
    public static void main(String[] args) {
        // 自定义线程工厂
        ThreadFactory factory = new ThreadFactory() {
            AtomicInteger atomicInteger = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "myThread_" + atomicInteger.getAndIncrement());
            }
        };
        // 创建核心线程数量为3的线程池
        // 通过 ThreadFactory可以给线程添加名字
        ExecutorService executorService = Executors.newFixedThreadPool(3, factory);

        // 任务
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
                System.out.println("this is fixedThreadPool");
            }
        };

        executorService.execute(runnable);
    }

}
CachedThreadPool(带缓存线程池)
public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}
ExecutorService executorService = Executors.newCachedThreadPool();

特点
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
全部都是救急线程(60s 后可以回收)
救急线程可以无限创建队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线
程。 适合任务数比较密集,但每个任务执行时间较短的情况

阻塞队列使用的是SynchronousQueue
SynchronousQueue是一种特殊的队列没有容量,没有线程来取是放不进去的 只有当线程取任务时,才会将任务放入该阻塞队列中

newSingleThreadExecutor(单线程池)
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()));
}

特点:
内部调用了new ThreadPoolExecutor的构造方法,传入的corePoolSize和maximumPoolSize都为1。
然后将该对象传给了FinalizableDelegatedExecutorService。该类修饰了ThreadPoolExecutor,
让外部无法调用ThreadPoolExecutor内部的某些方法来修改所创建的线程池的大小。
使用场景
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程
也不会被释放。

提交任务

请添加图片描述
execute()方法

execute(Runnable command)

传入一个Runnable对象,执行其中的run方法

submit()方法

Future<T> submit(Callable<T> task)

传入一个Callable对象,用Future来捕获返回值

关闭线程池

shutdown()

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
public void shutdown() {
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 checkShutdownAccess();
 // 修改线程池状态
 advanceRunState(SHUTDOWN);
 // 仅会打断空闲线程
 interruptIdleWorkers();
 onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
 } finally {
 mainLock.unlock();
 }
 // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
 tryTerminate();
}

shutdownNow()

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
public List<Runnable> shutdownNow() {
 List<Runnable> tasks;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 checkShutdownAccess();
 // 修改线程池状态
 advanceRunState(STOP);
 // 打断所有线程
 interruptWorkers();
 // 获取队列中剩余任务
 tasks = drainQueue();
 } finally {
 mainLock.unlock();
 }
 // 尝试终结
 tryTerminate();
 return tasks; }

其他方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

Tomcat 线程池

在Tomcat 中Connector 连接器部分使用到了线程池 如图
请添加图片描述

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
如果总线程数达到 maximumPoolSize这时不会立刻抛 RejectedExecutionException 异常
而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

Connector 配置
请添加图片描述
Executor 线程配置
请添加图片描述

3.J.U.C(java并发工具包)
a. AQS 原理

请添加图片描述
AQS 的基本思想其实很简单
获取锁的逻辑

while(state 状态不允许获取) {
 if(队列中还没有此线程) {
 入队并阻塞
 }
}
当前线程出队

释放锁的逻辑

if(state 状态允许了) {
 恢复阻塞的线程(s) 
 }

要点

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列

state 设计

  • state 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
b. ReentrantLock 原理

公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。

优点:所有的线程都能得到资源,不会饿死在队列中。
缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。

非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
缺点:你们可能也发现了,这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。

ReentrantLock非公平锁实现原理(默认为非公平锁)

加锁解锁流程先从构造器开始看,默认为非公平锁实现 NonfairSync 继承自 AQS

c. 读写锁
ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select …
from … lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
 private ReentrantReadWriteLock.ReadLock r = rw.readLock();
 private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  • 重入时降级支持:即持有写锁的情况下去获取读锁
StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
 // 锁升级
}
d. 线程安全集合类概述

请添加图片描述

线程安全集合类可以分为三大类:
遗留的线程安全集合如 Hashtable , Vector
使用 Collections 装饰的线程安全集合,如:

  • Collections.synchronizedCollection
  • Collections.synchronizedList
  • Collections.synchronizedMap
  • Collections.synchronizedSet
  • Collections.synchronizedNavigableMap
  • Collections.synchronizedNavigableSet
  • Collections.synchronizedSortedMap
  • Collections.synchronizedSortedSet

java.util.concurrent.*
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:
Blocking、CopyOnWrite、Concurrent
Blocking 大部分实现基于锁,并提供用来阻塞的方法
CopyOnWrite 之类容器修改开销相对较重
Concurrent 类型的容器
内部很多操作使用 cas 优化,一般可以提供较高吞吐量

缺点:
弱一致性
遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍
历,这时内容是旧的
求大小弱一致性,size 操作未必是 100% 准确
读取弱一致性
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出
ConcurrentModificationException,不再继续遍历

关于JUC并发包方面 写的比较少 有空还会补充起来

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java晋级赛 深入并发编程 的相关文章

随机推荐

  • 最大流算法 - 标号法

    标号法求最大流 图论中网络的相关概念见上篇博客 算法基本思想 从某个初始流开始 重复地增加流的值到不能再改进为止 则最后所得的流将是一个最大流 为此 不妨将每条边上的流量设置为0作为初始流量 为了增加给定流量的值 我们必须找出从发点到收点的
  • python3.11安装, 解决pip is configured with locations that require TLS/SSL问题

    系统 centos7 4 虚拟机 python版本 本机自带的2 7 5 以及参考python安装的python3 11 pip版本 本机自带的8 1 2 参考pip安装 升级升级到了20 3 4 pip3版本为22 3 1 openssl
  • FPGA实战小项目2

    基于FPGA的贪吃蛇游戏 基于FPGA的贪吃蛇游戏 基于fpga的数字密码锁ego1 基于fpga的数字密码锁ego1 基于fpga的数字时钟 basys3 基于fpga的数字时钟 basys3
  • 正点原子STM32(基于HAL库)4

    目录 ADC 实验 ADC 简介 单通道ADC 采集实验 ADC 寄存器 硬件设计 程序设计 下载验证 单通道ADC 采集 DMA 读取 实验 ADC DMA 寄存器 硬件设计 程序设计 下载验证 多通道ADC 采集 DMA 读取 实验 A
  • 图的邻接矩阵与邻接表的建立,c/c++描述

    图里数据节点是多对多的关系 一个节点有多个前驱 也有多个后继 甚至还有无向图 不区分前驱和后继 只需要节点之间有邻接关系即可 因此 描述这种数据关系 需要新的数据结构 图有顶点集和边集组成 顶点代表一个数据节点 边代表数据顶点间的邻接关系
  • impdp参数+impdp交互模式的命令列表

    impdp参数 1 help 是否显示用于导入的联机帮助 2 exclude 排除特定的对象豢型 3 directory 让转储文件 日志文件和sql文件使用的目录对象 4 dumpfile 需要导入的转储文件的列表 5 include 包
  • c语言如何定义标识符 常量 变量,标识符、常量和变量

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 1 单选题 C语言主要是借助以下哪个功能来实现程序模块化 A 定义函数 B 定义常量和外部变量 C 三种基本结构语句 D 丰富的数据类型 参考答案 A 参考解析 C语言用函数实现软件的模块化设计
  • 小程序原理

    开发过一段时间小程序了 对于我们现在使用的业务来说 使用小程序开发上手很快 所以反思了一下 那么小程序的原理到底是怎么样的呢 我自己总结一下 小程序的架构 官网原话 当小程序基于 WebView 环境下时 WebView 的 JS 逻辑 D
  • deepfake教程

    https github com iperov DeepFaceLab 首先下载根据不同系统不同显卡分类下载对应版本 能在文件夹下看到以下命令 一 clear workspace 重置 一 提取帧 extract images from v
  • k8s占用的端口号用 kubectl get svc 和lsof -i、netstat 命令都查不到

    如果你使用了 kubectl get svc 和 lsof i 或 netstat 命令查看端口 却没有查到 Kubernetes 的服务 有可能是因为 Kubernetes 服务运行在容器内部 在这种情况下 你可以通过以下步骤来查看 Ku
  • 音视频 ffmpeg ffplay ffprobe命令行

    ffmpeg工具 命令格式 ffmpeg 全局选项 输入选项 i input url 输出选项 output url 帮助命令 查看解封装帮助 dhav ffmpeg4 2才有 ffmpeg h demuxer dhav ffmpeg h
  • 黄广斌谈ELM进展:为深度学习提供理论支持, 将勾连生物学习

    强大的深度神经网络 仍有很多待解决的问题 超限学习机 ELM 发明人 新加坡南阳理工大学副教授黄广斌认为 ELM能够有效地拓展神经网络的理论和算法 近日 黄广斌发表文章 超限学习机 筑梦普适学习和普适智能 Extreme learning
  • 【数据结构】堆的向上调整和向下调整以及相关方法

    文章目录 一 堆的概念 二 堆的性质 三 堆的分类 1 大根堆 2 小根堆 四 说明 五 堆的结构 六 堆的向上调整 1 图示 2 代码实现 3 时间复杂度分析 七 堆的向下调整 1 思路 2 代码实现 八 删除根 1 思路 2 代码实现
  • CentOS6 YUM 源失效问题解决办法

    问题描述 Yum 源失效 无法正常使用 Yum 错误信息如下 http mirrors aliyun com centos 6 updates x86 64 repodata repomd xml Errno 14 PYCURL ERROR
  • CentOs 6.5下java 安装

    我们下载jdk 的rpm包到要安装的服务器上 然后要进行下面的工作 1 移除系统自带的jdk 1 查找系统自带的jdk版本 输入命令 rpm qa grep jdk 2 移除系统自带的jdk 输入命令 yum y remove java 1
  • fatal: Not a git repository (or any of the parent directories): .git

    问题描述 解决方案
  • js之事件委托

    在js的事件流模型中 事件的触发分为3个阶段 1 捕获阶段 由外向内传播 寻找目标元素 2 目标阶段 找到事件触发的目标元素 3 冒泡阶段 事件由内向外冒泡 事件委托也被称为事件代理 那么是事件委托呢 用一个例子来说明 div div di
  • 万字详解:Activiti 工作流引擎

    点击上方 芋道源码 选择 设为星标 管她前浪 还是后浪 能浪的浪 才是好浪 每天 10 33 更新文章 每天掉亿点点头发 源码精品专栏 原创 Java 2021 超神之路 很肝 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网
  • BART论文要点解读:看这篇就够了

    全称 Denoising Sequence to Sequence Pre training for Natural Language Generation Translation and Comprehension BART来源于Bidi
  • java晋级赛 深入并发编程

    根据黑马java并发编程学习做的笔记 传送门 https www bilibili com video BV16J411h7Rd p 15 java晋级赛 深入并发编程 一 多线程基础 进程与线程 创建线程的方式及运行原理 创建线程的方式