并发编程-共享模型之管程
4.1共享资源问题
临界区
多个线程读共享资源没有问题,但是多线程对共享资源进行读写操作,就会有问题
竞态条件
多个线程再临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
4.2synchronized解决方案
应用之互斥
为了避免临界区的竞态条件发生,有多种手段可以达到目的。
本次采用阻塞式方案:synchronized,即【对象锁】,它采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】,其他线程获取锁时会被阻塞 。
虽然java中互斥和同步都可以采用synchronized关键字来完成,但还是有区别
static int counter = 0;
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
// synchronized (this)
synchronized (room) {
counter++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
// synchronized (this)
synchronized (room) {
counter--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter);
}
上面代码优化
public class test3 {
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
room.increment();
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
room.increment();
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(room.getCounter());
}
}
class Room {
public int counter = 0;
public synchronized void increment() {
counter++;
}
public void decrement() {
synchronized (this) {
counter--;
}
}
public int getCounter() {
synchronized (this) {
return counter;
}
}
}
4.3方法上的synchronized
加在成员方法上
class Room {
public synchronized void increment() {
}
//等价于
class Room {
public void increment() {
synchronized (this) {
}
}
加在静态方法上
class Room {
public synchronized static void increment() {
}
//等价于
class Room {
public static void increment() {
synchronized (Room.class) {
}
}
线程八锁
其他略,着重情况8
包含static synchronized的2个方法,都是锁的同一个类对象,互斥。如果其中一个不是静态的都是并行,锁的不是同一个对象。
4.4线程安全分析
成员变量和静态变量是否线程安全
如果只有读操作,则线程安全
如果有读写操作,则这段代码是临界区,需要考虑线程安全
局部变量是否线程安全?
如果该对象没有逃离方法的作用访问,它是线程安全的
如果该对象逃离方法的作用范围,则需要考虑线程安全
局部变量安全分析
public static void test(){
int i = 10;
i++;
}
如果是2个线程调用test()方法时局部变量i,会在每个线程的栈帧中被创建多份,因此不存在共享
引用对象则有所不同
1.如果是成员变量
分析:
2.如果是局部变量
分析
3.如果是局部变量且有子类继承,局部变量暴露给外部
有线程安全问题,父类成员方法method3,此时有个线程,子类方法中又开了一个线程,重写方法method3方法,开了一个新的线程,此时用的局部变量就是共享资源,但是如果method3被private修饰,那么不能被子类覆盖,就是安全的,或者加finally修饰
常见线程安全类
这里说它们线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。也可以理解为
线程安全类方法的组合
get方法和put方法内部整体是线程安全的,但是组合就不安全了
线程1执行到table.get(key) == null,但是还没有put,时间片切到线程2,且线程2,put操作执行完毕,此时又切回线程1,线程1做了put操作
HashTable table = new HashTable()
//线程1,线程2
if(table.get(key) == null){
table.put("key",value)
}
不可变类线程安全
String、Integer等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的
那么,String由replace,substring等方法【可以】改变值,如何保证线程安全?
4.5 习题
略
4.6Monitor概念
java对象头
Monitor(锁)
Monitor被翻译为监视器或管程
每个java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针
Monitor结构如下
注意:
synchronized必须是进入同一个对象的monitor才有上述的效果
不加synchronized的对象不会关联监视器,不遵守以上规则
synchronized进阶原理
1.轻量级锁 00
使用场景:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化
轻量级锁对使用者是透明的,语法仍然是synchronized
假设有2个方法同步块,利用同一个对象加锁
static final Object obj = new Object();
public static void method1(){
synchronized(obj){
//同步块A
method2()
}
}
public static void method2(){
synchronized(obj){
//同步块B
}
}
2.锁膨胀
如果在尝试加轻量级锁的过程中,CAS操作无法成功,这时一种情况就是有其他线程为此对象加上了轻量级锁(有竞争),这时需要膨胀加锁,将轻量级锁变为重量级锁
static Object obj = new Object();
public static void method1(){
synchronized(obj){
//同步块
}
}
3.自旋优化
cpu多核情况才有意义
4.偏向锁 101
偏向状态
撤销偏向状态-调用hashCode方法
注意:
当一个可偏向的对象调用了hashcode方法,会撤销该对象的偏向状态。恢复到Normal状态
撤销偏向状态-其他线程使用
当有其他线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
撤销偏向状态-调用wait/notify
不管是偏向锁还是轻量级锁,都会变更重量级锁
批量重偏向
如果对象虽然被多个线程访问,但是没有竞争,这时偏向了线程T1的对象仍有机会重新偏向T2,重偏向会重置对象的Thread ID
当撤销偏向锁阈值找过20次后,jvm会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至加锁线程
批量撤销
当撤销偏向锁阈值超过40次以后,jvm会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的
5.锁消除
即时编译器会优化掉b方法的synchronized,idea默认打开
4.7wait notify
原理之wait notify
API介绍
obj.wait()让进入object监视器的线程到waitSet等待
obj.wait(long n)让进入object监视器的线程到waitSet等待n毫秒
obj.notify()在object上正在waitSet等待的线程中挑一个唤醒(随机)
obj.notifyAll()让object上正在waitSet等待的线程全部唤醒
上述都是线程之间进行协作的手段,都属于Object对象方法。必须获得此对象的锁,才能调用这几个方法
4.8wait notify的正确姿势
sleep(long n)和wait(long n)的区别
synchronized(lock){
while(条件不成立){
lock.wait();
}
//程序继续运行
}
//另一个线程
synchronized(lock){
lock.notifyAll();
}
4.8.1同步模式之保护性暂停
即Guarded Suspension,用在一个线程等待另一个线程执行结果
要点
实现案例
public class test4 {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
//等待结果
List list = (List<String>)guardedObject.get();
log.info("结果为:{}",list.size());
},"t1").start();
new Thread(()->{
log.info("执行下载");
try {
List<String> download = Downloder.download();
guardedObject.createResponse(download);
} catch (IOException e) {
e.printStackTrace();
}
},"t3").start();
}
}
class GuardedObject {
//结果
private Object response;
//获取结果
public synchronized Object get() {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
//产生结果
public synchronized void createResponse(Object response){
//给结果成员变量赋值
this.response=response;
this.notifyAll();
}
}
增加超时效果
class GuardedObject {
//结果
private Object response;
//获取结果
//timeout表示要等多久
public synchronized Object get(long timeout) {
//记录开始时间
long begin = System.currentTimeMillis();
//记录经历的时间
long passedTime = 0;
while (response == null) {
if (passedTime>=timeout){
//经历时间超过最大等待时间,退出循环
break;
}
try {
this.wait(timeout-passedTime); //考虑虚假唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
//求得经历时间
passedTime= System.currentTimeMillis()- begin;
}
return response;
}
//产生结果
public synchronized void createResponse(Object response){
//给结果成员变量赋值
this.response=response;
this.notifyAll();
}
}
4.8.1.扩展-解耦等待和生产
//一个人必须得有一个送信人(生产者和消费者一一对应)
@Slf4j
public class test4 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : MailBoxes.getIds()) {
new Postman(id,"内容"+id).start();
}
}
}
@Slf4j
class People extends Thread{
@Override
public void run() {
//收信
GuardedObject guardedObject = MailBoxes.createGuardedObject();
log.info("开始收信 id:{}",guardedObject.getId());
Object mail = guardedObject.get(5000);
log.info("收到信 id:{},内容:{}",guardedObject.getId(),mail);
}
}
@Slf4j
class Postman extends Thread{
//信件的id
private int id;
//信件的内容
private String mail;
public Postman( int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = MailBoxes.getGuardedObject(id);
log.info("开始送信 id:{}",guardedObject.getId());
guardedObject.createResponse(mail);
}
}
class MailBoxes{
private static Map<Integer,GuardedObject>boxes=new Hashtable<>();
private static int id=1;
//产生唯一id
private static synchronized int generateId(){
return id ++;
}
public static GuardedObject createGuardedObject(){
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(),go);
return go;
}
public static GuardedObject getGuardedObject(int id){
return boxes.remove(id);
}
public static Set<Integer> getIds(){
return boxes.keySet();
}
}
class GuardedObject {
//唯一标识
private int id;
public GuardedObject(int id) {
this.id=id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
//结果
private Object response;
//获取结果
//timeout表示要等多久
public synchronized Object get(long timeout) {
//记录开始时间
long begin = System.currentTimeMillis();
//记录经历的时间
long passedTime = 0;
while (response == null) {
if (passedTime>=timeout){
//经历时间超过最大等待时间,退出循环
break;
}
try {
this.wait(timeout-passedTime); //考虑虚假唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
//求得经历时间
passedTime= System.currentTimeMillis()- begin;
}
return response;
}
//产生结果
public synchronized void createResponse(Object response){
//给结果成员变量赋值
this.response=response;
this.notifyAll();
}
}
4.8.2异步模式之生产者/消费者
package com.kevin.user.threadDemo;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
public class Test5 {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
Message message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
@Slf4j
//消息队列类,java线程之间通信
class MessageQueue {
//消息的队列集合
private final LinkedList<Message> list = new LinkedList<>();
//队列的容量
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
//获取消息
public Message take() {
//检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.info("队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从头部取一个并返回
Message message = list.removeFirst();
log.info("已消费消息:{}", message);
list.notifyAll();
return message;
}
}
//存入消息
public void put(Message message) {
synchronized (list) {
//检查队列是否已经满了
while (list.size() == capcity) {
try {
log.info("队列已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.info("已生产消息:{}", message);
list.notifyAll();
}
}
}
@Slf4j
final class Message {
private int id;
private Object value;
public int getId() {
return id;
}
public Object getValue() {
return value;
}
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
4.9 park&unpark方法
基本使用
它们是LockSupport类中的方法
//暂停当前线程
LockSupport.park();
//恢复某个线程的运行
LockSupport.unpark(暂停线程对象);
先park再unpark
park对应的线程状态是wait
public class test6 {
public static void main(String[] args) throws InterruptedException {
Thread t1= new Thread(()->{
log.info("开始...");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("park开始...");
LockSupport.park();
log.info("park结束...");
},"t1");
t1.start();
Thread.sleep(2);
log.info("unpark...");
LockSupport.unpark(t1);
}
}
结论:unpark既可以在park之前调用,也可以在park之后调用,都是用来恢复某个线程的运行,可以在线程执行park前就调用unpark,就可以在将来恢复park线程的运行
特点
与Object的wait¬ify相比
原理
1.当前线程调用Unsafe.park()方法
2.检查_counter,本情况为0,这时,获得“-mutex"互斥锁
3.线程进入-cond条件变量阻塞
4.设置-counter=0
1.当前线程调用Unsafe.unpark(Thread_0)方法,设置_counter为1
2.唤醒cond条件变量中的Thread_0
3.Thread_0恢复运行
4.设置-counter=0
4.10重新理解线程状态
假设有线程Thread t
情况1 new --> runnable
当调用t.start()方法时,由new --> runnable
情况2 runnable--> waiting
t线程用synchronized(obj)获取对象锁后
情况3 runnable<--> waiting
情况4 runnable<--> waiting
情况5 runnable<--> timed_waiting
t线程用synchronized(obj)获取了对象锁后
情况6 runnable<--> timed_waiting
情况7 runnable<--> timed_waiting
情况8 runnable<--> timed_waiting
情况9 runnable<--> blocked
情况910 runnable<--> terminated
当前线程所有代码运行完毕,进入terminated