AbstractQueuedSynchronizer
- 前言
- AbstractQueuedSynchronizer(1)(JDK 1.8)
- 用途
- 主要源码分析
- Node内部类
- ConditionObject类
-
- 主要的属性及方法
-
- 主要要实现的方法
前言
最近在使用Java 并发包时遇到一些问题,感觉对于其还是不够了解,故开始着手阅读相关源码。
AbstractQueuedSynchronizer(1)(JDK 1.8)
用途
Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.
我的理解:Java并发包里提供的一个用于实现锁和一系列同步器的一个框架。
主要源码分析
本身是一个抽象类,并发包里锁和同步器的实现大都继承了它。
Node内部类
- Node类是一个静态final的内部类,主要用于实现一个等待队列。
- 有独占模式和共享模式两种。
- 主要的属性如下,其中比较重要的就是这个waitStatus,值为如下时的含义:
- SIGNAL(-1): 在当前节点释放或被取消时,唤醒后继节点;
- CANCELLED(1): 该节点不会再被阻塞,会被移除等待队列;
- CONDITION(-2):该节点当前处于一个条件等待队列里(应该给lock的Condition有关);
- PROPAGATE(-3):用于共享模式,暂时不清楚;
- 0:正常在队列中,如果是实现非公平锁,应该会拥有竞争锁的权利;
- next和nextWaiter的区别,主要就是nextWaiter用于表示下一个处于wait condition队列的节点或共享模式的节点,next表示同步队列中下一个可以唤醒的节点;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
ps: 如果处于独占模式下,该等待队列应该会逻辑上维护有两种队列,一个是可以正常竞争锁(如果用于实现锁的话)以及释放唤醒的节点同步队列,一个是处于等待某种条件的节点队列。
ConditionObject类
主要用于实现前面说的处于等待某种条件的节点队列,实现了相关入队出队操作,同时实现了Condition接口,其接口如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
ps: 底层实现原子操作的,用到了unsafe包CAS操作。
重要方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
主要的属性及方法
主要属性
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
重要方法
- acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
主要要实现的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)