0%

AQS的分析

AQS

​ AQS,全程AbstractQueuedSynchronizer,翻译过来就是抽象队列同步器,这个类在JUC包下面。在多线程中是十分重要的,保证有效工作线程对锁的抢占,已经线程的排队工作。这些都由AQS来制订规范的,使用者只需要继承该类并对方法进行重写就可以实现共享资源的获取以及释放。

​ AQS原理: 有一个state变量来标记当前资源是否空闲,当state=0表明当前空闲,即可抢占成功,线程抢占成功后,会把state的状态置为1,其他线程在进行锁定时,会尝试抢占,抢占失败之后,就会进入一个队列。这个队列叫做CLH队列,是一个虚拟的双向队列。

​ 做了个原理图,有点拉。其实分析源码,在双向队列中有一个哨兵节点的。

AQS

可以看看AQS抽象类的结构是怎样的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class AbstractQueuedSynchronizer 
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

//静态内部类,会把线程封装成一个个node节点放入队列
static final class Node {...}

//双向队列的头节点
private transient volatile Node head;

//双向队列的位节点
private transient volatile Node tail;

//标志当前资源的状态
private volatile int state;

..... //还有很多方法
}

Node类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); //共享
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; //独占

//withStatus的4中状态
static final int CANCELLED = 1; //表明节点(线程) 被取消或者中断
static final int SIGNAL = -1; //表明节点被阻塞,需要unpark,即解锁
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

源码分析

​ 以ReenTrantLock为例,来分析。在底层是如何进行资源的抢占和释放的。

lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AQSTest {
public static void main(String[] args) {
ReenTrantLock lock = new ReenTrantLock();
//创建两个线程
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " === 进来了");
TimeUnit.MINUTES.sleep(21);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}, "A").start();

new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "进来了");
} finally {
lock.unlock();
}
}, "B").start();
}
}

​ 当启动程序,线程A率先上锁,并执行自己的业务逻辑,我们可以看看lock.lock() 底层的逻辑。点进源码,发现执行的是ReenTrantLock类中的Sync的方法

1
2
3
public void lock() {
sync.lock();
}

​ 接着点进去,发现是一个抽象方法,查看子类实现,有公平锁和非公平锁两种,以非公平锁为例,点进去。此时的资源是没有人抢占的,处于空闲状态,即state=0,因此,当线程A要上锁时,会进行一次CAS操作,如果期望值是0,就会把state改为1,很明显,此时state=0,所以为true。那么就会把当前占用线程设置为线程A。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//底层调用unsafe.compareAndSwapInt(this, stateOffset, expect, update)
if (compareAndSetState(0, 1))
//将占有线程设置成当前线程。
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

​ 此时,A就会执行自己的逻辑,即办理业务。。。可能会很长时间,此时线程B尝试上锁。和上述一样,B也进行一次CAS操作,结果发现期望值和当前值不一样,expected state = 0, 但是由于A占用,所以此时的state=1,得到false。 所以B会执行acquire(1),进去该方法,发现该方法是AQS类中的方法。

1
2
3
4
5
public final void acquire(int arg) { //arg = 1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

​ B线程还是会尝试抢占锁,可以看到tryAcquire(1),进入该方法,AQS使用了模板设计模式,子类必须重写方法,我们还是非公平锁为例,B不会死心,即使CAS操作没有抢占到,还是会进行尝试,判断state的状态,比较占有线程和当前线程是不是同一对象。如果都不满足,则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean nonfairTryAcquire(int acquires) { //acquires = 1
final Thread current = Thread.currentThread(); //获取当前线程
int c = getState(); //获取当前状态,很明显此时的state = 1,
if (c == 0) { //false
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//current = B && ExclusiveOwnerThread = A
else if (current == getExclusiveOwnerThread()) { //false
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//所以会直接返回false
return false;
}

​ 再看第二个方法,acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 先看里面的方法,进入方法。如果此时双向队列不为空,则会把当前线程节点放在队列的尾部,而此时队列为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
//通过当前线程(B)创建一个节点,并且表明为独占。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //扫描指针指向尾节点,
if (pred != null) { //此时队列是空的,因此pred == null
node.prev = pred; //让新节点的前驱指针指向尾结点,
if (compareAndSetTail(pred, node)) { //如果pred是尾结点则把尾结点设置成Node
pred.next = node; //让pred的后继指向node节点
return node;
}
}
enq(node);
return node;
}

​ 因此,会进入enq(node),进入方法,在空队列时,会先创建一个哨兵节点。再次循环,才会创建当前线程节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node enq(final Node node) {
for (;;) {
Node t = tail; //此时tail指向null,
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //所以会创建一个新节点,不存放数据,设置为头节点,并且让尾指针也指向这个头结点,而这个节点即为前面图中所说的哨兵节点
tail = head;
} else {
//第二次循环,此时的t指向的是哨兵节点,将node节点挂到哨兵节点后面。
node.prev = t;
if (compareAndSetTail(t, node)) { //此时t代表哨兵节点,把尾结点设置成node
t.next = node;
return t;
}
}
}
}

​ 进入该方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//官方翻译的意思差不多是,会以无限循环来独占的方式来获取队列中的线程   
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //得到当前线程的前驱节点
// p执行哨兵节点,head指向哨兵节点,所以p == head 为 true
// 但是,arg = 1,并且工作线程仍然是A 此时尝试抢占肯定是失败的,所以不会进入if语句
if (p == head && tryAcquire(arg)) { // false
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //得到哨兵节点的等待状态, 初始化都为0
if (ws == Node.SIGNAL) //SIGNAL=-1 表示线程处于等待状态或者中断。false
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//将线程的状态改为阻塞。。。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

​ 由此可见,当线程被封装成节点后, 会再一次进行抢占,如果此时抢占失败,则会修改哨兵节点的等待状态为-1 ,即线程被阻塞或中断,之后就会进入park状态,

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

当某一线程拿到资源后,会修改资源状态为1,而其他线程会尝试进行抢占,如果抢占失败,会被封装成一个node节点,并进入双向队列的尾部,此时线程会再一次进行尝试抢占,如果此时还没有抢占到资源,则会修改哨兵节点的等待状态为-1,即被阻塞。 随后线程就会被锁定,LockSupport.park()。


unlock()

假设A执行完毕了,执行unlock() 方法,会执行sync.release(1) 方法,进入该方法

1
2
3
public void unlock() {
sync.release(1);
}

代码如下

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) { //arg = 1
if (tryRelease(arg)) { //尝试释放资源
Node h = head; //指向哨兵节点
if (h != null && h.waitStatus != 0) //判断哨兵节点不为空并且当前等待状态为不是0
unparkSuccessor(h); //进行解锁
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

protected final boolean tryRelease(int releases) { //releases = 1
int c = getState() - releases; c = 1- 1 = 0
//判断是否占有线程是当前线程 如果不是则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //true
free = true;
setExclusiveOwnerThread(null); //将占有线程置空
}
setState(c); //将资源状态改为0
return free;
}

​ 可以得到,当A解锁时,会尝试释放资源,判断当前线程是否是占有线程,并且如果当前状态为1,则将占有线程置空,然后资源状态设置为0。并且判断哨兵节点的状态是否不为0,然后进入unparkSuccessor(h) 方法,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //得到哨兵节点的状态 即为-1
if (ws < 0) //true
compareAndSetWaitStatus(node, ws, 0); //则设置成0

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//修改之后,获取到第一个有效节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 如果不为空,则将第一个有效节点进行解锁
LockSupport.unpark(s.thread);
}

​ 前面线程B被阻塞着,代码如下,B被唤醒,此时B尝试抢占资源,结果发现state为0,于是B抢占成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//官方翻译的意思差不多是,会以无限循环来独占的方式来获取队列中的线程   
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //得到当前线程的前驱节点
// p执行哨兵节点,head指向哨兵节点,所以p == head 为 true
// 此时state = 0, A已经释放资源了,于是B抢占资源成功
if (p == head && tryAcquire(arg)) { // true
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

​ 当B抢占到资源后,会把node节点中的线程清空,而头指针会指向node节点,而原先的哨兵节点会被gc,此时node节点就成了哨兵节点。

​ 后面的线程一样的。不过多赘述。


整个线程对资源的占用和释放过程就大致如上。。。。over

-------------本文结束感谢您的阅读-------------