privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; if (ws > 0) { /* * 前驱节点终止了. 跳过前驱节点,再尝试 * 此处删除了无效的线程终止节点 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 找到一个前驱节点waitStats不是CANCELLED的并且最靠近head节点的那一个为止 // 用CAS机制把前驱节点的waitStatus更新为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
这里会判断当前节点的前驱节点的状态,如果当前节点的前驱节点的waitStatus是SIGNAL,返回true,表示当前节点应当park。这个时候就会调用parkAndCheckInterrupt()方法.当前线程就会被阻塞住。从这个方法还可以看出,如果这个线程被唤醒了,这个线程自己会返回在它阻塞期间有没有被中断过。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。在acquireQueued(final Node node, int arg)方法中,如果这个线程被唤醒了,并且曾经在阻塞期间被中断过,就将中断标识符interrupted置为true。接着线程又会进入acquireQueued(final Node node, int arg)的for循环中。如果当前这个被唤醒的线程是正常被唤醒的,那么它的前继节点就应该是head,这个时候当前被唤醒的线程就会执行tryAcquire方法去获取锁。如果假设它获取锁成功了,那么它会把自己设置为head节点,并且把head节点的持有线程设置为null,以保持head节点是dummy节点,接着当前线程就去做自己的业务了。
privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
用unpark()唤醒等待队列中最前边的那个未放弃的线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃的线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!
“响应中断的独占锁”模式
1 2 3 4 5 6 7 8
// 用于实现Lock的lockInterruptibly方法 publicfinalvoidacquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
privatevoidsetHeadAndPropagate(Node node, int propagate) { Nodeh= head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Nodes= node.next; if (s == null || s.isShared()) doReleaseShared(); } }
privatevoiddoReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Nodeh= head; if (h != null && h != tail) { intws= h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
abstractstaticclassSyncextendsAbstractQueuedSynchronizer { /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstractvoidlock();
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ finalbooleannonfairTryAcquire(int acquires) { finalThreadcurrent= Thread.currentThread(); intc= getState(); if (c == 0) { // 可以获得锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { // 重入 intnextc= c + acquires; if (nextc < 0) // overflow 锁的最大上限int.max thrownewError("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; }
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */
/** Returns the number of shared holds represented in count */ staticintsharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ staticintexclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/** * The number of reentrant read locks held by current thread. * Initialized only in constructor and readObject. * Removed whenever a thread's read hold count drops to 0. */ privatetransient ThreadLocalHoldCounter readHolds;
/** * The hold count of the last thread to successfully acquire * readLock. This saves ThreadLocal lookup in the common case * where the next thread to release is the last one to * acquire. This is non-volatile since it is just used * as a heuristic, and would be great for threads to cache. * * <p>Can outlive the Thread for which it is caching the read * hold count, but avoids garbage retention by not retaining a * reference to the Thread. * * <p>Accessed via a benign data race; relies on the memory * model's final field and out-of-thin-air guarantees. */ privatetransient HoldCounter cachedHoldCounter;
/** * firstReader is the first thread to have acquired the read lock. * firstReaderHoldCount is firstReader's hold count. * * <p>More precisely, firstReader is the unique thread that last * changed the shared count from 0 to 1, and has not released the * read lock since then; null if there is no such thread. * * <p>Cannot cause garbage retention unless the thread terminated * without relinquishing its read locks, since tryReleaseShared * sets it to null. * * <p>Accessed via a benign data race; relies on the memory * model's out-of-thin-air guarantees for references. * * <p>This allows tracking of read holds for uncontended read * locks to be very cheap. */ privatetransientThreadfirstReader=null; privatetransientint firstReaderHoldCount;
/** * Returns true if the current thread, when trying to acquire * the read lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstractbooleanreaderShouldBlock();
/** * Returns true if the current thread, when trying to acquire * the write lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstractbooleanwriterShouldBlock();
Sync中提供了很多方法,但是有两个方法是抽象的,子类必须实现。
公平
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
finalbooleanwriterShouldBlock() { return hasQueuedPredecessors(); } finalbooleanreaderShouldBlock() { return hasQueuedPredecessors(); } //// --> call AbstractQueuedSynchronizer.hasQueuedPredecessors publicfinalbooleanhasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Nodet= tail; // Read fields in reverse initialization order Nodeh= head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
finalbooleanwriterShouldBlock() { returnfalse; // writers can always barge } finalbooleanreaderShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); //该方法,具体又是在 AbstractQueuedSynchronizer中 } //// --> call AbstractQueuedSynchronizer.apparentlyFirstQueuedIsExclusive /** * Returns {@code true} if the apparent first queued thread, if one * exists, is waiting in exclusive mode. If this method returns * {@code true}, and the current thread is attempting to acquire in * shared mode (that is, this method is invoked from {@link * #tryAcquireShared}) then it is guaranteed that the current thread * is not the first queued thread. Used only as a heuristic in * ReentrantReadWriteLock. */ finalbooleanapparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
可以看到,非公平模式下,writerShouldBlock直接返回false,说明不需要阻塞;
而readShouldBlock调用了apparentFirstQueuedIsExcluisve()方法。即判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。
/** * Acquires the read lock. * * <p>Acquires the read lock if the write lock is not held by * another thread and returns immediately. * * <p>If the write lock is held by another thread then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until the read lock has been acquired. */ publicvoidlock() { sync.acquireShared(1); } //// --> call AbstractQueuedSynchronizer.acquireShared publicfinalvoidacquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // 在 AQS 中,如果 tryAcquireShared(arg) 方法返回值小于 0 代表没有获取到共享锁(读锁),大于 0 代表获取到 } //// --> call ReentrantReadWriteLock.Sync.tryAcquireShared protectedfinalinttryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Threadcurrent= Thread.currentThread(); intc= getState(); // 获取当前锁状态 if (exclusiveCount(c) != 0 && // 有线程已经抢占了写锁 getExclusiveOwnerThread() != current) // 写线程不是当前线程 return -1; // 返回-1,进入lock等待队列 intr= sharedCount(c); // 获取当前读线程数 if (!readerShouldBlock() && // 当前读线程是否应当排队 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // cas 新增读状态成功 if (r == 0) { // 如果r=0, 表示,当前线程为第一个获取读锁的线程 firstReader = current; firstReaderHoldCount = 1; // 如果第一个获取读锁的对象为当前对象,将firstReaderHoldCount 加一 } elseif (firstReader == current) { firstReaderHoldCount++; } else { // 如果不是第一个获取多锁的线程,将该线程持有锁的次数信息,放入线程本地变量中,方便在整个请求上下文(请求锁、释放锁等过程中)使用持有锁次数信息 HoldCounterrh= cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); elseif (rh.count == 0) readHolds.set(rh); rh.count++; } return1; // 成功获取锁 } return fullTryAcquireShared(current); }
// 看下这里返回 false 的情况: // c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有) // c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁 // 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁 if (w == 0 || current != getExclusiveOwnerThread()) returnfalse;
if (w + exclusiveCount(acquires) > MAX_COUNT) thrownewError("Maximum lock count exceeded");
// 这里不需要 CAS,仔细看就知道了,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了 setState(c + acquires); returntrue; }
// 如果写锁获取不需要 block,那么进行 CAS,成功就代表获取到了写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) returnfalse; setExclusiveOwnerThread(current); returntrue; }