AQS AQS,全程AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,这个类在JUC包下面。在多线程中是十分重要的,保证有效工作线程对锁的抢占,已经线程的排队工作。这些都由AQS来制订规范的,使用者只需要继承该类并对方法进行重写就可以实现共享资源的获取以及释放。
AQS原理: 有一个state变量来标记当前资源是否空闲,当state=0表明当前空闲,即可抢占成功,线程抢占成功后,会把state的状态置为1,其他线程在进行锁定时,会尝试抢占,抢占失败之后,就会进入一个队列。这个队列叫做CLH队列,是一个虚拟的双向队列。
做了个原理图,有点拉。其实分析源码,在双向队列中有一个哨兵节点的。
可以看看AQS抽象类的结构是怎样的,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io .Serializable { static final class Node {...} private transient volatile Node head; private transient volatile Node tail; private volatile int state; ..... }
Node类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
源码分析 以ReenTrantLock为例,来分析。在底层是如何进行资源的抢占和释放的。
lock() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class AQSTest { public static void main (String[] args) { ReenTrantLock lock = new ReenTrantLock(); new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " === 进来了" ); TimeUnit.MINUTES.sleep(21 ); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "A" ).start(); new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "进来了" ); } finally { lock.unlock(); } }, "B" ).start(); } }
当启动程序,线程A率先上锁,并执行自己的业务逻辑,我们可以看看lock.lock() 底层的逻辑。点进源码,发现执行的是ReenTrantLock类中的Sync的方法
1 2 3 public void lock () { sync.lock(); }
接着点进去,发现是一个抽象方法,查看子类实现,有公平锁和非公平锁两种,以非公平锁为例,点进去。此时的资源是没有人抢占的,处于空闲状态,即state=0,因此,当线程A要上锁时,会进行一次CAS操作,如果期望值是0,就会把state改为1,很明显,此时state=0,所以为true。那么就会把当前占用线程设置为线程A。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
此时,A就会执行自己的逻辑,即办理业务。。。可能会很长时间,此时线程B尝试上锁。和上述一样,B也进行一次CAS操作,结果发现期望值和当前值不一样,expected state = 0, 但是由于A占用,所以此时的state=1,得到false。 所以B会执行acquire(1),进去该方法,发现该方法是AQS类中的方法。
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
B线程还是会尝试抢占锁,可以看到tryAcquire(1),进入该方法,AQS使用了模板设计模式,子类必须重写方法,我们还是非公平锁为例,B不会死心,即使CAS操作没有抢占到,还是会进行尝试,判断state的状态,比较占有线程和当前线程是不是同一对象。如果都不满足,则返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
再看第二个方法,acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 先看里面的方法,进入方法。如果此时双向队列不为空,则会把当前线程节点放在队列的尾部,而此时队列为空。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
因此,会进入enq(node),进入方法,在空队列时,会先创建一个哨兵节点。再次循环,才会创建当前线程节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
进入该方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
由此可见,当线程被封装成节点后, 会再一次进行抢占,如果此时抢占失败,则会修改哨兵节点的等待状态为-1 ,即线程被阻塞或中断,之后就会进入park状态,
1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
当某一线程拿到资源后,会修改资源状态为1,而其他线程会尝试进行抢占,如果抢占失败,会被封装成一个node节点,并进入双向队列的尾部,此时线程会再一次进行尝试抢占,如果此时还没有抢占到资源,则会修改哨兵节点的等待状态为-1,即被阻塞。 随后线程就会被锁定,LockSupport.park()。
unlock() 假设A执行完毕了,执行unlock() 方法,会执行sync.release(1) 方法,进入该方法
1 2 3 public void unlock () { sync.release(1 ); }
代码如下
1 2 3 4 5 6 7 8 9 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected final boolean tryRelease (int releases) { int c = getState() - releases; c = 1 - 1 = 0 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
可以得到,当A解锁时,会尝试释放资源,判断当前线程是否是占有线程,并且如果当前状态为1,则将占有线程置空,然后资源状态设置为0。并且判断哨兵节点的状态是否不为0,然后进入unparkSuccessor(h) 方法,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
前面线程B被阻塞着,代码如下,B被唤醒,此时B尝试抢占资源,结果发现state为0,于是B抢占成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
当B抢占到资源后,会把node节点中的线程清空,而头指针会指向node节点,而原先的哨兵节点会被gc,此时node节点就成了哨兵节点。
后面的线程一样的。不过多赘述。
整个线程对资源的占用和释放过程就大致如上。。。。over