JUC-19-AQS

JUC-19-AQS

1. AQS结构

  • AQS(AbstractQueuedSynchronizer)是Java众多锁以及并发工具的基础类,底层采用乐观锁,大量采用CAS操作保证其原子性,并且在并发冲突时,采用自旋方法重试。实现了轻量高效的获取锁。

1.1 信号量

  • 在AQS中,状态是由volatile state来表示。
1
private volatile int state;
  • 该属性值表示锁的状态。
    • state为0表示锁未被占用,
    • state为1表示锁被线程持有,
    • 而state大于1表示锁被重入。

而本文分析的是独占锁,那么同一时刻,锁只能被一个线程持有。

不仅需要记录锁的状态,还需要记录当前获取锁的线程,实现重入。可以通过来记录。

1
private transient Thread exclusiveOwnerThread;

1.2 等待队列

  • 等待队列采用悲观锁的思想,表示当前所等待的资源,状态或条件短时间内可能无法满足,而调用park方法(借助操作系统)来完成线程的阻塞。
  • 在AQS中,队列时一个双端链表,将当前线程包装成某种类型的数据结构扔到等待队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class Node {  
// 节点所代表的线程
volatile Thread thread;
// 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;
// 线程所处的等待锁的状态,初始化时,该值为0。
volatile int waitStatus;
//队列中节点线程被取消
static final int CANCELLED = 1;
//节点将其前驱节点设置为-1,当前驱节点释放锁后,会自动唤醒该节点。
static final int SIGNAL = -1;
//线程被重新包装为Node节点,并存入Condition队列中。
static final int CONDITION = -2;
//共享锁唤醒风暴时,将0->PROPAGATE,表示被传播唤醒
static final int PROPAGATE = -3;
// 该属性用于条件队列或者共享锁 。在Condition队列中,使用其作为指针。
Node nextWaiter;
}

一般在独占锁下,我们需要关注的就是下面几个参数:

  • thread:当前Node所代表的线程;
  • waitStatus:表示节点所处的等待状态;
  • prev next节点的前驱和后继

1.3 CAS操作

  • CAS采用乐观锁机制,保证操作的原子性。一般是改变状态或改变指针(引用)指向。

mark

1.4 总结

在AQS源码中:

  1. 锁属性
1
2
3
4
 //锁的状态
private volatile int state;
//当前持有锁的线程
private transient Thread exclusiveOwnerThread;
  1. sync queue相关的属性
1
2
3
//thread属性为null
private transient volatile Node head;
private transient volatile Node tail; // 队尾,新入队的节点
  1. Node相关属性
1
2
3
4
5
6
7
8
9
10
11
// 节点所代表的线程
volatile Thread thread;

// 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;

// 线程所处的等待锁的状态,初始化时,该值为0
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;

2. lock 接口

Lock是一个接口,方法定义如下

1
2
3
4
5
void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常
boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回true
boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法
void unlock() // 释放锁

实现Lock接口的类有很多,以下为几个常见的锁实现

  • ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
  • ReentrantReadWriteLock:重入读写锁,它实现了ReadWriteLock接口,在这个类中维护了两个锁,一个是ReadLock,一个是WriteLock,他们都分别实现了Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。
  • StampedLock: stampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock是一种乐观的读策略,使得乐观锁完全不会阻塞写线程

3. ReentrantLock实现

3.1 加锁逻辑

ReentrantLock有公平锁和非公平锁两种实现,默认实现非公平锁。但是可配置为公平锁

1
ReentrantLock lock=new ReentrantLock(true);

调用公平锁加锁逻辑:

1
2
3
4
final void lock() {  
//开始加锁,将state修改为1
acquire(1);
}

真正的加锁方法:

1
2
3
4
5
public final void acquire(int arg) {  
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

1. tryAcquire(arg)
该方法由继承AQS的子类实现,为获取锁的具体逻辑;

2. addWaiter(Node.EXCLUSIVE)
该方法由AQS实现,负责在获取锁失败后调用,将当前请求锁的线程包装成Node并且放到等待队列中,并返回该Node。

3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
该方法由AQS实现。针对上面加入到队列的Node不断尝试两种操作之一:

  • 若前驱节点是head节点的时候,尝试获取锁;
  • 调用park将当前线程挂起,线程阻塞。

4. selfInterrupt
该方法由AQS实现。恢复用户行为。

  • 用户在外界调用t1.interrupt()进行中断。

  • 线程在parkAndCheckInterrupt方法被唤醒之后。会调用Thread.interrupted();判断线程的中断标识,而该方法调用完毕会清除中断标识位。

  • 而AQS为了不改变用户标识。再次调用selfInterrupt恢复用户行为。

3.2 构建等待队列 addWaiter

  • 我们使用ReentrantLock独占锁时,等待队列是延迟加载的。
  • 也就是说若是线程交替执行,那么借助信号量(状态)来保证。
  • 若是线程并发执行,就需要将阻塞线程放入到队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//注意这个方法可能存在并发问题。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//队列已经存在
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列不存在
enq(node);
return node;
}
  1. 队列不存在的情况(初始化队列 enq(Node))

mark

注意,该方法处理CAS操作是原子性的,其他操作都存在并发冲突问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {  
for (;;) {
Node t = tail;
// 初始化队列 (一个Thread 为null的空node)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// (维护队列 CAS改变队尾)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

该方法采用自旋+CAS。CAS是保证同一时刻只有一个线程能成功改变引用的指向。

mark

  • 根据上面的流程图,sync queue的创建过程。head节点是new Node()产生的,即其中的属性为默认值。也就是thread属性为null。也就是说正在执行的线程也会在sync queue中占据头节点,但是节点中不会保存线程信息。

    mark

尾分叉问题:

上面已经说了,该方法是线程不安全的。

1
2
3
4
5
6
7
8
//步骤1:可能多个节点的prev指针都指向尾结点,导致尾分叉
node.prev = t;
//步骤2:但同一时刻,tail引用只会执行一个node。
if (compareAndSetTail(t, node)) {
//步骤3:现在环境是线程安全,旧尾结点的后继指针指向新尾结点。
t.next = node;
return t;
}

mark

执行完步骤2,但步骤3还未执行时,恰好有线程从头节点开始往后遍历。此时(旧)尾结点中的next域还为null。它是遍历不到新加进来的尾结点的。这显然是不合理的。

但此时步骤1是执行成功的,所以若是tail节点往前遍历,实际上是可以遍历到所有节点的,这也是为什么在AQS源码中,有时候常常会出现从尾结点开始逆向遍历链表的情况

那些“分叉”的节点,肯定会入队失败。那么继续自旋,等待所有的线程节点全部入队成功。

3.3 尝试获取锁 tryAcquire

  • 根据标志位state,来判断锁是否被占用。此时可能锁未被占用,由于是公平锁,于是会去判断sync queue中是否有人在排队。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected final boolean tryAcquire(int acquires) {  
//获取当前线程
final Thread current = Thread.currentThread();
//获取Lock对象的上锁情况,0-表示无线程持有;1-表示被线程持有;大于1-表示锁被重入
int c = getState();
//若此刻无人占有锁
if (c == 0) {
if (!hasQueuedPredecessors() && //判断队列中是否有前辈。若返回false代表没有,开始尝试加锁
compareAndSetState(0, acquires)) { //此刻队列中没有存在前辈,尝试加锁
setExclusiveOwnerThread(current); //将当前线程修改为持有锁的线程(后续判断可重入)
return true;
}
}
//若是当前线程是持有锁的线程
else if (current == getExclusiveOwnerThread()) {
//当前状态+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//否则,代表加锁失败
return false;
}
1
2
3
4
5
6
7
8
9
//该方法不具有原子性,可能多个线程都觉得自己不需要排队,最终还是依靠外面
//条件上的CAS来保持其原子性。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

上述方法是判断队列中是否存在元素。可能存在以下几种情况:

  • 此时未维护队列h!=t返回false,即无人排队;
  • 此时队列只有头节点(哑结点),h!=t返回false,即无人排队;
  • 此时队列中存在2个的节点。若线程是头结点的后继节点线程(即处理正在办理业务的线程,进来的线程是第一个排队的线程)。那么s.thread != Thread.currentThread()返回false,即可是尝试加锁。
  • 队列存在2个以上节点,且进来的线程不是第一个排队的线程,那么该线程需要乖乖的排队

当然该方法不是并发安全的方法,即可能存在多个线程觉得自己无需排队,最终还是依靠CAS来争夺锁。

1
2
3
4
5
if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {
//线程安全
setExclusiveOwnerThread(current);
return true;
}

同一时刻,只有一个线程可以成功改变state的状态。记录该线程为独占锁线程,一般后续可以重入。

没成功获取锁那么会调用2.2 中的方法,将该线程加入到阻塞队列中

3.4 阻塞线程 acquireQueued

  • 若执行到该方法,说明addWaiter方法已经成功将该线程包装为Node节点放到了队尾。
  • 在该方法中依旧尝试获取锁;
  • 再次获取锁失败后,会将其阻塞;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {  
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的前驱节点
final Node p = node.predecessor();
//若前驱节点在办理业务,那么它将再次获取一次锁。
if (p == head && tryAcquire(arg)) {
//获取锁成功,此处便是线程安全。
//将自己设置为头节点,并将自己设置为哑节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败,将自己挂起。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

上述方法时自旋方法,而出口就是获取到锁。若线程获取不到锁,便会将自己阻塞。

1
2
3
4
5
6
//该方法时node线程获取锁成功后执行的,故是线程安全的。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

mark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
//上一个节点的waitStatus
int ws = pred.waitStatus;
// Node.SIGNAL==-1
if (ws == Node.SIGNAL)
return true;
//ws大于0,则说明该节点已经被取消了。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//CAS变更ws的状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

上述方法是加锁失败开始执行的。也就是一个线程决定挂起之前需要执行的操作。这里就用到了节点中的信号量waitStatus

  1. 判断前驱节点waitStatus的值,会做出如下操作:
    1.1 前驱节点waitStatus若是-1,直接返回true。
    1.2 前驱节点waitStatus若大于0,证明前驱节点已被取消,那么在链表中删除前驱节点,直到node的前驱节点的waitStatus不大于0为止。然后返回false
    1.3. 若前驱节点waitStatus等于0,使用CAS尝试改变前驱节点waitStatus状态,由0到-1,然后返回false。
  1. 若是返回true,那么去阻塞该节点,若是返回false,那么继续自旋,继续上述过程,直至该方法返回true为止,方法返回true,便会执行下列方法,阻塞线程。
1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {  
//将线程挂起
LockSupport.park(this);
//线程被唤起时,查看线程的中断标识(注意,查看完毕后,中断标识归位)
return Thread.interrupted();
}

需要注意的是:当前节点在阻塞之前,会将前驱节点的waitStatus设置为-1,就可保证前驱节点在适当的时机唤醒自己。

4. 附录

4.1 CAS对象

开始我认为对象的CAS算法,实际上会是B对象去覆盖堆内存上的A对象,其实不然。比较交换的是引用。

1
2
3
4
5
6
7
//该方法是获取引用。而非堆上的内存。
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

5. 加锁总结

  1. 因为AQS的等待队列是延迟加载,只有多个线程并发访问时,才会开始维护队列。
  2. 因为head节点中不包含thread属性的值,又被称为哑节点
  3. head是正在办理业务的节点,而他的后继节点是第一个排队节点。
  1. 尝试加锁过程
  1. 根据status判断当前锁是否被持有,若被持有,直接维护队列
  2. 若未被持有,判断当前队列是否有节点在排队,若有节点排队,直接维护队列
  3. 若无节点排队,则通过CAS修改锁状态标识,修改成功代表线程持有该锁;
  4. 使用exclusiveOwnerThread来保存持有锁的线程(解决线程重入);
  1. 维护队列过程

最终线程的head节点为哑节点。后续线程被组装成node节点,维护在链表中。

  1. 线程阻塞过程
  1. 判断node节点是否为head节点的后续节点(第一个排队节点),若是的话,尝试获取锁。若获取到,将其设置为head节点,并将其设置为哑节点;
  2. 在阻塞前,会将自己的前驱节点的waitStatus设置为SIGNAL。以便可以唤醒自己。

6. 锁的释放

ReentrantLock.unlock

加锁的过程分析完以后,再来分析一下释放锁的过程,调用release方法,这个方法里面做两件事,

1,释放锁 ;

2,唤醒park的线程

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease

  • 这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
  • 在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 这里是将锁的数量减1
if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 由于重入的关系,不是每次释放锁c都等于0,
// 直到最后一次释放锁时,才会把当前线程释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

unparkSuccessor

  • 在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态,
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的
s = t;
}
//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁
if (s != null)
LockSupport.unpark(s.thread); //释放许可
}

移除锁的节点图文解析

  • head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下

mark

这个过程也是涉及到两个变化

  • 修改head节点指向下一个获得锁的节点
  • 新的获得锁的节点,将prev的指针指向null

注意:

  • 这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证
  • 只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可

参考博客

https://www.jianshu.com/p/dcc608274b88

https://segmentfault.com/a/1190000017372067

https://www.cnblogs.com/fsmly/p/11274572.html

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信