一、获取共享锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); } else { pred.next = node; compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (; ; ) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else {
node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|
二、释放共享锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
private void doReleaseShared() {
for (; ; ) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
|
三、aqs共享锁问题
这里Node.PROPAGATE的作用:
(1)网友解说
为了避免线程无法会唤醒的窘境。,因为共享锁会有很多线程获取到锁或者释放锁,所以有些方法是并发执行的,就会产生很多中间状态,而PROPAGATE就是为了让这些中间状态不影响程序的正常运行。
(2)个人解说
上面时官方的注释:
1 2 3 4 5 6
| 尝试唤醒下一个排队节点,如果: 传播由调用者指示,也可能是上一个操作。 下一个节点是shared模式 上面的h.waitStatus<0这些条件是避免没有必要的唤醒下一个节点
这两种检查中的保守性可能会导致不必要的唤醒,但只有在有多个线程获得/释放的情况下,所以大多数现在或很快就需要信号。
|
1 2
| if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)
|
个人理解:
1 2 3 4 5 6
| (1)propagate>0,表示还有资源可以申请,在多线程抢夺资源的情况下,会唤醒下一个共享节点,下一个节点可能会获得执行权 (2)两个head==null,就不明白了,看了整个aqs的源码,除了在第一次添加node的之前会出现head和tail为空,其它时候不会出现为空的情况 (3)这里有两次判断head.waitStatus<0,这里作用是尽量避免没有必要的唤醒下一个共享node执行 (4)为什么不head.waitStatus<=0, 情况一:当执行addWaiter但未进入doAcquireShared里的for循环的逻辑或者还没有执行完,不为0,可以避免不必要的唤醒。(比如信号量,有可以设置资源量,同时多个线程申请,当超过这个量是就会加入阻塞队列,当其它线程执行releaseshared时可能会出现这种情况) 情况二:doReleaseShared()会把Node.SIGNAL变为0,唤醒阻塞队列node获取执行权,如果node未tryAcquireShared到资源,则又会把状态从0转变SIGNAL,这么做可以避免在多线程情况下多次释放相同head的资源
|
下面看看release核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
代码解析:
1 2
| (1)这里的for循环是避免在多线程情况下,并且多个线程可以同时获取到资源的情况下,head发生改变 (2)如果head是尾节点则没有必要唤醒了,因为next为空,没有必要
|
为什么需要Node.PROPAGATE的图解?
这个问题必须要用多线程的思维去理解,不然很难理解到,如果可申请资源只有1,那么怎么也不会出现这个PROPAGATE这个情况。
从图中可以看出在共享锁的情况下,多线程可能会图中情况,因为在多线程情况下,下面情况是可能出现的,因为虽然waitStatus的修改具备原子性,但是下面一连串的操作不具备。
如果把SIGNAL->0,虽然可以避免没有必要的唤醒,但是可能出现下面图中的情况;并且如果出现申请资源失败的情况,就算后面调用releaseShared也不能释放共享锁了,所以PROPAGATE是必须的,这是一个保障。
从上面图可以看出,如果没有把head的waitStatus状态
这里又有问题了,既然已经是head,为啥会出现tryAcquireShared申请资源失败呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| private void doAcquireShared(int arg) { ... try { ... for (; ; ) { ... if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); ... return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { ... } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
|
看了上面的代码可以发现,虽然setHeadAndPropagate可以在共享锁模式下唤醒传播后续节点的执行,但是并没有释放资源,也就是说当我们锁定一段代码执行的时候,我们可以并行执行,但是执行过程中是占用资源的,releaseShared才会释放资源,才会调用tryReleaseShared方法。
共享锁的理念:
可以理解为只读锁类型的锁,表示可以同一时间多个线程都可以进入。虽然共享锁允许多个线程进入,但是它也占用资源,所以需要我们手动调用代码释放资源。