ReentrantLock及AQS源码分析

自己实现一个自旋锁

我们来写一个普通的自旋锁,然后再慢慢去改进它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 标识是否有线程加锁
volatile int status = 0;

void lock() {
while (!compareAndSet(0, 1)) {
}
//lock
}

void unlock() {
status = 0;
}

boolean compareAndSet(int except, int newValue) {
// cas操作,修改status成功则返回true
}

缺点:这样的自旋浪费CPU资源,没有拿到锁的线程会一直自旋。如果拿到锁的线程执行10s,那么其他线程白白浪费了10s的CPU资源。

解决思路:让得不到锁的线程让出CPU。

yield() + 自旋

1
2
3
4
5
6
7
8
9
10
11
12
volatile int status = 0;

void lock() {
while (!compareAndSet(0, 1)) {
Thread.yield();
}
//lock
}

void unlock() {
status = 0;
}

缺点:yield()方法能够让出CPU资源,但是只有两个线程竞争时,yield是有效的。只是当前线程让出CPU,我们不知道下次CPU空闲时调度了哪个线程。如果有10000个线程,会出问题。

sleep() + 自旋

1
2
3
4
5
6
7
8
9
10
11
12
volatile int status = 0;

void lock() {
while (!compareAndSet(0, 1)) {
Thread.sleep(10 * 1000);
}
//lock
}

void unlock() {
status = 0;
}

缺点:sleep()的缺点就更明显了,你咋知道要睡10秒?

park() + 自旋

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 标识是否有线程加锁
volatile int status = 0;
// 等待队列、链表
Queue parkQueue;

void lock() {
while (!compareAndSet(0, 1)) {
// 拿锁失败,park
// 可以用LockSupport.park(),是native方法
park();
}
}

void unlock() {
lockNotify();
}

void park() {
// 将当前线程加入等待队列
parkQueue.add(currentThread);
// 释放CPU,阻塞
releaseCPU();
}

void lockNotify() {
// 拿到队列头部的线程唤醒
Thread t = parkQueue.header();
// 唤醒t
unpark(t);
}

unpark(Thread t)可以指定唤醒哪个线程。这样就比较完美了,这是伪代码,其实ReentrantLock用的就是类似这样的机制。

ReentrantLock源码分析

ReentrantLock基于AbstractQueuedSynchronizer这个抽象类来实现可重入的锁,ReentrantLock在构造方法时可以传入fair参数来决定是否是公平的锁,并且内部定义了2个数据结构(FairSync和NonfairSync)分别来实现公平非公平锁。

AQS的核心是一个state状态(0表示自由状态,>1表示加锁成功)和一个FIFO的队列,或者说自己维护的双向链表。

ReentrantLock是jdk 1.5出来的,据传是Doug Lea大神不满意当时synchronized关键字的效率(1.6后synchronized有了锁优化),自行实现了一个高效的锁AQS。后来sun公司直接将其收进了juc包中,并且在以后的版本对synchronized进行了优化。原来我们总喜欢比较两个方式性能的优劣,但实际在今天,已没有多大的区别了。

至于面试常问的synchronized和锁的区别,我的理解主要目的还是考察是否对它们使用过、实现原理以及存在层面上的比较。现在你单纯问宝马好还是奔驰好,无从谈起。

ReentrantLock使用方法也很简单,先创建锁,在业务逻辑前调用lock()实例方法上锁,在finally块中解锁即可。

AQS中的Node内部类

这个Node使我们理解AQS以及ReentrantLock的基础数据结构,它是在AQS中定义的,主要作为排队线程中的基本单位(节点),它本质上就是封装了Thread,并且提供了向前向后的指针来维护排队队列,你把它理解成双端链表也可以。

1
2
3
4
5
public class Node{
volatile Node prev;
volatile Node next;
volatile Thread thread;
}

主要的属性就这仨。

lock()上锁过程

lock()上锁过程对于公平锁和非公平锁只有一点差别,我们先贴一下源码。

1
2
3
4
5
// FairSync
final void lock() {
// 直接去acquire一个锁,对应底层就是state+1
acquire(1);
}
1
2
3
4
5
6
7
8
9
10
// NonfairSync
final void lock() {
// 非公平锁上来先cas一下能不能拿到锁
if (compareAndSetState(0, 1))
// 如果拿到了,设置当前线程为owner
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果没拿到,再去acquire
acquire(1);
}

为啥说非公平?因为它上来就想插队。

acquire()

acquire()方法对于是否公平是一样的,是父类AQS中的默认方法:

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
// 尝试拿锁,后面详细分析这个tryAcquire
if (!tryAcquire(arg) &&
// 加入到AQS队列后立马park()
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// interrupt当前线程
selfInterrupt();
}

tryAcquire()

这里贴的是FairSync的tryAcquire(),NonfairSync的实现只是少了!hasQueuedPredecessors()需要排队的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取上锁状态,自由状态为0,被上锁为1,大于1表示重入
int c = getState();
if (c == 0) { // 自由状态,当前线程去上锁
// hasQueuedPredecessors判断是否需要排队,比较复杂,后面介绍
// 如果不需要排队就cas加锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 加锁成功,把当前线程设为owner,返回true
setExclusiveOwnerThread(current);
return true;
}
}
// 如果state不为0,判断是不是重入,也就是判断是不是owner自己要上锁
else if (current == getExclusiveOwnerThread()) {
// 如果是重入,状态+1表示重入次数
int nextc = c + acquires;
if (nextc < 0)
// state溢出,重入最多是Integer.MAX_VALUE
throw new Error("Maximum lock count exceeded");
// 设置新的state并上锁成功
setState(nextc);
return true;
}
// 如果state不为0,还不是owner要上锁,说明有人在拿着锁,上锁失败
return false;
}

hasQueuedPredecessors()

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

AQS队列在初始化时会虚拟一个thread为null的Node,因为队列当中的head永远是持有锁的那个node。这个thread为null的虚拟节点提供了一个在AQS队列中占位的作用,持有锁的那个线程永远不在队列中,这个理念在AQS中我们一定要理解。

我觉得这个方法是整个AQS中最难的部分没有之一,所以这里不用注释的形式了,在下面对于各种情况详细分析一下:

  • 队列中没有Node,那么head和tail都为null,这个时候队列没有被初始化,那么不需要排队,直接去上锁;但也有特殊情况,如果2个线程同时来lock(),看到队列没有初始化,就都用CAS去修改计数器,那么必然有一个会失败,那么失败的这个线程就去初始化队列,并且乖乖排队;
  • (难点)队列中只有1个Node,也就是head == tail,但是我们上面说了,初始化完成后,head是thread为null的虚拟节点,如果只有一个数据,那队列长度应该是2啊?其实这里是一个中间状态,即持有锁的线程刚好释放锁,而自己又是队列中唯一一个节点,这时候不去等待unpark()通知,主动去尝试CAS拿锁,此时h == t,要看下一个判断:h.next != null(因为是当前要加锁的线程),并且后面s.thread != Thread.currentThread()也成立,整个return为false,就表示不用排队,直接去拿锁(日了狗了这一行代码);
  • 队列中有若干Node(大于1个),那么h != t肯定成立,需要看后面那段表达式,大于1个的时候h.next肯定不为空,那么就看拿锁线程是不是排队的第一个线程:
    • 如果当前线程不是第一个排队线程,后半段为true前半段也是true,整个函数返回true,那没啥废话,乖乖去排队;
    • 如果当前线程是第一个排队线程(当做现在第一个排队的人是你媳妇),那么这里整个函数返回false,不需要排队,直接外层去CAS拿锁,这里又分2种情况:
      • 加锁成功:持有锁的线程刚好执行完了,释放掉锁,那么当然就是当前线程拿到锁;
      • 加锁失败:持有锁的线程还没执行完,也不会进tryAcquire(1)的else分支,那么本次拿锁失败,在acquire(1)中会addWaiter()正常排队。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

上面说如果加锁失败,就把它加到AQS队列中,我再贴一下代码:

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// tryAcquire取反,加锁失败
// 加入到AQS队列后立马park()
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// interrupt当前线程
selfInterrupt();
}

如果能跑到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这里说明当前线程要排队了。这里分为2种情况:

  1. 其他线程持有锁,那么当前线程需要排队,但此时队列还没有被初始化;
  2. 其他线程持有锁,队列已经被初始化,所以当前线程需要去排队。

addWaiter(Node.EXCLUSIVE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private Node addWaiter(Node mode) {
// 把当前线程封装为Node对象,mode为排它(读写锁中的读锁才是共享mode)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 判断队尾是否为null,其实只要队列被初始化了,队尾一定不为null
// 换而言之这里判断的就是队列有没有被初始化,也就是上面我们说的那2种情况
if (pred != null) {
// 队尾不为空,队列已经初始化了
// 这种情况比较简单,把当前Node设置为队尾,维护链表关系即可
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列没有被初始化,下面来分析这个enq
enq(node);
// 返回当前node
return node;
}

// enq每次执行的情况都不同,这里分次来分析
private Node enq(final Node node) {
// 死循环
for (;;) {
// 第一次进入,t为null,队列没有被初始化
Node t = tail;
if (t == null) { // Must initialize
// 调用无参的Node构造方法,也就是加了一个thread为null的虚拟Node
// 并把这个虚拟Node设为头部
if (compareAndSetHead(new Node()))
// 此时AQS中只有一个元素,就是这个虚拟Node
// 然后将尾部指向它,第一次循环结束
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

// 下面我们来看第二次循环
// 这个代码写的比较高效,处理了不同的情况
private Node enq(final Node node) {
// 死循环
for (;;) {
// 这个时候t指向的是那个虚拟Node了,不为null
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 当前Node入队,前驱为虚拟Node
node.prev = t;
// cas设置尾部
if (compareAndSetTail(t, node)) {
// 维护链表关系
t.next = node;
// 返回虚拟节点?这个返回其实就是终止死循环
// 返回出去的t没啥意义,外面的addWaiter没有接收enq的返回
return t;
}
}
}
}

所以我们看下来,这个addWaiter()方法就是拿锁失败时,让当前线程入队,并且维护队列的链表关系。

接下来看acquireQueued(addWaiter(Node.EXCLUSIVE), arg),经过上面的一系列操作后,变成了acquireQueued(node, arg)。

acquireQueued(node, arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 死循环
for (;;) {
// 拿到当前Node的前驱p
// 这里分为2种情况:1. 上一个节点是head;2. 上一个节点不是head
final Node p = node.predecessor();
// 1. 如果上一个节点是head,则表示当前节点为队列中的第二个元素,“排队”的第一个元素
// 第一个排队的先去尝试拿锁,这里为啥要尝试拿锁呢?
// 这相当于你买票,你前面那人正在窗口,你是排队的第一个
// 那此时你肯定不死心,要问问你前面那人买完了没?
// 1.1 买完了的话,去旁边收拾你的身份证,我可以开始买了(可以拿锁了)
// 1.2 前面那人要是还没处理完(还持有锁),那我再睡一会儿(park)
// 所以这里拿锁就是看一下是不是之前的线程刚好释放锁,主动碰碰运气
// 这也是提高效率,如果运气好,就不需要调一次unpark了,直接拿锁
if (p == head && tryAcquire(arg)) {
// 如果运气好cas到了锁,那么前面的Node出队列,当前Node设为队首
// 这步操作是去除前面Node的占位
setHead(node);
// 前面Node出队,断链
p.next = null; // help GC
failed = false;
// 返回false,不需要interrupt,当前线程拿锁执行
return interrupted;
}
// 2. 上一个节点不是head + 第一个排队的拿锁失败(其他线程没释放锁)
// 这种情况下要park了
// shouldParkAfterFailedAcquire先判断要不要park
// 这行代码很有说道,下面分析
if (shouldParkAfterFailedAcquire(p, node) &&
// 调用LockSupport.park(this);
parkAndCheckInterrupt())
// 进入下次循环,再尝试cas拿锁,还拿不到安心排队
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire(p, node)

这里我们先说一下ws是啥,它是Node节点中定义的属性waitStatus:

1
2
3
4
5
6
7
8
9
10
11
12
13
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

若等于0,则表示非上面的任何状态

所以当一个Node刚创建的时候,这个ws一定是为0的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

这个方法传入2个参数,前驱节点pred和当前节点node,我们有一个前提是进入到这个方法是线程竞争状态下,也就是锁还被别人持有的情况下才会进入的,那么就分为2种情况:

  • pred为虚拟node:此时拿到的虚拟node的ws为0,那么会进入最后一个else,也就是将pred的ws改为-1(-1表示这个线程在park状态),注意当前node应该是队列中的第二个node,排队的第一个node,这里为啥要先改状态呢?有2个原因:
    • 一是这个Node.SIGNAL的状态在其他情况其他方法也有可能用到,我们不能把0作为可以park的状态;
    • 二是多一次自旋,这个shouldParkAfterFailedAcquire()方法外面已经有一次自旋了,还记得外面是一个死循环吗?在这里改完ws之后外面会再来一次循环,再自旋一次(为啥自旋2次?100次不行吗?肯定不行啊,浪费CPU,2次是三思之后决定的),ReentrantLock的自旋加锁就体现在这里。ws改为-1后返回false,外面进入下一次循环。
      • 如果第二次自旋还拿不到锁,进入到这个方法后,pred的ws为-1,那么直接return true,接下来就需要park了;
      • 那如果第二次自旋拿到了锁,那么维护链表关系,acquireQueued()返回false,在再外面的acquire()中不进行selfInterrupt()操作,顺利拿到锁。
  • pred为普通排队node:那这里当前线程就是需要park的,因为你前面的线程还在老老实实排队啊。那么修改前一个Node的ws状态,然后第二次进入shouldParkAfterFailedAcquire()是返回true,进行下一波操作park自己。

为啥这里ws要改前一个排队的线程?有2个原因:

  • 自己不能改自己的ws,万一你改了ws为-1,然后没有park的话不是骗人吗;
  • 一旦park了之后,就没有CPU了,不能执行代码了,所以自己也没办法改。

自己不知道自己睡着了嘛。

另外这个ws在解锁过程中也非常重要,后面会分析。

parkAndCheckInterrupt()

1
2
3
4
5
6
7
private final boolean parkAndCheckInterrupt() {
// park自己
LockSupport.park(this);
// 到此为止,没拿到锁的线程就被park住了等待唤醒
// 后面的代码在解锁过程中分析
return Thread.interrupted();
}

unlock()解锁过程

1
2
3
 public void unlock() {
sync.release(1);
}

sync.release(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 public final boolean release(int arg) {
// 尝试解锁,下面会分析
if (tryRelease(arg)) {
Node h = head;
// 当前lock被解锁,那么看AQS中的head,即下一个排队的Thread
// 当h不是无锁状态下,进行unpark()操作
// 前面这个不为null判断是避免NPE,别想多
// h的ws肯定不为0,应该是-1,因为队列中后面的node改了前面node的ws
if (h != null && h.waitStatus != 0)
// 这里面调LockSupport.unpark(h.next.thread)
// 对下一个Node unpark
// 为啥不是unpark自己?这里有2点原因:
// 1. 自己在执行啊,调用unlock的时候已经获得锁在执行了
// 2. 下一个Node对应的线程已经在park了,没有cpu资源
// 所以没办法执行任何代码了,只能别人来改
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease(arg)

当前线程释放锁,比较简单。这时候还没有unpark其他线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryRelease(int releases) {
// 减少state
int c = getState() - releases;
// 如果当前要解锁的线程不是lock的owner,会异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标记锁状态
boolean free = false;
if (c == 0) {
// 如果没有重入了,state对应的值为0,这个lock就完全free了
// 其他线程可以过来拿锁了
free = true;
// 把owner设为null,无主状态
setExclusiveOwnerThread(null);
}
setState(c);
// 返回free
return free;
}

unparkSuccessor(h)

由释放锁的线程叫醒队列中的第一个线程,这里传的h为head,也就是thread为null的那个node。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// ws为-1,先把它的ws改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 取到“排队”的第一个线程为s
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// ws大于0这种情况我们先不讨论
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果有排队线程,unpark唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}

还记得我们在加锁时park线程的那行代码吗parkAndCheckInterrupt(),这里我再贴一下吧(并发的源码真的是难啊):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private final boolean parkAndCheckInterrupt() {
// 之前阻塞在这里的
LockSupport.park(this);
// unpark之后,这里返回true
return Thread.interrupted();
}

// 所以下面******的那行代码将interrupted置为true,死循环开始下一轮循环

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// unpark后第二次循环
final Node p = node.predecessor();
// p为前面那个thread为null的node,是head
if (p == head && tryAcquire(arg)) {
// 尝试自旋并且拿到锁之后
// 这里排队的第一个已经拿到锁了
// setHead在下面分析了
setHead(node);
// 维护链表关系
p.next = null; // help GC
failed = false;
// 返回true,中断阻塞,执行被唤醒的线程接下来的逻辑
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// ******
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHead(Node node) {
// 将head置为当前node
head = node;
// 将head对应的线程置为null
// 因为最开始我们说了,持有锁的那个线程永远不在队列中
// 只是提供了占位作用
node.thread = null;
// 维护链表关系,方法外层也维护了
node.prev = null;
}

interrupt之后,lock()方法正常返回,我们一定要记住:ReentrantLock的加锁本质上讲就是lock()方法的正常返回

总结

至此,我们可以说Java现代化的同步中的synchronized关键字是基于对象头进而基于管程monitor的,Lock是基于AQS的。

AQS还有其他的应用,比如CountDownLatch、Semaphore,以及线程池中创建新线程封装的Worker也是AQS的子类(addWorker()方法还记得吗?)。