一、介绍 提供实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)的框架。AbstractQueuedSynchronizer被设计为大多数类型的同步器的使用基础。aqs是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。对于互联网运用高并发的童鞋来说,redisson分布式锁应该都用过,这里面也用aqs来做分布式的一个线程同步处理,只是这里的通知已经不是通过LockSupport.unpack(),而是采用的redis的订阅/通知来做的。
aqs的源码虽然多,但是综合下来主要分为独占锁、共享锁、以及通知等待,后面我将一一做源码分析。
aqs主要内容:
二、源码分析 aqs节点结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class Node { /** * 等待状态 * SIGNAL : {@link Node #SIGNAL } * CANCELLED: {@link Node #CANCELLED } * CONDITION: {@link Node #CONDITION } * PROPAGATE: {@link Node #PROPAGATE } * 0 : 初始状态,在队列中等待 */ volatile int waitStatus; /** * 指向前一个node */ volatile Node prev ; /** * 指向后一个node */ volatile Node next ; /** * 当前node 所处的线程 */ volatile Thread thread; /** * 连接到下一个节点等待条件 */ Node nextWaiter ; }
组赛节点不包括head节点,因为head节点是当前正在执行的节点或者空姐点。
申请独占锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 public final void acquire(int arg) { if (!try Acquire(arg ) && acquireQueued(addWaiter (Node.EXCLUSIVE) , arg)) selfInterrupt() ; } static void selfInterrupt() { Thread . currentThread() .interrupt() ; } private Node addWaiter(Node mode ) { Node node = new Node(Thread.currentThread () , mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred , node ) ) { pred.next = node; return node; } } enq(node); return node; }private final boolean compareAndSetTail(Node expect , Node update ) { return unsafe.compareAndSwapObject(this , tailOffset , expect , update ) ; }private Node enq(final Node node) { for (; ; ) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node() )) tail = head; } else { node.prev = t; if (compareAndSetTail(t , node ) ) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node , int arg ) { boolean failed = true ; try { boolean interrupted = false ; for (; ; ) { final Node p = node.predecessor() ; if (p == head && try Acquire(arg ) ) { setHead(node ) ; p.next = null; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p , node ) && parkAndCheckInterrupt() ) interrupted = true ; } } finally { if (failed) cancelAcquire(node ) ; } } private final boolean parkAndCheckInterrupt() { LockSupport . park(this); return Thread . interrupted() ; }private void setHead(Node node ) { head = node; node.thread = null; node.prev = null; }
shouldParkAfterFailedAcquire代码中的一段:当前node的前置节点pred的状态为取消时,向前找到未取消的节点为前置节点
作用:
1、精简节点队列,设置node.prev=SIGNAL节点
设置node的状态为SIGNAL的前置节点,可以减少唤醒node线程需要查找的过程节点,因为unparkSuccessor唤醒后继线程是从tail开始查找的。
2、唤醒后继线程时,更新节点的前置节点为head.(特殊情况)
当执行队列节点状态出现问题时,节点状态是取消状态,cancelAcquire会执行一次unparkSuccessor(node),而后执行shouldParkAfterFailedAcquire,通过node.prev = pred = pred.prev;可以清除之前的节点,如果head和node之间的都是取消状态节点,则node.prev=head
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); } else { pred.next = node; compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
这样做的目的:取消的节点不会在获取锁得到线程执行权,也就不会再通知后置节点。
例如:当node3的前置节点pred(node2)状态为cancel时,向前查找未取消的节点,这里将会找到node1
1 2 3 do { node.prev = pred = pred .prev; } while (pred .waitStatus > 0 );
问题1:这里有个问题,如果当前节点向前查找未取消的节点为prev前置节点时,万一node1找到的那个prev节点已经执行过了怎么办?
这个下面解锁源码分析会提到,这里先简单说一下,如果node1的LockSupport.unpack先于node3.prev=node1,这里node3还是会执行,因为当node1.next的状态为取消时,会从tail向前查找,不管node3.prev是node1还是node2,最终都会找到node1,都会得出一个结论,node3是离node1最近的一个非取消节点
问题2:pred会为空吗?
不会,因为当第一次加入节点队列时,会设置一个new Node()节点为head,此时状态为0;而后面head的节点也不会为取消,因为head时当前正在执行的节点或者最后一个执行的节点,当然这里说的是未申请到资源加入队列的,aqs独占锁并不是所有的执行都会创建Node加入队列中等待执行,只要申请资源成功也可以获取线程资源。这里可以看看enqnode和setheader的逻辑
1 2 3 4 5 6 pred.next = node;// 等待状态为0 或者PROPAGATE(-3 ),设置前驱的等待状态为SIGNAL, 并且之后会回到循环再次重试获取锁。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 为什么pred.next = node;不放在if 里面? 如果在do -while 中则会出现node3-node1之间的next 全部指向node3,如果在do -while 后执行,暂时还没有发现什么问题,next 的用途很少
分析:当最终找到pred节点waitStatus状态不为SIGNAL时,再次进入shouldParkAfterFailedAcquire方法执行else
取消获取锁
在独占锁中,cancelAcquire到底会不会执行?
1 2 3 4 5 6 通过分析acquireQueued方法的代码可以得出唯一可能造成异常,导致failed=true 的时候,只有当:node.predecessor() 或者try Acquire(arg ) 报错 而node.predecessor() 是不可能报错的,当队列中有执行节点时队列中至少两个节点:head节点和当前节点 而try Acquire(arg ) 到底会不会报错: 网友观点 if (!try Acquire(arg ) &&acquireQueued(addWaiter (Node.EXCLUSIVE) , arg)) 这里如果报错就不会添加node 而我看了aqs的源码设计时通过tryAcquire的来实现各种不同的并发控制类,tryAcquire是否报错已经不是AbstractQueuedSynchronizer这个类能控制的了,而是扩展类逻辑处理,可能出现当第一次执行tryAcquire不报错,而其它线程申请了资源,当前线程再次申请资源报错了呢? 可以看出aqs作者还是做了aqs同步器的一个执行保障
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 /** * 该方法实现某个node 取消获取锁。 */ private void cancelAcquire(Node node ) { // Ignore if node doesn 't exist if (node == null ) return; node.thread = null; // 遍历并更新节点前驱,把node 的prev 指向前部第一个非取消节点。 Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; // 记录pred节点的后继为predNext,后续CAS会用到。 Node predNext = pred.next; // 直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node 节点。 node .waitStatus = Node.CANCELLED; /* * 如果CAS将tail从node 置为pred 节点了 * 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node 的联系。 * 这样一来就断开了pred 与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。 * 由于node 没有后继节点,所以这种情况到这里整个cancel 就算是处理完毕了。 * * 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。 */ if (node == tail && compareAndSetTail(node , pred )) { compareAndSetNext(pred, predNext, null); } else { // 如果node 还有后继节点,这种情况要做的事情是把pred 和后继非取消节点拼起来。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; /* * 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点 * 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。 */ if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { /* * 这时说明pred == head或者pred状态取消或者pred.thread == null * 在这些情况下为了保证队列的活跃性,需要去唤醒一次后继线程。 * 举例来说pred == head完全有可能实际上目前已经没有线程持有锁了, * 自然就不会有释放锁唤醒后继的动作。如果不唤醒后继,队列就挂掉了。 * * 这种情况下看似由于没有更新pred的next的操作,队列中可能会留有一大把的取消节点。 * 实际上不要紧,因为后继线程唤醒之后会走一次试获取锁的过程, * 失败的话会走到shouldParkAfterFailedAcquire的逻辑。 * 那里面的if中有处理前驱节点如果为取消则维护pred/next,踢掉这些取消节点的逻辑。 */ unparkSuccessor(node); } /* * 取消节点的next之所以设置为自己本身而不是null, * 是为了方便AQS中Condition部分的isOnSyncQueue方法, * 判断一个原先属于条件队列的节点是否转移到了同步队列。 * * 因为同步队列中会用到节点的next域,取消节点的next也有值的话, * 可以断言next域有值的节点一定在同步队列上。 * * 在GC层面,和设置为null具有相同的效果。 */ node.next = node; // help GC } }
node节点向前查找waitStatus不是cancel的节点并且设置prev,这里的node就是node3
1 2 while (pred .waitStatus > 0 ) node.prev = pred = pred .prev;
如果node是tail节点,设置pred节点为tail节点,并且设置pred的后置节点为null。
1 2 3 4 5 6 7 if (node == tail && compareAndSetTail(node , pred )) { compareAndSetNext(pred, predNext, null); } 过程分析: (1 )当node 时tail 时,cas比较node 和tail ,替换成pred节点,这里可能tail节点发生变化,变成node 的next 节点 (2 )如果(1 )成功,那么pred成为新的tail节点,当其它线程申请资源加入队列,则tail也会发生变化,所以pred.next被其它线程替换,则设置null也会失败
例如:node3为node
如果node不是tail节点,并且pred不是head,pred节点没有cancel(状态<=0,置为SIGNAL,thread!=null)
1 2 3 4 5 6 7 8 9 10 11 12 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node .next ; /* * 如果node 的后继节点next 非取消状态的话,则用CAS尝试把pred的后继置为node 的后继节点 * 这里if 条件为false 或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。 */ if (next != null && next .waitStatus <= 0 ) compareAndSetNext(pred, predNext, next ); }
例如:node1为pred
如果pred(node1)是head节点或者被另一个线程突然取消了,那么为了保证队列的活跃性,唤醒后继节点
例子:
当node1、node2、node3都是取消节点,则最终的图可能会出现这样子
cancelAcquire为什么要这么设计?
1 2 3 4 5 首先要明确一点的是,这个方法除了做队列如果出现异常唤醒队列外,还有一个非常中的是做取消节点的内存处理即GC (1 )设置所有取消节点的prev 指向前面最近的一个SIGNAL正常节点,prev 可能是head(如果是head则需要唤醒队列),如果是pred也是CANCELLED,则需要重新唤醒一次队列(这种可能出现的情况是,当线程1 执行cancelAcquire中途,pred突然执行因为异常执行cancelAcquire,那么选出来的pred就不正常,情况不明之下,唤醒队列很重要,这里是一个保障) (2 )非常重要的一点时prev 设置成功,GC回收则成功了一半 所有的cancel节点prev 指向SIGNAL,并且cancel节点的next =自己,这里这么设计主要考虑了Condition部分的isOnSyncQueue方法。 GC回收的另一半则是通过通过shouldParkAfterFailedAcquire执行找到一个非cancel的pred节点,当设置为head节点时,head.prev .next =null ,head.prev =null
伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void acquire(int arg){//申请执行 if(boolean tryAcquire(int arg)){//尝试申请资源 return true ;//申请成功,可以直接执行 }else{ Node node =addWaiter(Node.EXCLUSIVE){//添加独占锁节点 Node node =new Node Node pred = tail; if(pred!=null){//第一次向队列中添加node 将当前node 加入队列尾部tail }else{ enq(node ){ if (tail= =null){ 创建head节点,设置tail= head,进入循环将当前node 加入队列尾部tail }else{ 将当前node 加入队列尾部tail } } } } boolean result= acquireQueued(node , int arg){ Node p = node.predecessor();//p为node 前置节点 if (p==head){//如果node 的前置节点为head if(tryAcquire(arg)){//如果申请资源成功,则相当于lock获取到锁 setHead(node );//设置node 为head节点 return ; } } if (shouldParkAfterFailedAcquire(pred, node ){ if (pred.waitStatus == Node.SIGNAL){//前置节点是通知状态,表示正常,等pred执行完逻辑释放锁时,就会唤醒node 线程 return ; } if(pred.waitStatus>0 ){ 将node 节点的prev 设置为未取消的节点 }else{ 设置前驱的等待状态为SIGNAL, 并且之后会回到循环再次重试获取锁。 } return }){ if(parkAndCheckInterrupt(){ 阻塞当前线程,并且返回当前线程恢复是否是通过interrupt }){ } } } if(result){ 获取资源成功 } } }
释放独占锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 /** * 释放独占锁 */ public final boolean release(int arg) { if (tryRelease(arg)) { /* * 此时的head 节点可能有3 种情况: * 1 . null (AQS的head 延迟初始化+无竞争的情况) * 2 . 当前线程在获取锁时new出来的节点通过setHead设置的 * 3 . 由于通过tryRelease已经完全释放掉了独占锁,有新的节点在acquireQueued中获取到了独占锁,并设置了head * 第三种情况可以再分为两种情况: * (一)时刻1 :线程A通过acquireQueued,持锁成功,set了head * 时刻2 :线程B通过tryAcquire试图获取独占锁失败失败,进入acquiredQueued * 时刻3 :线程A通过tryRelease释放了独占锁 * 时刻4 :线程B通过acquireQueued中的tryAcquire获取到了独占锁并调用setHead * 时刻5 :线程A读到了此时的head 实际上是线程B对应的node * (二)时刻1 :线程A通过tryAcquire直接持锁成功,head 为null * 时刻2 :线程B通过tryAcquire试图获取独占锁失败失败,入队过程中初始化了head ,进入acquiredQueued * 时刻3 :线程A通过tryRelease释放了独占锁,此时线程B还未开始tryAcquire * 时刻4 :线程A读到了此时的head 实际上是线程B初始化出来的傀儡head */ Node h = head ; // head 节点状态不会是CANCELLED,所以这里h.waitStatus != 0 相当于h.waitStatus < 0 if (h != null && h.waitStatus != 0 ) // 唤醒后继线程 unparkSuccessor(h); return true ; } return false ; } /** * 唤醒后继线程 */ private void unparkSuccessor(Node node ) { //尝试将node 的等待状态置为0 ,这样的话,后继争用线程可以有机会再尝试获取一次锁。 int ws = node .waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node , ws, 0 ); /* * 这里的逻辑就是如果node .next 存在并且状态不为取消,则直接唤醒s即可 * 否则需要从tail 开始向前找到node 之后最近的非取消节点。 * * 这里为什么要从tail 开始向前查找也是值得琢磨的: * 如果读到s == null,不代表node 就为tail ,参考addWaiter以及enq函数中的我的注释。 * 不妨考虑到如下场景: * 1 . node 某时刻为tail * 2 . 有新线程通过addWaiter中的if 分支或者enq方法添加自己 * 3 . compareAndSetTail成功 * 4 . 此时这里的Node s = node .next 读出来s == null,但事实上node 已经不是tail ,它有后继了! */ Node s = node .next ; if (s == null || s.waitStatus > 0 ) { s = null; for (Node t = tail ; t != null && t != node ; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null) LockSupport.unpark(s.thread); }
三、总结 aqs的逻辑看完了还是很容易理解,aqs设计的内容非常多,需要一个一个知识点解析,这一章节主要分析独占锁,而在我们设计的网站程序中,独占锁时相当重要的,所以这个是掌握aqs的种种之路。
对于aqs节点的gc内存回收,几个地方:
(1)sethead,设置head.prev.next=null,head.prev=null,则这里GC node,这里GC的是正常节点
(2)shouldParkAfterFailedAcquire 主要作用是调整node的prev到非取消节点
(3)cancelAcquire 看上图分析