1 JUC
JUC就是java.util .concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的
1 传统的synchronized
public class Synchronized {
public static void main(String[] args) {
//并发 : 多个线程同时操作同一个资源类
// 使用lambda表达式进行解耦,将资源类丢入线程的执行
Person person = new Person();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
}
}
// 加锁时需要判断条件,进行锁定
// 使用lambda表达式进行解耦
//资源类
class Person{
private int numbers =50 ;
public synchronized void sale(){
if(numbers > 0) {
System.out.println("" + Thread.currentThread().getName() + "买了第" + numbers-- + "还剩" + numbers + "张");
}
}
}
2 Lock 接口
常用 可重入锁
public class Synchronized {
public static void main(String[] args) {
//并发 : 多个线程同时操作同一个资源类
// 使用lambda表达式进行解耦,将资源类丢入线程的执行
Person person = new Person();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
new Thread(()->{
for (int i = 0 ;i < 60 ; i ++){
person.sale();
}
}).start();
}
}
// 加锁时需要判断条件,进行锁定
// 使用lambda表达式进行解耦
//资源类
/*
Lock
* 1 new ReentrantLock();
2 加锁 lock.lock();
* 3 代码包裹在try块 之内
* 4 在 finally 块 解锁 lock.unlock();
*
* */
class Person{
private int numbers =50 ;
Lock lock = new ReentrantLock();
public synchronized void sale(){
lock.lock();
try {
if(numbers > 0) {
System.out.println("" + Thread.currentThread().getName() + "买了第" + numbers-- + "还剩" + numbers + "张");
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
Lock
- 1 new ReentrantLock();
2 加锁 lock.lock();
- 3 代码包裹在try块 之内
- 4 在 finally 块 解锁 lock.unlock();
两种锁机制的不同
1 synchronized 是Java内置关键字,Lock 是一个接口
2 synchronized 无法判断是否获取锁,Lock 可以判断是否获取锁
3 synchronized 可以自动释放锁,Lock 必须手动释放锁,如果不释放就会造成死锁
4 同一个锁对象,线程A synchronized获取之后,线程B只能等待,造成阻塞,Lock 并不会等待
5 synchronized 可重入锁,不可中断,非公平锁,Lock 可重入锁,可以判断,非公平锁(可设置)
6 synchronized 适合少量同步代码,Lock 适合大量同步代码
消费者和生产者问题
1 传统的synchronized
//消费生产三步骤
// 等待 业务 通知
public class SynchronizedPc {
public static void main(String[] args) {
Data d = new Data();
new Thread(()->{
try {
for (int i = 0; i <10 ; i++) {
d.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
d.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
class Data{
private int number ;
public synchronized void increment() throws InterruptedException {
if(number != 0){
this.wait();
}
number++;
this.notifyAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
}
public synchronized void decrement() throws InterruptedException {
if(number == 0){
this.wait();
}
number--;
this.notifyAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
}
}
多个生产者和消费者的情况下:
等待时使用if判断,会造成虚假唤醒,因为if语句只判断一次,当生产者进行等待释放锁之后,消费者进行消费,消费完成之后,唤醒了所有生产者线程,生产者线程在释放锁之后的代码继续执行则跳过了if判断(wait()方法释放锁之后再获取锁会在释放锁之后的代码继续执行)
解决办法 : 使用while循环 ,进行循环判断
//消费生产三步骤
// 等待 业务 通知
class Data{
private int number ;
public synchronized void increment() throws InterruptedException {
while (number != 0){
this.wait();
}
number++;
this.notifyAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
}
public synchronized void decrement() throws InterruptedException {
while (number == 0){
this.wait();
}
number--;
this.notifyAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
}
}
Lock 实现
public class SynchronizedPc {
public static void main(String[] args) {
Data2 d = new Data2();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 30; i++) {
d.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"D").start();
}
}
class Data2{
private int number ;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
//流程
lock.lock(); //1 加锁
try { //2 try块包裹业务代码
while (number != 0){
condition.await();
}
number++;
condition.signalAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
} finally {
lock.unlock(); //3 finally块 解锁
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0){
condition.await();
}
number--;
condition.signalAll();
System.out.println(Thread.currentThread().getName()+"->"+number);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Lock 锁比传统的synchronized 多精准唤醒功能
public class SynchronizedPc {
public static void main(String[] args) {
Data3 d = new Data3();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.incrementA();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 30; i++) {
d.decrementB();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.incrementC();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i <30 ; i++) {
d.decrementD();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"D").start();
}
}
class Data3{
private int number ;
private Lock lock = new ReentrantLock();
private Condition condition1= lock.newCondition();
private Condition condition2= lock.newCondition();
private Condition condition3 = lock.newCondition();
private Condition condition4 = lock.newCondition();
public void incrementA() throws InterruptedException {
//流程
lock.lock(); //1 加锁
try { //2 try块包裹业务代码
while (number != 0){
condition1.await();
}
number++;
condition2.signal();
System.out.println(Thread.currentThread().getName()+"->"+number);
} finally {
lock.unlock(); //3 finally块 解锁
}
}
public void decrementB() throws InterruptedException {
lock.lock();
try {
while (number == 0){
condition2.await();
}
number--;
condition3.signal();
System.out.println(Thread.currentThread().getName()+"->"+number);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void incrementC() throws InterruptedException {
//流程
lock.lock(); //1 加锁
try { //2 try块包裹业务代码
while (number != 0){
condition3.await();
}
number++;
condition4.signal();
System.out.println(Thread.currentThread().getName()+"->"+number);
} finally {
lock.unlock(); //3 finally块 解锁
}
}
public void decrementD() throws InterruptedException {
lock.lock();
try {
while (number == 0){
condition4.await();
}
number--;
condition1.signal();
System.out.println(Thread.currentThread().getName()+"->"+number);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
精准唤醒总结:有几个线程就有几个监视器,由不同的监视器来唤醒不同的线程;A个监视器在A线程等待时,在其他线程使用A监视器唤醒时就会唤醒等待的A线程
3 八锁现象
锁有两种
1 new 出来的实例对象
2 Class 类模板
synchronized :锁的是方法的调用者
static : 锁的是Class 类对象
总结:当线程之间的锁对象不同时,按照时间来进行判断那一个先执行;当线程之间的锁对象相同时,有阻塞现象产生,先获取锁对象的先执行
4 线程不安全集合解决方法
4.1 List
4.2 Set
4.3 Map
5 Callable 与 Thread
细节:
1 FutureTask是Runable接口的实现类,FutureTask可以接受Callable接口,所以FutureTask相当于Thread类和Callable接口的适配类
2 Callable 接口中的call()方法有返回值,也可以抛出异常
3 Callable接口的call()方法有缓存
4 返回值可能会阻塞
6 JUC辅助类(必会)
6.1
6.2
6.3
7 ReadWriteLock接口
ReadWriteLock(唯一实现类)维护一对关联的locks ,一个用于只读操作,一个用于写入。 read lock可以由多个阅读器线程同时进行,只要没有作者。 write lock是独家的
8 阻塞队列
BlockingQueue
使用场景:线程池,多线程并发处理
四组API
9 线程池
池化技术:某一个资源频繁的创建和销毁,非常耗费内存;
将资源重复利用,减少资源频繁的创建和销毁
优势
1 降低资源的消耗
2 提高响应速度
3 方便管理
9.1 三大方法
9.2 七大参数
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
底层都是用的ThreadPoolExecuto类
七大参数
var1 :核心线程数
var2 :最大核心线程数
var3 :超时时间(超时了没人调用就会释放)
var5 :超时时间单位
var6 :阻塞队列
var7 :线程工厂 , 一般不变
var8 :拒绝策略
public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if (var6 != null && var7 != null && var8 != null) {
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
七大参数
var1 :核心线程数 ;一开始开启的线程数量
var2 :最大核心线程数 ;当阻塞队列满了之后,开启的剩下的核心线程
var3 :超时时间 ;剩下的核心线程数超时了没人调用就会释放
var5 :超时时间单位
var6 :阻塞队列
var7 :线程工厂
var8 :拒绝策略
var1 :核心线程数
var2 :最大核心线程数
选择策略
1 CPU 密集型 :电脑有多少核,最大核心线程数就写多少
// 获取系统的CPU核数
System.out.println(Runtime.getRuntime().availableProcessors());
2 IO 密集型 :判断程序中非常耗费IO的线程有多少个,最大核心线程数一般设为IO线程的两倍
10 四大函数式接口(必须掌握)
Function 接口使用
Predict 接口
实例
Consumer 接口
Supplier 接口
实例
Supplier<String> stringSupplier = new Supplier<String>() {
@Override
public String get() {
return Thread.currentThread().getName();
}
};
Supplier<String> stringSupplier2= ()->{
return "hello";
};
System.out.println(stringSupplier.get());
System.out.println(stringSupplier2.get());
作用 : 简化编程模型
11 Stream流式计算
实例
12 ForkJoin
JDK1.7之后,并发执行,高效率,大数据量
原理:把大任务拆分成小任务
特点:工作窃取,当一个线程先执行完成任务之后,此线程不会等待而是会将另一个线程的任务窃取过来执行
实例
import java.util.concurrent.RecursiveTask;
public class Fork extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp;
private Long sum;
private Long mid;
public Fork(Long start, Long end, Long temp) {
this.start = start;
this.end = end;
this.temp = temp;
}
@Override
protected Long compute() {
if ((end - start) < temp){
Long sum =0l;
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else {
mid = (end + start) / 2 ;
Fork fork1 = new Fork(start, mid, temp);
fork1.fork(); //拆分任务,将任务加入方法栈
Fork fork2 = new Fork(mid+1, end, temp);
fork2.fork(); //拆分任务,将任务加入方法栈
return fork1.join()+fork2.join(); //每个小任务的结果返回
}
}
}
package com.kuangsheng.Forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class ForkTest {
private long sum = 0l;
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkTest test = new ForkTest();
//test.test1();
test.test2();
// test.test3();
}
public void test1(){
long l = System.currentTimeMillis();
for (Long i = 0l; i <= 10_0000_0000; i++) {
sum+=i;
}
long l1 = System.currentTimeMillis();
System.out.println("时间"+(l1 - l)+"------------->"+"sum"+sum);
}
public void test2() throws ExecutionException, InterruptedException {
long l = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
Fork fork = new Fork(0l, 10_0000_0000l, 10000l);
ForkJoinTask<Long> sub = pool.submit(fork);
sum = sub.get();
long l1 = System.currentTimeMillis();
System.out.println("时间"+(l1 - l)+"------------->"+"sum"+sum);
}
public void test3(){
long l = System.currentTimeMillis();
// Stream流 并行计算
sum = LongStream.rangeClosed(0l, 10_0000_0000).parallel().reduce(0, Long::sum);
long l1 = System.currentTimeMillis();
System.out.println("时间"+(l1 - l)+"------------->"+"sum"+sum);
}
}
13 异步回调
future: 对未来的方法进行建模
实例
1 没有返回值
1.1 当异步回调没有阻塞时,会按照程序顺序执行
1.2 当异步回调有阻塞时,会先对异步回调的线程发送请求,不要求立即返回结果,发送请求之后就往下执行,当需要异步回调的数据时再返回结果
2 有返回值
14 JMM
线程 工作内存 主内存
八种规定
JMM的八种交互操作(每个操作都为原子操作)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
**
对八种操作的规则
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
可见性
不保证原子性
不加Lock 或者 Synchronized 关键字 怎么保证原子性
可以通过原子类来进行原子性实现
禁止指令重排
内存屏障
内存屏障常用与单例模式
15 单例模式
15.1 双重检测
15.2 静态内部类
15.3 枚举
16 CAS
Unsafe类
ABA问题
解决 :引入原子引用(乐观锁原理)
17 各种锁
17.1 公平锁:线程调度之间必须遵循先来后到
public ReentrantLock() {
this.sync = new ReentrantLock.NonfairSync();
}
17.2 非公平锁:线程调度之间不必遵循先来后到,可以插队(默认:抢占式调度)
public ReentrantLock(boolean var1) {
this.sync = (ReentrantLock.Sync)(var1 ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
}
17.3 可重入锁
实例
synchronized
lock
17,4 自旋锁
public class Mylock {
AtomicReference<Thread> ato = new AtomicReference<>();
public void mylock(){
String name = Thread.currentThread().getName();
System.out.println("获取锁"+name);
while (!ato.compareAndSet(null,Thread.currentThread())){
}
}
public void myunlock(){
System.out.println("解锁"+Thread.currentThread().getName());
ato.compareAndSet(Thread.currentThread(),null);
}
}
Test
public class MyLockTest {
public static void main(String[] args) throws InterruptedException {
Mylock lock = new Mylock();
new Thread(()->{
lock.mylock();
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("T1");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.myunlock();
}
},"T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
lock.mylock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("T2");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.myunlock();
}
},"T2").start();
}
}
结果
17.5 死锁
怎么解决死锁?
1 命令 jps -l 查看当前所有的线程号
2 jstack 进程号 命令,查看死锁问题(堆栈信息)
排查问题
1 查看日志
2 查看堆栈信息