序
前面花费了两篇文章来讲独占锁的相关源码,那么这篇文章我将来讲一讲共享锁,所谓共享锁,指的是该锁可以被多个线程同时持有,能够并发地访问资源,如ReadWriteLock
中的读锁就是共享锁。
本篇文章我会先用CountDownLatch
来说说共享模式,然后顺便把其他AQS
的相关类CyclicBarrier
、Semaphore
的源码一起说一下。
CountDownLatch
CountDownLatch
这个类是典型的共享模式使用,属于比较常用的类,同时也是面试常考类,比如经典的问多线程条件下,如何让多线程同时执行某段程序,就可以用到CountDownLatch
。
例
我们先来看一段代码,看看CountDownLatch
的用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
class Driver2 {
void main() throws InterruptedException {
//初始化CountDownLatch
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = Executors.newFixedThreadPool(8);
//创建N个任务让线程池来运行
for (int i = 0; i < N; ++i)
e.execute(new WorkerRunnable(doneSignal, i));
//每一个线程在执行到这个地方的时候都会等待,直到最后一个线程到这里,方法才结束
doneSignal.await();
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
//任务完成之后调用countDown()方法
doneSignal.countDown();
} catch (InterruptedException ex) {
}
}
void doWork(int i) { ...}
}
|
从上面的例子中我们可以看出,CountDownLatch
主要用来讲一个大的任务拆分成由多个线程来执行,等最后一个线程执行完成之后,再往下执行其他操作。所以上面提到的面试题是不是就可以来解答了?我们可以使用两个CountDownLatch
来实现,先让每一个线程都在同一地点等待,然后调用countDown()
方法让所有线程同时执行,这样是不是就实现了?我们来看看代码怎么写的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public class CountDownLatchTest {
private final static CountDownLatch START_SIGNAL = new CountDownLatch(1);
private final static CountDownLatch DONE_SIGNAL = new CountDownLatch(8);
public static void main(String[] args) throws InterruptedException {
CountDownLatchTest countDownLatchTest = new CountDownLatchTest();
Executor pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 8; i++) {
pool.execute(countDownLatchTest::run);
}
//休眠是为了让线程都能够到START_SIGNAL.await();方法处
TimeUnit.SECONDS.sleep(1);
//唤醒所有线程
START_SIGNAL.countDown();
//所有线程执行完成之后都在这里等待,直到最后一个线程执行完成
DONE_SIGNAL.await();
System.out.println("完成!");
}
private void run() {
try {
START_SIGNAL.await();
doWork();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
DONE_SIGNAL.countDown();
}
}
private void doWork() {
System.out.println("doWork!");
}
}
|
从上面两个例子,我们不难发现,其实CountDownLatch
类真的是类如其名,它真的是一个栅栏,它的目的很明确,就是先拦截线程,让所有线程都在同一起跑线上,然后打开栅栏让线程同时去做任务,是不是脑海里那画面就来了?
源码分析
聊完功能和使用方式,咱们还是言归正传,看看Doug Lea
大叔的代码是怎样写的,毕竟
Talk is chep, show me the code~
我们从构造方法开始看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public CountDownLatch(int count) {
//输入的值小于0直接抛出异常
if (count < 0) throw new IllegalArgumentException("count < 0");
//内部初始化了一个Sync类
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
//是不是感觉很熟悉?没错这里还是用到了同步队列中的state字段
setState(count);
}
...
}
|
其实不得不感叹Doug Lea
的是真的厉害,能够使用一个state
变量将独占锁和共享锁玩出花来,我们先来想一想,既然这里是设置了state
字段了,那么后面调用countDown()
方法的时候就是对state
字段做减法操作了,等到state
的值为1
的时候,当前线程在做完减法操作之后,还需要负责唤醒所有调用了await()
方法的线程,有没有感觉很巧妙?有没有那种继续分析下去的冲动?
那么我们继续往下看……
先来分析await()
方法,顾名思义,该方法表示线程阻塞,等待被唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//这个很常见了,如果线程中断,那么抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//如果tryAcquireShared(arg) >= 0那么将不做任何操作,否则往下看
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加一个状态为SHARED的结点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//自旋
//获取前驱结点
final Node p = node.predecessor();
if (p == head) {
//如果前驱结点为头结点,判断state值,只要state值不等于0,就返回-1,否则返回1
int r = tryAcquireShared(arg);
if (r >= 0) {
//到这里,说明state值为0,那么唤醒结点
//需要注意的是这里正常情况下一次循环肯定无法执行,因为正常情况都是先设置好
//栅栏,然后等待线程过来,所以大部分线程都是走下面的判断条件,然后挂起线程
//那么这里什么是否会走呢?我先卖个关子,等到了countDown()方法就知道了
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//线程到这里线程会被挂起,第一篇文章中有分析过,这里不再赘述,这里会把node前驱结点p
//状态设置为-1
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
|
到这里await()
方法已经被分析完了,其中最主要的在r >= 0
的条件下线程如何处理的我们还没有去分析,因为这里的状态和countDown()
方法有关,所以我们先去看看countDown()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//如果tryReleaseShared()返回false那么直接结束,否则会执行doReleaseShared()方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//这里逻辑其实就是对state减1,如果state值为0那么返回true,调用doReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//自旋
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {//自旋
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//ws为0说明其后继结点还没有设置前驱结点status为-1
//正常情况下head statue值为-1即为Signal
if (ws == Node.SIGNAL) {
//将head的status设置为0,这只执行成功说明不存在线程与其竞争唤醒,执行失败说明存在竞争
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒头结点的后继结点,这里唤醒其实就会导致await()方法中挂起的线程被唤醒
unparkSuccessor(h);
}
//存在一种状态ws的值为0
//当前结点为头结点,且当前结点虽然有后继结点,但是还没有修改或者已经修改了head的status
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//这里PROPAGATE要注意
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
|
针对countDown()
方法,我认为其中最重要的地方有两点,第一点是在unparkSuccessor()
方法,第二点是在compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
这个地方,第一点在我们之前的文章中有过详细分析,有兴趣的可以看一下这篇文章,下面我们重点分析一下compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
这行代码的意图。
其实这一行很怪,目前还没有真正分析出PROPAGATE
的真实意义,所以只能尝试分析一下
我们先来讨论一下在什么条件下,头结点的状态值为0
,首先能够走到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
这里,说明头结点是存在后继结点的,但是后继结点是没有把它的前驱结点(即头结点)的waitStatus
值设置为-1
的,看了一下代码,后继结点的线程大概执行的位置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//步骤1.将当前线程对应结点添加到队列尾部,此时没有设置其前驱结点状态
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//步骤2.获取前驱结点
final Node p = node.predecessor();
if (p == head) {
//步骤3.尝试获取锁,state值为0返回1,否则返回-1
int r = tryAcquireShared(arg);
if (r >= 0) {
//步骤4.唤醒后继结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//步骤5.更改前驱结点值为-1
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
|
我们先对线程命个名,执行await()
方法的线程为线程A
,执行countDown()
方法的线程为线程B
,继续分析,线程B
因为进入了countDown()
方法的内部的前提是线程A
已经进入了await()
方法内部,且上面方法中步骤1
已经走完了,同时还没有走到步骤5
,所以此时才会有线程A
走到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
处,此时会存在两种情况:
- 情境一:线程
A
不走步骤4
;线程A
走到完步骤3
的时候,线程B
才进入到countDown()
方法内部的doReleaseShared()
方法,如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
private void doReleaseShared() {
for (;;) {
//线程A走完步骤3,线程B走到这里,此时head对应状态为0
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//直接到这里,此时如果线程A还没有走步骤5中的设置前驱结点状态,那么这里线程B设置成功
//这样线程B执行完直接退出
//如果线程A走完了步骤5中的设置前驱结点状态,那么这里执行失败,继续循环
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
|
- 情境二:线程
A
走步骤4
;线程A
在走步骤3
之前,线程B
已经进入到了countDown()
方法,更改了队列state
值为0
,那么此时线程A
就会走步骤4
,(这里打断一下,还记得上面我们卖的关子吗,在分析await()
源码时有分析到何时会走步骤4
,这里就走到了),那么我们来看看步骤4
中的关键方法setHeadAndPropagate()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private void setHeadAndPropagate(Node node, int propagate) {
//旧的头结点
Node h = head; // Record old head for check below
//设置当前结点为头结点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//当前结点的后继结点
Node s = node.next;
if (s == null || s.isShared())
//又调用了doReleaseShared()
doReleaseShared();
}
}
|
在setHeadAndPropagate()
方法中,我们看到其实仅仅是设置了当前结点为头结点,然后又调用了doReleaseShared()
方法来唤醒后继结点,注意此时线程B
有可能还是在doReleaseShared()
方法中自旋的,所以就有可能同时出现两个线程唤醒同一个后继结点,当然只会存在一个线程唤醒成功。
到此countDown()
和await()
都已经分析完了,但是PROPAGATE
这个状态值我还没有分析,下面我们来看看。
首先,我先说我的结论,在我感觉下来这里的PROPAGATE
不能被替换成SINGAL
,但是变成0
我认为没有问题,因为在我看来这里设置成特殊状态是有两个目的,一个是为了判断结点的状态,另外一个是为了线程A
在执行步骤5
的时候能够成功设置头结点的状态为-1
,方便自旋,我还是把步骤5
中的方法给拿过来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果结点状态为-1,那么返回true,导致该线程挂起
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//结点状态为0或者-3都会走这里,然后返回false,避免该线程挂起后没有线程唤醒它
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
|
从上图我们可以看到,如果线程B
把头结点的waitStatus
被修改成了-1
,那么这里会直接返回true
,然后挂起,这样这个线程是无法被唤醒的,所以肯定不能设置成-1
,然后设置成0
我觉得可以,但是没有找到任何论据支撑,还望有缘人能够指点一番。
CyclicBarrier
聊完CountDownLatch
之后,咱们接着来聊聊CyclicBarrier
,CyclicBarrier
字面意思是“可以重复使用的栅栏”,和CountDownLatch
相比,最明显的区别是它可以重复使用,同时它是基于Condition
实现的,而我们上面介绍的CountDownLatch
是基于AQS
共享模式来时先的,因为是基于Condition
来实现,所以如果知道Condition
原理,那么它的源码读起来就比较简单,如果还不知道Condition
用法和原理,可以看看我的上一篇文章。言归正传,我们开始分析一下CyclicBarrier
的实现原理。
例
我们还是先来看一个例子,学习一下CyclicBarrier
的使用方式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package condition;
import java.util.concurrent.CyclicBarrier;
/**
* @author yinan
* @date 2020/4/2
*/
public class CyclicBarrierDemo {
static class TaskThread extends Thread {
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " 到达栅栏 A");
//所有线程第一次都执行到这里时,最后一个线程会执行设置好的方法
barrier.await();
System.out.println(getName() + " 冲破栅栏 A");
Thread.sleep(2000);
//重复使用
System.out.println(getName() + " 到达栅栏 B");
barrier.await();
System.out.println(getName() + " 冲破栅栏 B");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
//构造方法,两个参数,第一个参数是线程数,第二个参数是最后一个线程执行的方法
CyclicBarrier barrier = new CyclicBarrier(threadNum,
() -> System.out.println(Thread.currentThread().getName() + " 完成最后任务"));
for(int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
|
从上述例子中,我们可以看到,CyclicBarrier
功能上和CountDownLatch
差不多,只不过它能够在线程统一到达某点之后,会去执行预先设定好的方法,使用起来更加简单。
源码分析
看完例子,我们开始来分析源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class CyclicBarrier {
//CyclicBarrier可以重复使用,所以会将每一次开始到所有线程穿过栅栏叫做一个Genaration
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
//CyclicBarrier基于Condition实现
private final Condition trip = lock.newCondition();
//线程数
private final int parties;
//穿越栅栏之前需要执行的操作
private final Runnable barrierCommand;
private Generation generation = new Generation();
//还没有到达栅栏的线程数,即parties - 已经到达的线程数
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
//线程数小于等于0直接抛出异常
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
|
我们还是从await()
方法出发,来看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前所处周期
final Generation g = generation;
//周期被破坏会抛出异常
if (g.broken)
throw new BrokenBarrierException();
//线程中断,破坏周期,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//从剩余未穿过栅栏的线程数量中减1
int index = --count;
//如果数量为0,说明所有线程到达栅栏
if (index == 0) { // tripped,
boolean ranAction = false;
try {
//如果初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
//ranAction为true说明执行command没有异常退出
ranAction = true;
//唤醒线程,开启下一周期
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//这里是非最后一个线程调用await方法时会走的逻辑
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
//如果带有超时机制,调用带有超时的await方法
if (!timed)
//非超时await
trip.await();
else if (nanos > 0L)
//超时await
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//到这里说明线程在await时候被中断
if (g == generation && ! g.broken) {
//打破栅栏
breakBarrier();
//抛出异常
throw ie;
} else {
//到这里要么g != generation要么g.broken
//第一种情况说明最后一个线程执行完成,开启了新一代,那么没有必要抛出异常
//记录当前线程状态即可
//第二种情况也不应该在这里抛出线程中断异常,而是下面的BrokenBarrierException异常
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
//检查栅栏是否被打破
throw new BrokenBarrierException();
//这个for循环除了异常,就是要从这里退出了
//因为最后一个线程完成任务之后,会调用nextGeneration来开启新一代,然后释放锁
//其他线程从Condition中得到锁并返回,然后到这里的时候,就会满足g != generation
//那么什么时候不满足呢?当最后一个线程执行command的时候抛出了一场,那么就会执行打破栅栏操作
//设置broken为true,然后唤醒这些线程,这些线程会从上面的判断条件中抛出异常
//除此之外,还会因为超时原因,这是既不会抛出异常,也不会从下面return回去,而是从下面的判断条件
//出去
if (g != generation)
return index;
//发现超时了,打破栅栏,然后抛出异常(打破栅栏的意思就是不能继续开启下一周期)
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
//唤醒所有线程
trip.signalAll();
// set up next generation
//重置剩余未到达栅栏线程数
count = parties;
//开启下一周期
generation = new Generation();
}
|
到此,await()
方法已经分析完毕,下面我们来分析分析剩下来的代码。
首先,我们怎么得到又多少个线程到了栅栏上,处于等待状态,也就是获取parties - count
的值:
1
2
3
4
5
6
7
8
9
|
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
|
然后,判断一个栅栏是否被打破,这个直接看broken
的值即可:
1
2
3
4
5
6
7
8
9
|
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
|
前面在讨论await()
方法的时候已经介绍了栅栏在何时会被打破,下面我们来回顾回顾:
- 中断,如果等待的先层发生了中断,那么会打破栅栏,同时抛出
InterruptedException
异常;
- 超时,打破栅栏,抛出
TimeoutException
异常;
- 指定执行的操作抛出了一场,也就是最后一个线程执行
command
中的任务时抛出异常。
最后,我们来看看下面情况发生的时候会导致什么情况:
先看下面代码,该代码片段是关于重置栅栏的:
1
2
3
4
5
6
7
8
9
10
11
12
|
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//打破旧的栅栏
breakBarrier(); // break the current generation
//开启新的周期
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
|
如果此时有三个线程,前两个线程正常执行并await()
挂起了,然后此时第三个线程在执行await()
方法之前,调用了reset()
方法,此时会发生什么呢?
我们来分析分析,首先reset()
方法内部会打破栅栏,同时开启新的周期,所以其他线程会因为新的周期关系被唤醒,而此时前两个线程因为栅栏状态为broken
,所以会抛出异常返回,之后又因为重新开启周期关系,如果后续还存在调用await()
方法,那么就会开启下一周期。
注意:broken
属性是Generation
的属性,而非CyclicBarrier
类的属性,所以当开启新的周期的时候,又会被重置为false
。
Semaphore
接下来,我们来分析最后一个类:Semaphore
,有了CountDownLatch
的基础之后,分析Semaphore
会简单很多;那么Semaphore
是什么呢?它类似一个资源池(可以比作线程池),每个线程需要调用acquire()
方法获取资源之后,然后才能够执行,执行完之后,需要release
资源,让给其他线程用。
有没有感觉Semaphore
很熟悉?感觉好像在哪见过?没错,在高并发条件下针对突然涌入的大流量,我们的服务器可能无法支撑,那么就需要进行一些限流措施,其中有三种限流措施,分别是:信号量(Semaphore)、漏桶算法以及令牌桶算法。其中令牌桶算法思路和信号量有点相似,都是存在获取到资源(信号或者令牌)之后,才会真正的执行某项措施。
言归正传,我们开始分析Semaphore
源码,
首先分析分析它的实现思路:初始化的时候给Semaphore
设置一个值,其实就是AQS
中的state
,然后当调用acquire()
方法的时候,执行state - 1
操作,调用release()
方法的时候,执行state + 1
操作,当state
等于0
的时候阻塞。
例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package condition;
/**
* @author yinan
* @date 2020/4/4
*/
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) {
//实例化Semaphore,并设置许可证为1个
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
//尝试在一秒内获取一个许可证
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
System.out.println("甲:获取许可证成功。");
//延迟五秒,然后使用release()方法释放该许可证
System.out.println("甲:五秒后释放许可证。");
Thread.sleep(5000);
//释放许可证
System.out.println("甲:许可证已释放。");
semaphore.release();
} else {
System.out.println("甲:获取许可证失败。");
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
//尝试再次获取一个许可证
System.out.println("乙:正在获取许可证,超时为:10秒。");
if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
System.out.println("乙:获取许可证成功。");
} else {
System.out.println("乙:获取许可证失败。");
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
|
我们还是先从例子入手,看看Semaphore
的使用方式,从示例中我们可以看到,当其中一个线程获取到许可证的时候,另外一个线程在等待指定时间之后如果还没有获取到线程,就会退出获取许可证。示例很简单,没有特别需要分析的。
源码分析
构造方法
1
2
3
4
5
6
7
|
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
|
可以看到,这里使用了和ReentrantLock
一样的机制,使用了公平策略和非公平策略两种方式。
接下来看看acquire()
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
//小于0说明无法获取到许可证,那么该线程挂起
doAcquireSharedInterruptibly(arg);
}
|
这几个方法的套路相信大家都见怪不怪了,最后都是调用tryAcquireShared()
方法判断状态,然后执行doAcquireSharedInterruptibly()
方法,其中tryAcquireShared()
方法分为公平锁和非公平锁两种策略,而doAcquireSharedInterruptibly()
和CountDownLatch
中的公用,所以我们来对比tryAcquireShared()
方法在两种策略下的区别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
//公平锁策略
protected int tryAcquireShared(int acquires) {
for (;;) {
//区别在于公平锁会先判断是否有线程在排队,然后再进行cas操作
if (hasQueuedPredecessors())
return -1;
//获取可用state
int available = getState();
//减去当前需要的
int remaining = available - acquires;
//remaining值小于0表示当前线程无法获取到许可证
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//非公平锁策略
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
|
从代码上看,基本上公平锁和非公平锁的区别仅仅是在一两行代码上,到这里acquire()
方法也就分析完了,看下来还是挺简单的,如果有不清晰的可以翻看我的前两篇文章。
接下来我们再分析分析release()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public void release() {
//释放一个资源
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow,整型范围内的溢出
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//在上面的CountDownLatch类中的countDown()方法中有分析过
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
|
可以看到,`Semaphore源码还是挺简单的,基本上都是针对我们之前分析代码的组合。
总结
到此,我花费了三篇文章来分析的AQS
源码也结束了,整个分析下来,第一要感叹Doug Lea
真的是神一样的存在,能够把代码写得如此巧妙;第二我们还是需要静下心来好好学习学习这样优秀的代码,任何人总是能够从这样的代码中获益。