AQS框架

AQS框架概览

首先,我们通过下面的架构图来整体了解一下AQS框架:

AQS 框架图

上图中有颜色的为Method,无颜色的为Attribution。总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。CLHCraigLandin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFOAQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

CLH 变体队列

AQS使用一个volatileint类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

AQS 队列

ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入AQS队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:

  • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
  • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

AQS数据结构

先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

AQS Node

解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出npe
nextWaiter 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus有下面几个枚举值:

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 1,表示线程获取锁的请求已经取消了
CONDITION -2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE -3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL -1,表示线程已经准备好了,就等资源释放了

同步状态State

在了解数据结构后,接下来了解一下AQS的同步状态:State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

下面提供了几个访问这个字段的方法:

方法名 描述
protected final int getState() 获取State的值
protected final void setState(int newState) 设置State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程

独占模式与共享模式

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。

同步器接口

从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法:

  • protected boolean isHeldExclusively()该线程是否正在独占资源。只有用到Condition才需要去实现它。
  • protected boolean tryAcquire(int arg)独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False
  • protected boolean tryRelease(int arg)独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False
  • protected int tryAcquireShared(int arg)共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • protected boolean tryReleaseShared(int arg)共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLockReentrantLock是独占锁,所以实现了tryAcquire-tryRelease

非公平锁加锁与解锁

加锁:

  • 通过ReentrantLock的加锁方法Lock进行加锁操作。
  • 会调用到内部类SyncLock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQSAcquire方法。
  • AQSAcquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire
  • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

解锁:

  • 通过ReentrantLock的解锁方法Unlock进行解锁。
  • Unlock会调用内部类SyncRelease方法,该方法继承于AQS
  • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。

API 映射

ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。在非公平锁中,有一段这样的代码:

// java.util.concurrent.locks.ReentrantLock

static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
	}
  ...
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer
protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。下面会详细解释线程是何时以及怎样被加入进等待队列中的。

AQS应用

ReentrantLock的可重入应用

ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

if (c == 0) {
	if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
		setExclusiveOwnerThread(current);
		return true;
	}
}
else if (current == getExclusiveOwnerThread()) {
	int nextc = c + acquires;
	if (nextc < 0)
		throw new Error("Maximum lock count exceeded");
	setState(nextc);
	return true;
}

// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

if (c == 0) {
	if (compareAndSetState(0, acquires)){
		setExclusiveOwnerThread(current);
		return true;
	}
}
else if (current == getExclusiveOwnerThread()) {
	int nextc = c + acquires;
	if (nextc < 0) // overflow
		throw new Error("Maximum lock count exceeded");
	setState(nextc);
	return true;
}

从上面这两段都可以看到,有一个同步状态State来控制整体可重入的情况。StateVolatile修饰的,用于保证一定的可见性和有序性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;
  • State初始化的时候为0,表示没有任何线程持有锁。
  • 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  • 解锁也是对这个字段-1,一直到0,此线程对锁释放。

J.U.C中的应用场景

除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatchawait方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquiretryRelease

自定义同步工具

public class LeeLock  {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();

    public void lock () {
        sync.acquire(1);
    }

    public void unlock () {
        sync.release(1);
    }
}

通过我们自己定义的Lock完成一定的同步功能。

public class LeeMain {

    static int count = 0;
    static LeeLock leeLock = new LeeLock();

    public static void main (String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run () {
                try {
                    leeLock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    leeLock.unlock();
                }

            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

Links

下一页