ReentrantLock及AQS源码分析
自己实现一个自旋锁
我们来写一个普通的自旋锁,然后再慢慢去改进它。
1 | // 标识是否有线程加锁 |
缺点:这样的自旋浪费CPU资源,没有拿到锁的线程会一直自旋。如果拿到锁的线程执行10s,那么其他线程白白浪费了10s的CPU资源。
解决思路:让得不到锁的线程让出CPU。
yield() + 自旋
1 | volatile int status = 0; |
缺点:yield()方法能够让出CPU资源,但是只有两个线程竞争时,yield是有效的。只是当前线程让出CPU,我们不知道下次CPU空闲时调度了哪个线程。如果有10000个线程,会出问题。
sleep() + 自旋
1 | volatile int status = 0; |
缺点:sleep()的缺点就更明显了,你咋知道要睡10秒?
park() + 自旋
1 | // 标识是否有线程加锁 |
unpark(Thread t)可以指定唤醒哪个线程。这样就比较完美了,这是伪代码,其实ReentrantLock用的就是类似这样的机制。
ReentrantLock源码分析
ReentrantLock基于AbstractQueuedSynchronizer这个抽象类来实现可重入的锁,ReentrantLock在构造方法时可以传入fair参数来决定是否是公平的锁,并且内部定义了2个数据结构(FairSync和NonfairSync)分别来实现公平非公平锁。
AQS的核心是一个state状态(0表示自由状态,>1表示加锁成功)和一个FIFO的队列,或者说自己维护的双向链表。
ReentrantLock是jdk 1.5出来的,据传是Doug Lea大神不满意当时synchronized关键字的效率(1.6后synchronized有了锁优化),自行实现了一个高效的锁AQS。后来sun公司直接将其收进了juc包中,并且在以后的版本对synchronized进行了优化。原来我们总喜欢比较两个方式性能的优劣,但实际在今天,已没有多大的区别了。
至于面试常问的synchronized和锁的区别,我的理解主要目的还是考察是否对它们使用过、实现原理以及存在层面上的比较。现在你单纯问宝马好还是奔驰好,无从谈起。
ReentrantLock使用方法也很简单,先创建锁,在业务逻辑前调用lock()实例方法上锁,在finally块中解锁即可。
AQS中的Node内部类
这个Node使我们理解AQS以及ReentrantLock的基础数据结构,它是在AQS中定义的,主要作为排队线程中的基本单位(节点),它本质上就是封装了Thread,并且提供了向前向后的指针来维护排队队列,你把它理解成双端链表也可以。
1 | public class Node{ |
主要的属性就这仨。
lock()上锁过程
lock()上锁过程对于公平锁和非公平锁只有一点差别,我们先贴一下源码。
1 | // FairSync |
1 | // NonfairSync |
为啥说非公平?因为它上来就想插队。
acquire()
acquire()方法对于是否公平是一样的,是父类AQS中的默认方法:
1 | public final void acquire(int arg) { |
tryAcquire()
这里贴的是FairSync的tryAcquire(),NonfairSync的实现只是少了!hasQueuedPredecessors()需要排队的判断。
1 | protected final boolean tryAcquire(int acquires) { |
hasQueuedPredecessors()
1 | public final boolean hasQueuedPredecessors() { |
AQS队列在初始化时会虚拟一个thread为null的Node,因为队列当中的head永远是持有锁的那个node。这个thread为null的虚拟节点提供了一个在AQS队列中占位的作用,持有锁的那个线程永远不在队列中,这个理念在AQS中我们一定要理解。
我觉得这个方法是整个AQS中最难的部分没有之一,所以这里不用注释的形式了,在下面对于各种情况详细分析一下:
- 队列中没有Node,那么head和tail都为null,这个时候队列没有被初始化,那么不需要排队,直接去上锁;但也有特殊情况,如果2个线程同时来lock(),看到队列没有初始化,就都用CAS去修改计数器,那么必然有一个会失败,那么失败的这个线程就去初始化队列,并且乖乖排队;
- (难点)队列中只有1个Node,也就是head == tail,但是我们上面说了,初始化完成后,head是thread为null的虚拟节点,如果只有一个数据,那队列长度应该是2啊?其实这里是一个中间状态,即持有锁的线程刚好释放锁,而自己又是队列中唯一一个节点,这时候不去等待unpark()通知,主动去尝试CAS拿锁,此时h == t,要看下一个判断:h.next != null(因为是当前要加锁的线程),并且后面s.thread != Thread.currentThread()也成立,整个return为false,就表示不用排队,直接去拿锁(日了狗了这一行代码);
- 队列中有若干Node(大于1个),那么h != t肯定成立,需要看后面那段表达式,大于1个的时候h.next肯定不为空,那么就看拿锁线程是不是排队的第一个线程:
- 如果当前线程不是第一个排队线程,后半段为true前半段也是true,整个函数返回true,那没啥废话,乖乖去排队;
- 如果当前线程是第一个排队线程(当做现在第一个排队的人是你媳妇),那么这里整个函数返回false,不需要排队,直接外层去CAS拿锁,这里又分2种情况:
- 加锁成功:持有锁的线程刚好执行完了,释放掉锁,那么当然就是当前线程拿到锁;
- 加锁失败:持有锁的线程还没执行完,也不会进tryAcquire(1)的else分支,那么本次拿锁失败,在acquire(1)中会addWaiter()正常排队。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
上面说如果加锁失败,就把它加到AQS队列中,我再贴一下代码:
1 | public final void acquire(int arg) { |
如果能跑到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这里说明当前线程要排队了。这里分为2种情况:
- 其他线程持有锁,那么当前线程需要排队,但此时队列还没有被初始化;
- 其他线程持有锁,队列已经被初始化,所以当前线程需要去排队。
addWaiter(Node.EXCLUSIVE)
1 | private Node addWaiter(Node mode) { |
所以我们看下来,这个addWaiter()方法就是拿锁失败时,让当前线程入队,并且维护队列的链表关系。
接下来看acquireQueued(addWaiter(Node.EXCLUSIVE), arg),经过上面的一系列操作后,变成了acquireQueued(node, arg)。
acquireQueued(node, arg)
1 | final boolean acquireQueued(final Node node, int arg) { |
shouldParkAfterFailedAcquire(p, node)
这里我们先说一下ws是啥,它是Node节点中定义的属性waitStatus:
1 | /** waitStatus value to indicate thread has cancelled */ |
所以当一个Node刚创建的时候,这个ws一定是为0的。
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
这个方法传入2个参数,前驱节点pred和当前节点node,我们有一个前提是进入到这个方法是线程竞争状态下,也就是锁还被别人持有的情况下才会进入的,那么就分为2种情况:
- pred为虚拟node:此时拿到的虚拟node的ws为0,那么会进入最后一个else,也就是将pred的ws改为-1(-1表示这个线程在park状态),注意当前node应该是队列中的第二个node,排队的第一个node,这里为啥要先改状态呢?有2个原因:
- 一是这个Node.SIGNAL的状态在其他情况其他方法也有可能用到,我们不能把0作为可以park的状态;
- 二是多一次自旋,这个shouldParkAfterFailedAcquire()方法外面已经有一次自旋了,还记得外面是一个死循环吗?在这里改完ws之后外面会再来一次循环,再自旋一次(为啥自旋2次?100次不行吗?肯定不行啊,浪费CPU,2次是三思之后决定的),ReentrantLock的自旋加锁就体现在这里。ws改为-1后返回false,外面进入下一次循环。
- 如果第二次自旋还拿不到锁,进入到这个方法后,pred的ws为-1,那么直接return true,接下来就需要park了;
- 那如果第二次自旋拿到了锁,那么维护链表关系,acquireQueued()返回false,在再外面的acquire()中不进行selfInterrupt()操作,顺利拿到锁。
- pred为普通排队node:那这里当前线程就是需要park的,因为你前面的线程还在老老实实排队啊。那么修改前一个Node的ws状态,然后第二次进入shouldParkAfterFailedAcquire()是返回true,进行下一波操作park自己。
为啥这里ws要改前一个排队的线程?有2个原因:
- 自己不能改自己的ws,万一你改了ws为-1,然后没有park的话不是骗人吗;
- 一旦park了之后,就没有CPU了,不能执行代码了,所以自己也没办法改。
自己不知道自己睡着了嘛。
另外这个ws在解锁过程中也非常重要,后面会分析。
parkAndCheckInterrupt()
1 | private final boolean parkAndCheckInterrupt() { |
unlock()解锁过程
1 | public void unlock() { |
sync.release(1)
1 | public final boolean release(int arg) { |
tryRelease(arg)
当前线程释放锁,比较简单。这时候还没有unpark其他线程。
1 | protected final boolean tryRelease(int releases) { |
unparkSuccessor(h)
由释放锁的线程叫醒队列中的第一个线程,这里传的h为head,也就是thread为null的那个node。
1 | private void unparkSuccessor(Node node) { |
还记得我们在加锁时park线程的那行代码吗parkAndCheckInterrupt()
,这里我再贴一下吧(并发的源码真的是难啊):
1 | private final boolean parkAndCheckInterrupt() { |
interrupt之后,lock()方法正常返回,我们一定要记住:ReentrantLock的加锁本质上讲就是lock()方法的正常返回。
总结
至此,我们可以说Java现代化的同步中的synchronized关键字是基于对象头进而基于管程monitor的,Lock是基于AQS的。
AQS还有其他的应用,比如CountDownLatch、Semaphore,以及线程池中创建新线程封装的Worker也是AQS的子类(addWorker()方法还记得吗?)。