Featured image of post AbstractQueuedSynchronizer 源码(三)

AbstractQueuedSynchronizer 源码(三)

分析 CountDownLatch、CyclicBarrier 以及 Semaphore 源码,整个系列总结

前面花费了两篇文章来讲独占锁的相关源码,那么这篇文章我将来讲一讲共享锁,所谓共享锁,指的是该锁可以被多个线程同时持有,能够并发地访问资源,如ReadWriteLock中的读锁就是共享锁。

本篇文章我会先用CountDownLatch来说说共享模式,然后顺便把其他AQS的相关类CyclicBarrierSemaphore的源码一起说一下。

CountDownLatch

CountDownLatch这个类是典型的共享模式使用,属于比较常用的类,同时也是面试常考类,比如经典的问多线程条件下,如何让多线程同时执行某段程序,就可以用到CountDownLatch

我们先来看一段代码,看看CountDownLatch的用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Driver2 { 
    void main() throws InterruptedException {
      	//初始化CountDownLatch
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = Executors.newFixedThreadPool(8);
      	//创建N个任务让线程池来运行
        for (int i = 0; i < N; ++i) 
            e.execute(new WorkerRunnable(doneSignal, i));
      	//每一个线程在执行到这个地方的时候都会等待,直到最后一个线程到这里,方法才结束
        doneSignal.await();
}


class WorkerRunnable implements Runnable { 
    private final CountDownLatch doneSignal;
    private final int i;
        
    WorkerRunnable(CountDownLatch doneSignal, int i) { 
        this.doneSignal = doneSignal;
        this.i = i; 
    }
        
    public void run() { 
        try { 
            doWork(i);
            //任务完成之后调用countDown()方法
            doneSignal.countDown(); 
        } catch (InterruptedException ex) { 
            
        } 
    }
    
    void doWork(int i) { ...}
}

从上面的例子中我们可以看出,CountDownLatch主要用来讲一个大的任务拆分成由多个线程来执行,等最后一个线程执行完成之后,再往下执行其他操作。所以上面提到的面试题是不是就可以来解答了?我们可以使用两个CountDownLatch来实现,先让每一个线程都在同一地点等待,然后调用countDown()方法让所有线程同时执行,这样是不是就实现了?我们来看看代码怎么写的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CountDownLatchTest {
    private final static CountDownLatch START_SIGNAL = new CountDownLatch(1);

    private final static CountDownLatch DONE_SIGNAL = new CountDownLatch(8);

    public static void main(String[] args) throws InterruptedException {
        CountDownLatchTest countDownLatchTest = new CountDownLatchTest();
        Executor pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 8; i++) {
            pool.execute(countDownLatchTest::run);
        }
      	//休眠是为了让线程都能够到START_SIGNAL.await();方法处
        TimeUnit.SECONDS.sleep(1);
      	//唤醒所有线程
        START_SIGNAL.countDown();
      	//所有线程执行完成之后都在这里等待,直到最后一个线程执行完成
        DONE_SIGNAL.await();
        System.out.println("完成!");
    }

    private void run() {
        try {
            START_SIGNAL.await();
            doWork();
        } catch (InterruptedException e) {
             e.printStackTrace();
        } finally {
            DONE_SIGNAL.countDown();
        }
    }

    private void doWork() {
        System.out.println("doWork!");
    }
}

从上面两个例子,我们不难发现,其实CountDownLatch类真的是类如其名,它真的是一个栅栏,它的目的很明确,就是先拦截线程,让所有线程都在同一起跑线上,然后打开栅栏让线程同时去做任务,是不是脑海里那画面就来了?

源码分析

聊完功能和使用方式,咱们还是言归正传,看看Doug Lea大叔的代码是怎样写的,毕竟

Talk is chep, show me the code~

我们从构造方法开始看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public CountDownLatch(int count) {
    //输入的值小于0直接抛出异常
    if (count < 0) throw new IllegalArgumentException("count < 0");
    //内部初始化了一个Sync类
    this.sync = new Sync(count);
}


private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
		
    Sync(int count) {
        //是不是感觉很熟悉?没错这里还是用到了同步队列中的state字段
        setState(count);
    }

    ...
}

其实不得不感叹Doug Lea的是真的厉害,能够使用一个state变量将独占锁和共享锁玩出花来,我们先来想一想,既然这里是设置了state字段了,那么后面调用countDown()方法的时候就是对state字段做减法操作了,等到state的值为1的时候,当前线程在做完减法操作之后,还需要负责唤醒所有调用了await()方法的线程,有没有感觉很巧妙?有没有那种继续分析下去的冲动?

那么我们继续往下看……

先来分析await()方法,顾名思义,该方法表示线程阻塞,等待被唤醒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    //这个很常见了,如果线程中断,那么抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果tryAcquireShared(arg) >= 0那么将不做任何操作,否则往下看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //添加一个状态为SHARED的结点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {//自旋
            //获取前驱结点
            final Node p = node.predecessor();
            if (p == head) {
              	//如果前驱结点为头结点,判断state值,只要state值不等于0,就返回-1,否则返回1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //到这里,说明state值为0,那么唤醒结点
                    //需要注意的是这里正常情况下一次循环肯定无法执行,因为正常情况都是先设置好
                    //栅栏,然后等待线程过来,所以大部分线程都是走下面的判断条件,然后挂起线程
                    //那么这里什么是否会走呢?我先卖个关子,等到了countDown()方法就知道了
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //线程到这里线程会被挂起,第一篇文章中有分析过,这里不再赘述,这里会把node前驱结点p
            //状态设置为-1
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

到这里await()方法已经被分析完了,其中最主要的在r >= 0的条件下线程如何处理的我们还没有去分析,因为这里的状态和countDown()方法有关,所以我们先去看看countDown()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //如果tryReleaseShared()返回false那么直接结束,否则会执行doReleaseShared()方法
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

//这里逻辑其实就是对state减1,如果state值为0那么返回true,调用doReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {//自旋
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

private void doReleaseShared() {
    for (;;) {//自旋
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //ws为0说明其后继结点还没有设置前驱结点status为-1
            //正常情况下head statue值为-1即为Signal
            if (ws == Node.SIGNAL) {
              	//将head的status设置为0,这只执行成功说明不存在线程与其竞争唤醒,执行失败说明存在竞争
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
              	//唤醒头结点的后继结点,这里唤醒其实就会导致await()方法中挂起的线程被唤醒
                unparkSuccessor(h);
            }
          	//存在一种状态ws的值为0
          	//当前结点为头结点,且当前结点虽然有后继结点,但是还没有修改或者已经修改了head的status
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//这里PROPAGATE要注意
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

针对countDown()方法,我认为其中最重要的地方有两点,第一点是在unparkSuccessor()方法,第二点是在compareAndSetWaitStatus(h, 0, Node.PROPAGATE)这个地方,第一点在我们之前的文章中有过详细分析,有兴趣的可以看一下这篇文章,下面我们重点分析一下compareAndSetWaitStatus(h, 0, Node.PROPAGATE)这行代码的意图。

其实这一行很怪,目前还没有真正分析出PROPAGATE的真实意义,所以只能尝试分析一下

我们先来讨论一下在什么条件下,头结点的状态值为0,首先能够走到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))这里,说明头结点是存在后继结点的,但是后继结点是没有把它的前驱结点(即头结点)的waitStatus值设置为-1的,看了一下代码,后继结点的线程大概执行的位置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //步骤1.将当前线程对应结点添加到队列尾部,此时没有设置其前驱结点状态
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //步骤2.获取前驱结点
            final Node p = node.predecessor();
            if (p == head) {
                //步骤3.尝试获取锁,state值为0返回1,否则返回-1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //步骤4.唤醒后继结点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //步骤5.更改前驱结点值为-1
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

我们先对线程命个名,执行await()方法的线程为线程A,执行countDown()方法的线程为线程B,继续分析,线程B因为进入了countDown()方法的内部的前提是线程A已经进入了await()方法内部,且上面方法中步骤1已经走完了,同时还没有走到步骤5,所以此时才会有线程A走到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))处,此时会存在两种情况:

  • 情境一:线程A不走步骤4;线程A走到完步骤3的时候,线程B才进入到countDown()方法内部的doReleaseShared()方法,如下代码:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doReleaseShared() {
    for (;;) {
      	//线程A走完步骤3,线程B走到这里,此时head对应状态为0
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            //直接到这里,此时如果线程A还没有走步骤5中的设置前驱结点状态,那么这里线程B设置成功
            //这样线程B执行完直接退出
            //如果线程A走完了步骤5中的设置前驱结点状态,那么这里执行失败,继续循环
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
  • 情境二:线程A走步骤4;线程A在走步骤3之前,线程B已经进入到了countDown()方法,更改了队列state值为0,那么此时线程A就会走步骤4,(这里打断一下,还记得上面我们卖的关子吗,在分析await()源码时有分析到何时会走步骤4,这里就走到了),那么我们来看看步骤4中的关键方法setHeadAndPropagate()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
private void setHeadAndPropagate(Node node, int propagate) {
    //旧的头结点
    Node h = head; // Record old head for check below
    //设置当前结点为头结点
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        //当前结点的后继结点
        Node s = node.next;
        if (s == null || s.isShared())
            //又调用了doReleaseShared()
            doReleaseShared();
    }
}

setHeadAndPropagate()方法中,我们看到其实仅仅是设置了当前结点为头结点,然后又调用了doReleaseShared()方法来唤醒后继结点,注意此时线程B有可能还是在doReleaseShared()方法中自旋的,所以就有可能同时出现两个线程唤醒同一个后继结点,当然只会存在一个线程唤醒成功。

到此countDown()await()都已经分析完了,但是PROPAGATE这个状态值我还没有分析,下面我们来看看。

首先,我先说我的结论,在我感觉下来这里的PROPAGATE不能被替换成SINGAL,但是变成0我认为没有问题,因为在我看来这里设置成特殊状态是有两个目的,一个是为了判断结点的状态,另外一个是为了线程A在执行步骤5的时候能够成功设置头结点的状态为-1,方便自旋,我还是把步骤5中的方法给拿过来:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果结点状态为-1,那么返回true,导致该线程挂起
        return true;
    if (ws > 0) {
        
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
        * waitStatus must be 0 or PROPAGATE.  Indicate that we
        * need a signal, but don't park yet.  Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
      	//结点状态为0或者-3都会走这里,然后返回false,避免该线程挂起后没有线程唤醒它
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

从上图我们可以看到,如果线程B把头结点的waitStatus被修改成了-1,那么这里会直接返回true,然后挂起,这样这个线程是无法被唤醒的,所以肯定不能设置成-1,然后设置成0我觉得可以,但是没有找到任何论据支撑,还望有缘人能够指点一番。

CyclicBarrier

聊完CountDownLatch之后,咱们接着来聊聊CyclicBarrierCyclicBarrier字面意思是“可以重复使用的栅栏”,和CountDownLatch相比,最明显的区别是它可以重复使用,同时它是基于Condition实现的,而我们上面介绍的CountDownLatch是基于AQS共享模式来时先的,因为是基于Condition来实现,所以如果知道Condition原理,那么它的源码读起来就比较简单,如果还不知道Condition用法和原理,可以看看我的上一篇文章。言归正传,我们开始分析一下CyclicBarrier的实现原理。

我们还是先来看一个例子,学习一下CyclicBarrier的使用方式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package condition;

import java.util.concurrent.CyclicBarrier;

/**
 * @author yinan
 * @date 2020/4/2
 */
public class CyclicBarrierDemo {

    static class TaskThread extends Thread {

        CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + " 到达栅栏 A");
              	//所有线程第一次都执行到这里时,最后一个线程会执行设置好的方法
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 A");

                Thread.sleep(2000);
              	//重复使用
                System.out.println(getName() + " 到达栅栏 B");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 B");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int threadNum = 5;
      	//构造方法,两个参数,第一个参数是线程数,第二个参数是最后一个线程执行的方法
        CyclicBarrier barrier = new CyclicBarrier(threadNum, 
                () -> System.out.println(Thread.currentThread().getName() + " 完成最后任务"));

        for(int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }
    }

}

从上述例子中,我们可以看到,CyclicBarrier功能上和CountDownLatch差不多,只不过它能够在线程统一到达某点之后,会去执行预先设定好的方法,使用起来更加简单。

源码分析

看完例子,我们开始来分析源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CyclicBarrier {
    //CyclicBarrier可以重复使用,所以会将每一次开始到所有线程穿过栅栏叫做一个Genaration
    private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();

  	//CyclicBarrier基于Condition实现
    private final Condition trip = lock.newCondition();
		
  	//线程数
    private final int parties;
    //穿越栅栏之前需要执行的操作
    private final Runnable barrierCommand;
	
    private Generation generation = new Generation();
    //还没有到达栅栏的线程数,即parties - 已经到达的线程数
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
      	//线程数小于等于0直接抛出异常
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

我们还是从await()方法出发,来看看

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}


private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      	//当前所处周期
        final Generation g = generation;
        //周期被破坏会抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
        //线程中断,破坏周期,抛出异常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //从剩余未穿过栅栏的线程数量中减1
        int index = --count;
      	//如果数量为0,说明所有线程到达栅栏
        if (index == 0) {  // tripped,
            boolean ranAction = false;
            try {
              	//如果初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
              	//ranAction为true说明执行command没有异常退出
                ranAction = true;
              	//唤醒线程,开启下一周期
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
				//这里是非最后一个线程调用await方法时会走的逻辑
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
              	//如果带有超时机制,调用带有超时的await方法
                if (!timed)
                    //非超时await
                    trip.await();
                else if (nanos > 0L)
                    //超时await
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
              	//到这里说明线程在await时候被中断
                if (g == generation && ! g.broken) {
                    //打破栅栏
                    breakBarrier();
                    //抛出异常
                    throw ie;
                } else {
                    //到这里要么g != generation要么g.broken
                    //第一种情况说明最后一个线程执行完成,开启了新一代,那么没有必要抛出异常
                    //记录当前线程状态即可
                    //第二种情况也不应该在这里抛出线程中断异常,而是下面的BrokenBarrierException异常
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
              	//检查栅栏是否被打破
                throw new BrokenBarrierException();
                //这个for循环除了异常,就是要从这里退出了
                //因为最后一个线程完成任务之后,会调用nextGeneration来开启新一代,然后释放锁
                //其他线程从Condition中得到锁并返回,然后到这里的时候,就会满足g != generation
                //那么什么时候不满足呢?当最后一个线程执行command的时候抛出了一场,那么就会执行打破栅栏操作
                //设置broken为true,然后唤醒这些线程,这些线程会从上面的判断条件中抛出异常
                //除此之外,还会因为超时原因,这是既不会抛出异常,也不会从下面return回去,而是从下面的判断条件
                //出去
            if (g != generation)
                return index;
            //发现超时了,打破栅栏,然后抛出异常(打破栅栏的意思就是不能继续开启下一周期)
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // signal completion of last generation
    //唤醒所有线程
    trip.signalAll();
    // set up next generation
    //重置剩余未到达栅栏线程数
    count = parties;
    //开启下一周期
    generation = new Generation();
}

到此,await()方法已经分析完毕,下面我们来分析分析剩下来的代码。

首先,我们怎么得到又多少个线程到了栅栏上,处于等待状态,也就是获取parties - count的值:

1
2
3
4
5
6
7
8
9
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

然后,判断一个栅栏是否被打破,这个直接看broken的值即可:

1
2
3
4
5
6
7
8
9
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

前面在讨论await()方法的时候已经介绍了栅栏在何时会被打破,下面我们来回顾回顾:

  • 中断,如果等待的先层发生了中断,那么会打破栅栏,同时抛出InterruptedException异常;
  • 超时,打破栅栏,抛出TimeoutException异常;
  • 指定执行的操作抛出了一场,也就是最后一个线程执行command中的任务时抛出异常。

最后,我们来看看下面情况发生的时候会导致什么情况:

先看下面代码,该代码片段是关于重置栅栏的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      	//打破旧的栅栏
        breakBarrier();   // break the current generation
      	//开启新的周期
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

如果此时有三个线程,前两个线程正常执行并await()挂起了,然后此时第三个线程在执行await()方法之前,调用了reset()方法,此时会发生什么呢?

我们来分析分析,首先reset()方法内部会打破栅栏,同时开启新的周期,所以其他线程会因为新的周期关系被唤醒,而此时前两个线程因为栅栏状态为broken,所以会抛出异常返回,之后又因为重新开启周期关系,如果后续还存在调用await()方法,那么就会开启下一周期。

注意broken属性是Generation的属性,而非CyclicBarrier类的属性,所以当开启新的周期的时候,又会被重置为false

Semaphore

接下来,我们来分析最后一个类:Semaphore,有了CountDownLatch的基础之后,分析Semaphore会简单很多;那么Semaphore是什么呢?它类似一个资源池(可以比作线程池),每个线程需要调用acquire()方法获取资源之后,然后才能够执行,执行完之后,需要release资源,让给其他线程用。

有没有感觉Semaphore很熟悉?感觉好像在哪见过?没错,在高并发条件下针对突然涌入的大流量,我们的服务器可能无法支撑,那么就需要进行一些限流措施,其中有三种限流措施,分别是:信号量(Semaphore)漏桶算法以及令牌桶算法。其中令牌桶算法思路和信号量有点相似,都是存在获取到资源(信号或者令牌)之后,才会真正的执行某项措施。

言归正传,我们开始分析Semaphore源码,

首先分析分析它的实现思路:初始化的时候给Semaphore设置一个值,其实就是AQS中的state,然后当调用acquire()方法的时候,执行state - 1操作,调用release()方法的时候,执行state + 1操作,当state等于0的时候阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package condition;

/**
 * @author yinan
 * @date 2020/4/4
 */

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        //实例化Semaphore,并设置许可证为1个
        Semaphore semaphore = new Semaphore(1);
        
        new Thread(() -> {
            try {
                //尝试在一秒内获取一个许可证
                if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
                    System.out.println("甲:获取许可证成功。");
                    //延迟五秒,然后使用release()方法释放该许可证
                    System.out.println("甲:五秒后释放许可证。");
                    Thread.sleep(5000);
                    //释放许可证
                    System.out.println("甲:许可证已释放。");
                    semaphore.release();
                } else {
                    System.out.println("甲:获取许可证失败。");
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {

                //尝试再次获取一个许可证
                System.out.println("乙:正在获取许可证,超时为:10秒。");
                if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
                    System.out.println("乙:获取许可证成功。");
                } else {
                    System.out.println("乙:获取许可证失败。");
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        
    }
}

我们还是先从例子入手,看看Semaphore的使用方式,从示例中我们可以看到,当其中一个线程获取到许可证的时候,另外一个线程在等待指定时间之后如果还没有获取到线程,就会退出获取许可证。示例很简单,没有特别需要分析的。

源码分析

构造方法

1
2
3
4
5
6
7
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以看到,这里使用了和ReentrantLock一样的机制,使用了公平策略和非公平策略两种方式。

接下来看看acquire()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}


public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
      	//小于0说明无法获取到许可证,那么该线程挂起
        doAcquireSharedInterruptibly(arg);
}

这几个方法的套路相信大家都见怪不怪了,最后都是调用tryAcquireShared()方法判断状态,然后执行doAcquireSharedInterruptibly()方法,其中tryAcquireShared()方法分为公平锁和非公平锁两种策略,而doAcquireSharedInterruptibly()CountDownLatch中的公用,所以我们来对比tryAcquireShared()方法在两种策略下的区别:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//公平锁策略
protected int tryAcquireShared(int acquires) {
    for (;;) {
      	//区别在于公平锁会先判断是否有线程在排队,然后再进行cas操作
        if (hasQueuedPredecessors())
            return -1;
      	//获取可用state
        int available = getState();
      	//减去当前需要的
        int remaining = available - acquires;
      	//remaining值小于0表示当前线程无法获取到许可证
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

//非公平锁策略
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

从代码上看,基本上公平锁和非公平锁的区别仅仅是在一两行代码上,到这里acquire()方法也就分析完了,看下来还是挺简单的,如果有不清晰的可以翻看我的前两篇文章。

接下来我们再分析分析release()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void release() {
    //释放一个资源
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow,整型范围内的溢出
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
//在上面的CountDownLatch类中的countDown()方法中有分析过
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可以看到,`Semaphore源码还是挺简单的,基本上都是针对我们之前分析代码的组合。

总结

到此,我花费了三篇文章来分析的AQS源码也结束了,整个分析下来,第一要感叹Doug Lea真的是神一样的存在,能够把代码写得如此巧妙;第二我们还是需要静下心来好好学习学习这样优秀的代码,任何人总是能够从这样的代码中获益。

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus