Featured image of post AbstractQueuedSynchronizer 源码(二)

AbstractQueuedSynchronizer 源码(二)

公平锁和非公平锁源码对比,Condition 使用和源码分析

通过对前文AbstractQueuedSynchronizer源码(一)的阅读,相信你已经对AQS中的独占锁有了了解,本文将在前文的基础上,继续深入,主要关注如下几点:

  • 深入分析ReentrantLock公平锁和非公平锁的区别
  • Condition的使用场景以及源码分析
  • 源码中问题解析

本文将尽可能把以上几点分析清楚,也希望能够对得起你的时间……

公平锁与非公平锁

ReentrantLock默认采用的是非公平锁,如果你想使用公平锁,那么需要在其实例化的时候去手动设置。

1
2
3
4
5
6
7
8
public ReentrantLock() {
  	//默认采用非公平锁
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
  	//通过传入的参数可以手动设置公平锁和非公平锁
    sync = fair ? new FairSync() : new NonfairSync();
}

公平锁和非公平锁的lock()方法在源码中只有两处有着区别。

我们先来看看公平锁源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final class FairSync extends Sync {

    final void lock() {
      	//1.这里会直接获取锁,并没有先cas获取锁
        acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
          	//2.这里会先判断有没有线程在队列等待,而非公平锁不会
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

再看看非公平锁源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static final class NonfairSync extends Sync {

    final void lock() {
      	//1.这里会先直接cas尝试获取锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
          	//2.这里没有判断队列中是否有线程
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

所以,公平锁和非公平锁就只有这两个区别:

  • 非公平锁在调用lock()方法时,会先直接cas看看能否插队获取到锁,如果此时刚好没有线程获取到锁,那么就能够直接获取锁。
  • 在第一次cas获取锁失败之后,和公平锁一样,会进入到tryAcquire()方法中,如果此时看到锁没有被线程持有,那么非公平锁会直接cas抢锁,而公平锁会先看看队列中是否有线程在等待获取锁。

从源码来看就只有这两个区别,一般来说都是使用非公平锁,毕竟其吞吐量更大,但是偶尔还是会出现部分线程长期处于饥饿状态。

Condition

说完公平锁和非公平锁,接下来咱们还是要来看看基于AQS的几个扩展类实现的扩展功能,先来看看Condition类,想必你应该知道Condition的具体使用方法,下面以一个小demo来回顾回顾它的用法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package condition;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {  
   final Lock lock = new ReentrantLock();//锁对象  
   final Condition notFull  = lock.newCondition();//写线程条件   
   final Condition notEmpty = lock.newCondition();//读线程条件   
  
   final Object[] items = new Object[100];//缓存队列  
   int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;  
  
   public void put(Object x) throws InterruptedException {  
     lock.lockInterruptibly();  //可中断锁
     try {  
       while (count == items.length)//如果队列满了   
         notFull.await();//阻塞写线程  
       items[putptr] = x;//赋值   
       if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0  
       ++count;//个数++  
       notEmpty.signal();//唤醒读线程  
     } finally {  
       lock.unlock();  
     }  
   }  
  
   public Object take() throws InterruptedException {  
     lock.lockInterruptibly();  
     try {  
       while (count == 0)//如果队列为空  
         notEmpty.await();//阻塞读线程  
       Object x = items[takeptr];//取值   
       if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0  
       --count;//个数--  
       notFull.signal();//唤醒写线程  
       return x;  
     } finally {  
       lock.unlock();  
     }  
   }   
 }

从上面例子我们可以看到,如果想要使用Condition,那么必须要先获取到锁,然后才能进行操作,这个和Object中有一点类似,Object需要和sychronized在一起使用持有了对象锁才能在之后进行阻塞和唤醒。

其实我们常见的数据结构,阻塞队列在内部就是使用的类似的方式实现的。

接下来我们开始分析Condition的源码,但是在分析之前,我还需要重申一下关于Condition的使用方式,当前线程在执行await()signal()方法之前,需要先获取到锁,之后才能执行。

接下来看源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Condition newCondition() {
  	//内部通过Sync实例化一个ConditionObject对象
    return sync.newCondition();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

//ConditionObject类实现了Condition接口
public class ConditionObject implements Condition, java.io.Serializable {
}

所以我们直接看ConditionObject类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class ConditionObject implements Condition, java.io.Serializable {

  	//条件队列的第一个结点
    private transient Node firstWaiter;

  	//条件队列的最后一个结点
    private transient Node lastWaiter;

    ......
}

我们先来说一下条件队列,在上一节中我们有提到同步队列,那是一个双向链表实现的队列,而这里引入了另外一个队列叫做条件队列,这个队列是一个单向链表实现的队列,主要属性有第一个结点(firstWaiter)、最后一个结点(lastWaiter)以及上节说过的Node结点中的nextWaiter属性,表示当前结点的下一结点。

那么这个条件队列和之前的同步队列有没有关联呢?我们接着往下看。

首先来看看ConditionObject类中的await()方法:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//线程调用该方法之后会被阻塞,直到别的线程调用signal或者signalAll方法
public final void await() throws InterruptedException {
  	//可被中断的,如果当前线程被设置为中断状态,这里会抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
  	//添加当前线程到条件队列结点中,后面会展开分析
    Node node = addConditionWaiter();
  	//释放锁,想一想也应该明白,如果这个方法里面不释放锁,那么将会导致别的线程无法执行该方法
  	//所以肯定要释放锁,返回的是锁释放之前的状态值
    int savedState = fullyRelease(node);
    int interruptMode = 0;
  	//isOnSyncQueue:当前结点是否转移到同步队列,可以看到条件队列和同步队列产生关联
    while (!isOnSyncQueue(node)) {
      	//线程挂起
        LockSupport.park(this);
      	//判断线程中断状态,interruptMode不为0,退出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
      //该循环退出有两种可能,一种是当前线程对应结点已经转移到同步队列,
      //另外一种是interruptMode字段不为0,break退出循环
    }
  	//到这里说明线程开始尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

//我们回到Node node = addConditionWaiter();这一行代码
//将当前线程对应的结点放到条件队列尾部
private Node addConditionWaiter() {
  	//获取当前队尾节点
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
  	//队列最后一个结点取消,那么将会被清除出去
    if (t != null && t.waitStatus != Node.CONDITION) {
      	//遍历整个队列,清除状态为cancle的结点,后续详细分析
        unlinkCancelledWaiters();
      	//重新设置t结点为最后一个结点
        t = lastWaiter;
    }
  	//初始化当前线程对应结点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
      	//t为空说明条件队列为空,那么条件队列第一个结点为当前线程对应结点
        firstWaiter = node;
    else
      	//队尾节点不为空,那么队尾节点的后继结点为当前线程对应结点
        t.nextWaiter = node;
  	//重新设置队尾节点为当前线程对应结点
    lastWaiter = node;
    return node;
}

//遍历整个队列,清除状态为cancle的结点
//当节点入队时发现最后节点被取消以及await时发生了当前节点取消操作,都会调用该方法
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
      	//如果节点状态不为CONDITION状态,那么就认为该节点已被取消
        if (t.waitStatus != Node.CONDITION) {
          	//将该节点后置节点指向null
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

//我们回到int savedState = fullyRelease(node);这一行代码
//该方法时释放锁,从上一篇文章,我们知道释放节点无非就是将队列状态设为0,同时队列对应的线程设为null
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
      	//获取释放锁之前的队列状态
        int savedState = getState();
      	//释放锁,这里直接调用了unlock()方法中的release方法,所以释放锁的同时还会通知
      	//同步队列第一个结点去获取锁
      	//锁释放成功之后,同步队列staet值为0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
          	//释放锁失败抛出异常,如果一个线程没有持有锁,直接调用await()方法的话
          	//到这里会抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
          	//这里只有抛出异常才会执行
            node.waitStatus = Node.CANCELLED;
    }
}

//执行完int savedState = fullyRelease(node);这一行之后
//我们来看看while (!isOnSyncQueue(node)) {}中的isOnSyncQueue方法
//该方法是判断当前结点是否转移到同步队列中
final boolean isOnSyncQueue(Node node) {
  	//因为结点移动过去之后,结点状态会被修改成0,所以如果状态还是CONDITION,那么肯定没有被移动过去
  	//如果结点的前驱结点prev还是null,那么肯定还没有移动过去,上一篇最后问题4有说明为何直接判断prev即可
  	//值得说明的是,如果node.prev() != null也不能判断已经移动过去,可能node.next为null
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
  	//如果节点有next节点,那么肯定在同步队列了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //这里就是从同步队列尾部遍历向前找,如果找到相等的,说明在同步队列,否则不在
    return findNodeFromTail(node);
}

//从同步队列的队尾往前遍历,找到返回true,找不到返回false
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

//上面!isOnSyncQueue(node)如果返回false的话,那么当前线程就会被挂起
//LockSupport.park(this);

分析完await()方法之后,我们接下来分析分析signal()方法,需要注意的是执行signal()方法的时候,当前线程是持有锁的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public final void signal() {
  	//执行signal方法要求当前线程持有独占锁,如果没有抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
  	//获取条件队列第一个结点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

//判断当前线程是否持有独占锁
protected final boolean isHeldExclusively() {
    // While we must in general read state before owner,
    // we don't need to do so to check if current thread is owner
    return getExclusiveOwnerThread() == Thread.currentThread();
}

//从条件队列队头往后遍历,找出第一个需要转移的node
//因为条件队列中部分结点可能取消排队
private void doSignal(Node first) {
    do {
      	//firstWaiter指向first的后面一个结点,因为first结点马上要离开了
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
      	//因为first结点马上要被移动到同步队列中,所以需要断开first结点和条件队列关系
        first.nextWaiter = null;
      //transferForSignal转移结点,转移成功返回true,否则返回false
      //如果转移失败,那么需要first的后面第一个结点转移
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

//将结点从条件队列转移到同步队列
final boolean transferForSignal(Node node) {
    /*
    * If cannot change waitStatus, the node has been cancelled.
    */
    //cas失败表示修改状态值失败,说明该节点的状态发生了改变,这里是取消了
    //因为取消了所以不需要转移
    //否则这里值修改成功,当前结点状态变为0,注意仅仅是结点值改变了,结点还没有转移到同步队列
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
    * Splice onto queue and try to set waitStatus of predecessor to
    * indicate that thread is (probably) waiting. If cancelled or
    * attempt to set waitStatus fails, wake up to resync (in which
    * case the waitStatus can be transiently and harmlessly wrong).
    */
    //将当前结点塞到同步队列队尾
    //返回值p表示的是node结点的前驱结点
    Node p = enq(node);
    int ws = p.waitStatus;
    //到这里会判断当前结点的前驱结点,正常情况下前驱结点状态小于等于0,所以这里如果线程被唤醒,有两种情况
    //1.前驱结点状态大于0,
    //2.前驱结点状态小于等于0,但是cas失败,state状态值发生改变
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    //这里唤醒的目的需要考虑清楚
    //因为如果这里提前唤醒结点,那么说明当前结点的前驱结点取消了排队,或者前驱结点cas失败
    //然后担心当前结点无法被唤醒而执行的操作,所以结点node对应线程被唤醒之后,继续执行后续操作
    //从后续操作中我这里给出结论:如果前驱结点取消了,那么会将前驱结点移除掉,然后重新挂起
    //结点node除非是head结点的下一个结点才会有机会去争取锁
        LockSupport.unpark(node.thread);
    return true;
}

在执行完signal()方法之后,线程将会从条件队列被移动到同步队列中,条件队列中将不会存在该结点,其实到这里已经结束了,但是好像感觉出了什么问题?按道理正常情况下if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))是会返回false的,也就是说并不会去唤醒线程,那么线程又是何时被唤醒呢?

上一篇文章中我们有提到一种情况,那就是解锁的时候会去执行release()方法,该方法内部会存在唤醒head结点的下一个结点的可能,release()方法有没有感觉很熟悉?没错,我们刚刚在分析await()方法的时候,会去执行fullyRelease()方法,该方法内部也是会调用``release()方法,所以await()`方法也会去唤醒线程。

除此之外,如果线程发生中断,其实也会导致LockSupport.park()返回继续往下执行。

所以我们来总结一下,以下三种情况会导致LockSupport.park(this)继续往下执行:

  • 正常条件下,线程执行await()或者unlock()方法唤醒head结点的后继结点对应线程,导致线程被唤醒
  • 线程中断,在park()源码中会判断线程状态,如果中断会直接返回
  • signal()方法中,如果ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)会唤醒线程

聊完线程被唤醒的几种条件,我们继续看看线程被唤醒之后,接下来会做什么……

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
int interruptMode = 0;
//isOnSyncQueue:当前结点是否转移到同步队列,可以看到条件队列和同步队列产生关联
while (!isOnSyncQueue(node)) {
    //线程挂起
    LockSupport.park(this);
    //判断线程中断状态,interruptMode不为0,退出循环
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    //该循环退出有两种可能,一种是当前线程对应结点已经转移到同步队列,
    //另外一种是interruptMode字段不为0,break退出循环
}
//到这里说明线程开始尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

线程被唤醒之后,会先判断线程状态,这里先解释一下interruptMode字段,这个字段有三个值:0REINTERRUPT(1)THROW_IE(-1)

  • REINTERRUPT(1):表示await返回的时候需要重新设置中断状态
  • THROW_IE(-1):表示await返回的时候需要抛出InterruptedException
  • 0:表示在await期间没有发生中断

REINTERRUPT(1)THROW_IE(-1)都是线程发生了中断,但是线程发生中断的时机不同,导致会有两个状态。

下面我们看看checkInterruptWhileWaiting(node)方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private int checkInterruptWhileWaiting(Node node) {
  	//先判断线程是否发生过中断,如果发生过再看是signal前还是后
    return Thread.interrupted() ?
      	//判断是signal前还是后发生中断
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

//signal前发生,返回true,否则返回false
final boolean transferAfterCancelledWait(Node node) {
  	//如果这里成功,说明是signal前发生中断,因为signal会将node状态改为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
      	//将结点放入同步队列,即使线程中断也会将其结点放入同步队列
        enq(node);
        return true;
    }
    /*
    * If we lost out to a signal(), then we can't proceed
    * until it finishes its enq().  Cancelling during an
    * incomplete transfer is both rare and transient, so just
    * spin.
    */
    //到这里说明不是signal之前发生中断,所以需要自旋等待结点转移到同步队列中
    while (!isOnSyncQueue(node))//判断结点是否转移到同步队列
        Thread.yield();
    return false;
}

这里我们需要注意一个地方,那就是即使线程发生了中断,也会将线程对应的结点转移到同步队列,因为会出现一种场景,即一个线程因为中断被唤醒,但是发现其不是被signal那个,所以又会重新进入到同步队列。

在线程被唤醒并进入同步队列之后,线程将开始去尝试获取锁,我们跟着上面的步骤继续往下看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
//到这里说明线程开始尝试获取锁
//acquireQueued返回表示当前线程对应结点获取到锁,返回值表示是否发生过中断,false没有中断,true中断
//如果中断过,还会判断何时中断,如果不为THROW_IE说明是signal之后发生中断,会设置interruptMode
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
//到这里的时候,当前线程已经获取到了锁,但是因为线程可能在被signal之前发生中断,而此时被唤醒的
//线程也会进入同步队列,但是没有设置node.nextWaiter = null
if (node.nextWaiter != null) // clean up if cancelled
  	//这里会清理条件队列中的结点状态非CONDITION的结点
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

//依据interruptMode值进行处理
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

拓展

到目前为止关于Conditionawait()signal()方法相关源码已经讲解完了,为了丰富整篇文章,下面我再讲几个拓展点:

1.带超时的await

如果await()方法带超时的话,那么该是什么样子的呢?我们先来猜一猜,既然是超时,说明在某个时间段内线程会被挂起,如果超过这个时间的话就会被唤醒然后做某些操作。我们这里以await(long time, TimeUnit unit)方法为例,进行分析:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    //将时间转换成纳秒
    long nanosTimeout = unit.toNanos(time);
    //判断线程状态,如果处于中断状态,那么直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //将线程添加到条件队列
    Node node = addConditionWaiter();
    //释放锁
    int savedState = fullyRelease(node);
    //过期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    //是否超时,false没有超时,true超时
    boolean timedout = false;
    int interruptMode = 0;
    //判断当前线程对应结点是否存在于同步队列中,如果存在于同步队列说明存在某一线程调用了
    //signal方法将当前线程对应结点转移到了同步队列
    while (!isOnSyncQueue(node)) {
      	//到了时间
        if (nanosTimeout <= 0L) {
            //到了时间之后会取消等待,取消等待之后会调用transferAfterCancelledWait方法
            //如果该方法返回true说明当前结点被成功转移到同步队列
            //如果返回false说明当前结点早已被放入条件队列中,那么就没有超时,返回flse
            timedout = transferAfterCancelledWait(node);
            break;
        }
      	//如果设置的超时时间小于指定时间,那么就不走parkNanos,直接自旋
      	//spinForTimeoutThreshold 值为1000
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
      	//剩余时间
        nanosTimeout = deadline - System.nanoTime();
    }
    //后面和await()方法相同操作
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

看下来,这里其实没有特别复杂的逻辑,无非就是线程挂起指定时间,然后判断是否超时,如果超时将结点转移到同步队列去争抢锁。

2.线程取消排队

到现在我们都说的是线程获取锁以及线程排队,那么线程如果取消排队将会发生什么呢?相信有心的会记得,在await()方法中存在两处清理条件队列中已经取消排队结点的地方(清理方法为unlinkCancelledWaiters()),一个是在添加新的结点到条件队列的时候,那里的判断如果满足条件会去清理队列中状态为已经取消的结点,另外一个是在线程被唤醒的时候,那里的判断如果满足也会去清理。但是,线程取消排队是如何进行的?

其实取消排队的方式你应该见过了,那就是最简单直接的使用线程中断方式,上面说线程被唤醒的三种方式的时候有提到过线程中断是会唤醒线程的,而此时线程的状态会被标记为中断状态,这样该线程在调用某些方法,比如lockInterruptibly()方法的时候就会直接抛出异常。

既然到了这里,那么就顺便提一下cancelAcquire()方法,该方法在acquireQueued()方法内因为发生异常的时候而被调用。下面我们来看看该方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    //前驱结点状态为cancle时,会继续寻找前面的结点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
  	//移除当前结点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
      	//如果当前结点存在后继结点,那么将前缀结点直接指向后缀结点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
          	//直接唤醒后继者
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

这个方法没有特别需要分析的地方,结点如果被取消,那么将当前结点状态设置为CANCELLED,之后会在某些情况下将这些结点清除出去。

总结

本文主要是承接上文,针对公平锁和非公平锁进行了对比分析,从分析结果来看,其实仅仅有两个地方的区别,非公平锁条件下,线程在调用lock方法的时候直接去尝试更改锁的状态和拥有者,这里压根没有进同步队列,另外一个是非公平锁条件下,线程获取锁时不会去判断条件队列中是否存在阻塞的线程,而是直接去获取锁。

除此之外,我从Condition的使用方式出发,分析了await()方法和singal()方法的源码,我们需要注意的是线程在调用await()方法时会释放锁,而调用singal()方法的时候仅仅将放在条件队列中的结点转移到了同步队列中,并没有唤醒线程的操作。

另外,我还针对LockSupport.park(this)能够继续往下执行的三种方式进行了总结,希望你能够记住和理解。

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus