ReentrantLock可重入锁
我们可以利用这个实现对某一个操作约束为同有个时刻只能有一个线程能够操作。我们呢先看一下下面这个demo
public class ReentrantLockTest {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread start1 = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println("线程1执行任务");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1执行任务结束");
} finally {
lock.unlock();
}
}
}, "t1");
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println("线程2执行任务");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2执行任务结束");
} finally {
lock.unlock();
}
}
}, "t2");
start1.start();
thread.start();
}
}
加锁了之后就能够实现只能有一个线程获取执行任务结束另一个线程才能够执行这个任务。
我们看一下lock方法的源码
public void lock() {
sync.lock();
}
sync是ReentrantLock类里面的一个成员变量,final修饰的,它的类型就是Sync类型,Sync
是ReentrantLock里面的一个静态内部抽象类,继承了AbstractQueuedSynchronizer
,Sync
类有两个实现类,分别是FairSync
和NonfairSync
,它们俩个都是ReentrantLock
的静态内部类,都是被final修饰的,不能被继承。分别实现了公平锁和非公平锁。
而这个成员变量sync在调用ReentrantLock
的构造方法的时候就会被赋值,
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁的实现
NonfairSync
继承Sync
,而Sync
又继承了AbstractQueuedSynchronizer
,而AbstractQueuedSynchronizer
就是我们常说的AQS,抽象队列同步器,里面有一个state变量,这里不再赘述AbstractQueuedSynchronizer
。
我们呢直接看非公平锁中的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先利用哦cas操作如果成功就调用setExclusiveOwnerThread
把获取到这个锁的线程设置为当前线程,否则调用acquire方法加入到等待锁的队列当中。这是AQS里面的方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里首先调用tryAcquire
在非公平锁里面的实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
获取锁成功分为两种情况,第一个if判断AQS的state是否等于0,表示锁没有人占有。接着,没有的话调用compareAndSetState使用CAS(Unsafe类调用本地方法)的方式修改state,传入的acquires写死是1。最后线程获取锁成功,setExclusiveOwnerThread将线程记录为独占锁的线程。
第二个if判断当前线程是否为独占锁的线程,因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零。
这里我们可以总结一下大致的流程是先尝试获取锁然后获取不到之后会在tryAcquire
方法里面再次尝试获取到锁,这就是非公平所,直接尝试获取锁,不直接进入队列。如果还是不行,就会调用addWaiter
方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这个方法首先呢其实就是将这个线程添加到等待队列中去,然后把尾节点设置为新添到这个队列的线程。具体分析如下:
如果第一次进来的话等待队列肯定是空的,所以这里的pred也就是tail就是null,进入enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这里第一次循环会创建一个节点了,里面的线程属性是null,然后第二次循环就会将当前节点加入到这个头节点的下一个节点,compareAndSetHead
方法和compareAndSetTail
都调用了unsafe类的方法。
我们这里可以看到在addWaiter
方法的外面还包裹了一个方法acquireQueued
,这个方法是干嘛的呢?
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法首先获取这个新加的这个节点的前面一个节点,如果前面这个节点是头节点,就会循环获取锁,知道获取成功。但是也有可能不是头节点,那如果不是头节点,就会走下面的逻辑shouldParkAfterFailedAcquire
,在这个方法中呢会将前面这个节点的,一开始ws也就是节点的等待状态是0,所有会走下面的compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
将节点的等待状态设置称-1,为什么呢?这个等待状态的作用就是用来判断是否是最后一个节点,如果是最后一个节点,它的状态就是0,不是的话就是-1,这样就可以知道在这个节点的线程执行结束之后是否应该唤醒下一个节点的线程进行运行。如果节点的等待状态大于零,那么这个就是失效的节点,就会利用循环操作,将当前按节点是上一个节点的指向指向成上一个的上一个,直到找到合适的节点。
但是我们上面的代码中还有一个faiedl的判断,然后如果正常执行,finally里面的代码块是一定不会执行的,只有parkAndCheckInterrupt()
方法中,jvm内部出现了异常,才有可能执行finally里面的方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面的方法shouldParkAfterFailedAcquire
在第一次循环中被调用的时候当然是false,但是如果第二次循环还是没有获取到线程就会返回ture,然后就会调用下面的parkAndCheckInterrupt()
方法,这个方法就会将当前线程进行park,interrupted是Thread类的静态方法,返回这个线程的中断状态,并将当前线程的中断状态设置称false。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
公平锁的实现
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
与nonfairAcquire()方法相比,唯一的不同之处在于FairSync多了hasQueuedPredecessors()
(由AQS提供)方法:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
其作用是判断:是否存在比当前线程等待锁时间更长的线程,有就返回true,没有就返回false。
也就是说每次获取锁都需要判断队列中是否还有排在前面的线程,若不存在才能继续获取锁!!若队列还存在等待线程则获取失败,当前线程进入队列尾部。这里有集中情况是需要加入到队列中的。
- h != t:这种就是队列中有元素在等待了。
- (s = h.next) == null:这种就是在并发的情况下,一个线程还没有来得及将头节点和尾节点同时指向头节点的时候就会存在这种情况。
- s.thread != Thread.currentThread():这种情况就是避免当前线程已经在等待了,所以没有必要再次等待了。
ReentrantLock解锁方法的实现
调用unlock方法,就会调用sync里面的release方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在tryRelease
方法中就会尝试释放锁,这里如果锁被持有,就会返回false,知道没有线程占有锁的时候才会返回true。一旦这个返回true,就会调用unparkSuccessor
方法。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
在unparkSuccessor
方法中,就会从后面往前遍历链表中的节点,找到需要被唤醒的节点然后唤醒节点中park的线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}