程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • JUC并发编程

    • 第一章 线程的基础概念
    • 第二章 并发编程的三大特性
    • 第三章 synchronized的总结
    • 第四章 深入ReentrantLock
      • 1. ReentrantLock
      • 2. AQS简介
        • 2.1 AQS的int变量
        • 2.2 AQS的CLH队列
        • 2.3 内部类Node
      • 3. AQS前置知识点
        • 3.1 模板方法
        • 3.2 LookSupport(线程阻塞工具类)
        • 3.3 CAS
      • 4. 条件变量
      • 5. 独占模式 源码讲解
        • 5.1 加锁过程
        • 5.1.1 tryAcquire
        • 5.1.2 addWaiter ;抢锁失败,CLH入队
        • 5.1.3 acquireQueued
        • 5.2 释放锁
        • 5.2.1 tryRelease
        • 5.2.2 unparkSuccessor
      • 6. 共享模式 源码讲解
        • 6.1 获取锁
        • 6.1.1 tryAcquireShared
        • 6.1.2 doAcquireShared
        • 6.2 释放锁
        • 6.2.1 doAcquireShared
      • 7. Condition
        • 7.1 await
        • 7.2 signal
        • 7.3 总结
    • 第五章 深入ReentrantReadWriteLock
    • 第六章 阻塞队列
    • 第七章 线程池
    • 第八章 并发集合
    • 第九章 并发工具
    • 第十章 异步编程
  • JavaEE
  • JUC并发编程
scholar
2024-03-14
目录

第四章 深入ReentrantLock

# 1. ReentrantLock

  1. 基本介绍:
    • ReentrantLock是Java标准库中java.util.concurrent.locks包提供的一个可重入互斥锁,实现了Lock接口。与synchronized关键字相比,它提供了更加丰富的锁操作功能。
    • 它是一个显式锁,使用时需要在代码中明确地获取和释放锁。这与synchronized不同,后者是基于代码块或方法的隐式锁。
  2. 可重入性:
    • ReentrantLock是可重入的,这意味着同一个线程可以多次获取同一把锁,而不会导致死锁。每次获取锁时,锁的持有计数会增加,每次释放锁时,计数会减少。只有当计数降至0时,锁才会被真正释放。
  3. 灵活性和功能:
    • 提供了尝试非阻塞获取锁(tryLock())、可中断的锁获取操作(lockInterruptibly())以及带超时的锁获取(tryLock(long timeout, TimeUnit unit)),增加了编程的灵活性。
    • 支持公平锁和非公平锁。公平锁意味着锁的分配将遵循先进先出(FIFO)的原则,而非公平锁可能允许"插队"。
    • 提供了一种机制(Condition对象),允许线程分组等待特定条件的发生,相比于synchronized提供的Object.wait()和notify()方法,这提供了更细粒度的线程同步控制。

使用场景

  • 当需要更高级的锁操作时,例如尝试锁、计时锁等待、等待/通知机制,ReentrantLock是一个比synchronized更好的选择。
  • 在高度竞争的锁环境中,ReentrantLock的性能通常优于synchronized。
  • 当需要选择公平锁时,ReentrantLock提供了一种机制,而synchronized不支持公平性设置。
方法 作用
ReentrantLock() 创建一个ReentrantLock实例,默认使用非公平锁。
ReentrantLock(boolean fair) 创建一个ReentrantLock实例,如果参数fair为true,则使用公平锁;否则使用非公平锁。
lock() 获取锁。如果锁已被其他线程占用,则当前线程会等待直到锁被释放后才能获取到锁。
tryLock() 尝试获取锁,如果锁立即可用且成功获取,则返回true;如果锁被其他线程占用,则返回false。
tryLock(long time, TimeUnit unit) 尝试在给定的最大时间内获取锁,如果锁在给定时间内被获取,则返回true;否则返回false。
unlock() 释放锁。如果当前线程持有该锁,调用此方法会释放锁。如果锁的持有计数降至0,则释放锁。
newCondition() 返回与此Lock实例一起使用的新Condition实例。此方法用于等待/通知机制。
lockInterruptibly() 获取锁,除非当前线程被中断。如果当前线程在入锁时被中断,则抛出InterruptedException。
getHoldCount() 查询当前线程保持此锁的个数,即调用lock方法的次数减去调用unlock方法的次数。
isHeldByCurrentThread() 查询当前线程是否保持此锁。
isLocked() 查询此锁是否由任意线程保持。
isFair() 查询此锁是否为公平锁。

我们的ReentrantLock在使用起来可能有一些需要注意的事项:

  1. lock() 方法要在 try 代码块之前调用。这是因为,如果 lock() 方法在 try 代码块内并且发生异常,那么锁可能无法正确获取,而在 finally 代码块中,unlock() 方法仍会被执行。这可能会尝试释放并未真正被线程持有的锁,从而引发 IllegalMonitorStateException 异常。
  2. unlock() 方法应在 finally 代码块中调用,以确保锁一定会被释放,无论 try 代码块中的代码是否抛出异常。这是因为,如果 try 代码块中的代码抛出异常,并且 unlock() 不在 finally 中被执行,那么可能导致锁没有被正确释放,从而阻止其他线程获取该锁,进一步可能导致死锁现象。

ReentrantLock和synchronized的区别?

1. 基本概念和使用方式的区别:

  • ReentrantLock 是java.util.concurrent.locks包中的一个类。它提供了丰富的锁操作功能,需要通过显式地调用lock()和unlock()方法来管理锁。
  • synchronized 是Java中的一个关键字,用于在方法或代码块上自动管理锁。它在执行完同步代码块后自动释放锁,无需手动操作。

2. 性能和竞争激烈情况的处理:

  • 在锁的竞争激烈的情况下,ReentrantLock可能提供比synchronized更优的性能,因为ReentrantLock提供了无锁升级的机制,可以减少线程阻塞的可能性。
  • synchronized在JVM层面经过了大量优化,包括锁粗化、锁消除、轻量级锁和偏向锁等技术。在竞争不是非常激烈的情况下,这些优化使得synchronized的性能并不逊色。

3. 底层实现原理的区别:

  • ReentrantLock是基于Java的高级同步框架Abstract Queued Synchronizer(AQS)实现的。AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来管理获取同步状态失败的线程。
  • synchronized依赖于JVM内部的监视器锁(monitor)实现,这是一种依赖于底层操作系统互斥量(Mutex)的实现,利用了操作系统的同步机制。

4. 功能性的区别:

  • ReentrantLock提供了一些synchronized所不具备的高级功能:
    • 支持公平锁和非公平锁选择。
    • 提供了可以被中断的锁获取操作。
    • 支持尝试非阻塞地获取锁(tryLock)和带超时的锁获取。
    • 提供了与条件变量(Condition)配合使用,实现等待/通知模式的更加灵活的线程同步机制。

选择哪个:如果你对并发编程特别熟练,推荐使用ReentrantLock,功能更丰富。如果掌握的一般般,使用synchronized会更好

# 2. AQS简介

引入

如果你想深入研究Java并发的话,那么AQS一定是绕不开的一块知识点,Java并发包很多的同步工具类底层都是基于AQS来实现的,比如我们工作中经常用的Lock工具ReentrantLock、栅栏CountDownLatch、信号量Semaphore等,而且关于AQS的知识点也是面试中经常考察的内容,所以,无论是为了更好的使用还是为了应付面试,深入学习AQS都很有必要。

AQS初识

AQS,全名AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它是实现同步器的基础组件,如常用的ReentrantLock、Semaphore、CountDownLatch等。AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作。

下面是AQS的组成结构。

img

AbstractQueuedSynchronizer(AQS)主要由以下三个核心部分组成:

  1. State(同步状态):
    • AQS 使用一个整型的 state 字段来表示同步状态,这是实现同步控制的基础。对 state 的操作包括设置、查询和使用 CAS (Compare-And-Swap)更新等,这些操作提供了构建锁和其他同步器的基本工具。
  2. Node 组成的等待队列:
    • AQS 内部使用一个由 Node 实例组成的双向链表来维护等待获取同步状态的线程。在 AQS 中,这个等待队列用于管理那些未能成功获取同步状态的线程。每个 Node 包含线程引用及等待状态等信息,并通过前驱和后继指针与其他 Node 连接。
  3. ConditionObject 条件变量:
    • ConditionObject 是 AQS 提供的用于实现条件等待/通知机制的类。每个 ConditionObject 实例内部都维护了一个由 Node 组成的条件队列(这实际上是一个单向队列),用于管理调用了条件等待方法(如 await)的线程。当条件得到满足并调用 signal() 或 signalAll() 方法时,线程会从条件队列移动到同步队列,尝试重新获取同步状态。

AbstractQueuedSynchronizer

先了解下AbstractQueuedSynchronizer提供的核心函数

AQS(AbstractQueuedSynchronizer)提供了一套机制来支持两种资源共享方式:独占(Exclusive)和共享(Shared)。这两种模式适用于不同的并发场景,使得基于AQS实现的同步组件能够灵活应对多样化的需求。

独占模式(Exclusive)和 共享模式(Shared)

  • 独占模式意味着同一时刻只有一个线程能够获取到锁或者说访问同步资源。这是最常见的同步场景,比如ReentrantLock就是基于AQS的独占模式实现的。在独占模式下,当一个线程请求资源并成功获取后,其他任何线程都无法获取到该资源,直到持有资源的线程释放资源。
  • 共享模式允许多个线程同时获取同步资源。这种模式适用于资源有多份副本,或者说访问资源的操作可以同时由多个线程并行执行的场景。例如,Semaphore(信号量)就是允许多个线程同时访问某个资源的典型共享模式同步组件,而CountDownLatch允许多个线程等待直到倒计时结束。

AQS通过内部维护一个volatile int类型的状态变量(state)和一个基于FIFO队列的等待线程队列,来实现同步控制的基础架构。自定义同步器通过继承AQS并实现几个关键方法,就可以实现特定的同步语义。

状态管理

  • getState():返回同步状态
  • setState(int newState):设置同步状态
  • compareAndSetState(int expect, int update):使用CAS设置同步状态
  • isHeldExclusively():当前线程是否持有资源

独占资源(不响应线程中断)

  • tryAcquire(int arg):独占式获取资源,子类实现
  • acquire(int arg):独占式获取资源模板
  • tryRelease(int arg):独占式释放资源,子类实现
  • release(int arg):独占式释放资源模板

共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
  • acquireShared(int arg):共享式获取资源模板
  • tryReleaseShared(int arg):共享式释放资源,子类实现
  • releaseShared(int arg):共享式释放资源模板

# 2.1 AQS的int变量

在AQS中维护了一个用关键字volatile修饰同步状态变量state ,代表着该共享资源的状态一更改就能被所有线程可见,而AQS的加锁方式本质上就是多个线程通过CAS完成对state值的修改,当state为0时代表线程可以竞争锁,不为0时代表当前对象锁已经被占有。所以state的具体语义由实现者去定义,现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。

  • ReentrantLock:state表示锁的持有次数。对于不可重入锁,state的值为0表示锁是可获取的,为1表示锁已被占有;对于可重入锁,state的值表示同一个线程重入锁的次数。
  • ReentrantReadWriteLock:state的高16位用于表示读锁的持有数量(支持多个线程同时持有读锁),低16位用于表示写锁的状态(写锁为独占锁)。
  • Semaphore:state表示当前可用的许可数。
  • CountDownLatch:state表示倒计时计数器的当前值,倒计时结束(即state为0)时,等待的线程被释放。

通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

# 2.2 AQS的CLH队列

AQS中的队列是受到CLH队列启发的一个变体,实现为一个虚拟的双向FIFO队列。这个队列通过节点(Node)连接,管理那些因为未能立即获得同步状态而等待的线程。不同于CLH的单向链表结构,AQS的队列支持线程的阻塞和唤醒操作,适用于更广泛的同步场景。

注意事项

当我们在在AQS中提到“CLH队列”时,实际上是指AQS内部使用的一种特定的双向链表队列。

当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。所以说AQS通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:

  1. FIFO顺序确保公平性:AQS的队列是按照FIFO(先进先出)的顺序来管理等待获取同步状态的线程,这种方式天然地支持了一种公平性原则,即先尝试获取资源的线程会先被考虑。这有助于防止“饥饿”现象,尤其在竞争激烈的环境中。

  2. CAS操作实现非阻塞的队列管理:通过使用CAS(Compare-And-Swap)操作来完成节点的安全插入和移除,AQS能够在不需要锁的情况下,安全地管理其内部的队列。这种非阻塞的队列操作减少了锁竞争,提高了同步机制的效率。

  3. 结合自旋锁思想与阻塞机制:虽然AQS队列的管理采用了类似自旋锁的思想,特别是在节点的快速插入过程中,但当获取同步状态失败的线程被封装成Node加入队列后,AQS使用Unsafe.park()和Unsafe.unpark()方法来挂起和唤醒线程,而不是让线程持续自旋。这种策略在等待同步状态时更为高效,因为它减少了不必要的CPU消耗。

CLH队列用一张原理图来表示大致如下:

image-20230210111957060

  • 头结点(head):通常作为哨兵节点存在,其thread字段为null。这个节点不代表任何等待获取同步状态的线程。
  • 尾节点(tail):指向队列中最后一个节点,新加入的节点将插入到tail节点之后。

在AQS的等待队列中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。

# 2.3 内部类Node

Node是AQS的内部类,每个等待资源的线程都会封装成Node节点组成CLH队列或条件队列,所以说Node是非常重要的部分,理解它是理解AQS的第一步。

1711244659988

Node类里面的变量大部分都很好理解,其中waitStatus、nextWaiter有一些注意事项,下面做个补充说明

volatile int waitStatus 等待状态有以下几个值

waitStatus等待状态如下

Node nextWaiter 特殊标记

  • Node在FIFO队列时,nextWaiter表示共享式或独占式标记
  • Node在条件队列时,nextWaiter表示下个Node节点指针

# 3. AQS前置知识点

# 3.1 模板方法

AbstractQueuedSynchronizer是个抽象类,所有用到方法的类都要继承此类的若干方法,对应的设计模式就是模版模式。

模版模式定义:一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行。这种类型的设计模式属于行为型模式。

# 3.2 LookSupport(线程阻塞工具类)

为什么需要LookSupport?

传统的线程等待唤醒机制有两种方式分别是 synchronized(wait和notify)和 JUC 包中的显示锁 Lock(condition 的 await() 方法和 signal()方法),但是这两个方式有两个缺点,分别是都不能脱离 synchronized,和lock、unlock,如果脱离就会报错,还有就是 wait 和 notify以及await 和 signal 的执行顺序要固定,必须先wait然后在notify,否则会导致程序无法结束。

  • 所以出现第三种方式,那就是线程阻塞工具类 LockSupport(park和unpark),LockSupport 类可以在任何地方阻塞当前线程以及唤醒指定被阻塞的线程。
  • LockSupport 所有的方法都是静态方法,主要有两类方法:park 和 unpark。
void park(Object blocker); // 暂停当前线程
void parkNanos(Object blocker, long nanos); // 暂停当前线程,不过有超时时间的限制
void parkUntil(Object blocker, long deadline); // 暂停当前线程,直到某个时间
void park(); // 无期限暂停当前线程
void parkNanos(long nanos); // 暂停当前线程,不过有超时时间的限制
void parkUntil(long deadline); // 暂停当前线程,直到某个时间
void unpark(Thread thread); // 恢复当前线程
Object getBlocker(Thread t);
1
2
3
4
5
6
7
8

LockSupport 类使用了一种名为 Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(Permit)

  • 初始状态:每个线程开始时,其permit的数量是0,表示没有许可。
  • 许可的获取(park方法):当一个线程调用LockSupport.park()时,如果该线程的permit数量为0,则线程将被阻塞挂起。如果permit数量为1,则线程不会被阻塞,park调用会消耗掉这个许可(将permit设置回0),线程可以继续执行。
  • 许可的释放(unpark方法):当调用LockSupport.unpark(Thread thread)时,如果指定的线程的permit数量为0,unpark将其增加到1(如果已经是1,则保持不变)。这样做即可保证,如果该线程已经因调用park而被挂起,它将被唤醒;如果该线程还未调用park,那么下次调用park时,它将立即返回而不会挂起。

Permit(许可)的特点

  • 非累积性:permit的数量只能是0或1,多次调用unpark不会累积许可。这意味着,即使unpark被调用多次,只要调用park,许可即被消耗,线程再次调用park时会再次阻塞(除非再次调用unpark)。
  • 立即生效:unpark操作可以在park之前调用,让permit成为可用状态。这种情况下,线程在随后执行park操作时将不会阻塞,因为它已经拥有了许可。

使用场景:这种基于许可的机制非常适用于控制线程间的执行顺序。例如,在某些场景下,可能需要确保线程B在线程A执行到某个特定点之后再执行,通过park和unpark搭配使用,可以简单地实现这种需求,而不需要依赖于Java的传统对象监视器方法(wait/notify)。

一个经典的使用 LockSupport 的场景是实现两个线程交替打印数字和字母:












 






 











public class AlternatePrintDemo {

    static Thread thread1, thread2;

    public static void main(String[] args) {
        thread1 = new Thread(() -> {
            for (int i = 1; i <= 26; i++) {
                System.out.print(i);
                // 唤醒thread2线程
                LockSupport.unpark(thread2);
                // 阻塞当前线程
                LockSupport.park();
            }
        });

        thread2 = new Thread(() -> {
            for (char c = 'A'; c <= 'Z'; c++) {
                // 阻塞当前线程,等待被thread1唤醒
                LockSupport.park();
                System.out.print(c);
                // 唤醒thread1线程
                LockSupport.unpark(thread1);
            }
        });

        thread1.start();
        thread2.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 3.3 CAS

CAS 是 CPU指令级别实现了原子性的比较和交换(Conmpare And Swap)操作,注意CAS不是锁只是CPU提供的一个原子性操作指令。图片

CAS在语言层面不进行任何处理,直接将原则操作实现在硬件级别实现,之所以可以实现硬件级别的操作核心是因为CAS操作类中有个核心类UnSafe类。

关于CAS引发的ABA问题、性能开销问题、只能保证一个共享变量之间的原则性操作问题,CAS前面已经写过,在此不再重复讲解。

注意:并不是说 CAS 一定比SYN好,如果高并发执行时间久 ,用SYN好, 因为SYN底层用了wait() 阻塞后是不消耗CPU资源的。如果锁竞争不激烈说明自旋不严重,此时用CAS。

# 4. 条件变量

Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能,AQS的ConditionObject条件变量也提供这样的功能,通过ConditionObject的await和signal两类函数完成。

不同于Synchronized锁,一个AQS可以对应多个条件变量,而Synchronized只有一个。

img

如上图所示,ConditionObject内部维护着一个单向条件队列,不同于CLH队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在CLH队列, 条件队列出队的节点,会入队到CLH队列。

当某个线程执行了ConditionObject的await函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObject的signal函数,会将条件队列头部线程节点转移到CLH队列参与竞争资源,具体流程如下图

图片

最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prev与next变量都是null

# 5. 独占模式 源码讲解

讲完了AQS的一些基础定义,我们就可以开始学习同步的具体运行机制了,为了更好的演示,我们用ReentrantLock作为使用入口,一步步跟进源码探究AQS底层是如何运作的,这里说明一下,因为ReentrantLock底层调用的AQS是独占模式,所以下文讲解的AQS源码也是针对独占模式的操作

image-20240324080458619

# 5.1 加锁过程

我们都知道,ReentrantLock的加锁和解锁方法分别为lock()和unLock(),我们先来看获取锁的方法。

public ReentrantLock() {
     // 默认创建的就是非公平锁
    sync = new NonfairSync();   
}

// 默认非公平的加锁实现
final void lock() {
    // 使用CAS尝试将state从0设置为1,以获取锁
    if (compareAndSetState(0, 1))
        // 如果成功,当前线程就设置为该锁的独占持有者
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果CAS失败(意味着其他线程已持有锁),则调用acquire尝试获取锁
        acquire(1);
}

// 公平锁的加锁实现
final void lock() {
    acquire(1);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • 对于公平锁来说,讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
  • 对于非公平锁来说,不管是否有等待队列,线程进来后直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程,否则就调用acquire(1)再次尝试获取锁。

我们假定有两个线程A和B同时竞争锁,A进来先抢占到锁,此时的AQS模型图就类似这样:

img

继续走下面的方法,当我们点击acquire方法的时候,它会跳转到父类AQS类中,这就是典型的模板设计模式;

在模板方法设计模式中,算法的框架(或步骤序列)被定义在超类的方法中,而一些步骤的实现则留给子类来完成。

  • AQS的acquire方法定义了获取同步状态(例如,锁)的高层逻辑。这个方法接受一个表示请求同步状态的数量的参数(对于许多锁实现来说,这个参数通常是1)。
  • 模板方法:acquire方法本身作为一个模板,其内部会调用tryAcquire方法,这是一个需要被AQS子类实现的抽象方法,用于尝试直接获取同步状态。如果tryAcquire返回false(表示同步状态获取失败),则当前线程会被加入到等待队列中,直到它可以成功获取到同步状态。
public final void acquire(int arg) {
    // 尝试直接获取锁,如果成功则直接返回
    if (!tryAcquire(arg) &&
        // 如果直接获取锁失败,将线程加入等待队列并尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果在等待获取锁的过程中线程被中断,则在此处补上中断
        selfInterrupt();
}
1
2
3
4
5
6
7
8

AQS中的acquire方法包含了几个函数的调用,

  1. tryAcquire(int arg):
    • 这是一个抽象方法,由子类ReentrantLock具体实现。方法的目的是尝试直接获取锁。对于ReentrantLock而言,涉及到尝试将状态state从0设置为1(或在可重入情况下增加持有计数)。
    • 如果tryAcquire成功,意味着当前线程获取了锁,方法执行直接返回。
  2. addWaiter(Node.EXCLUSIVE):
    • 如果tryAcquire失败,说明锁已经被其他线程持有,当前线程需要等待。addWaiter方法将当前线程包装成一个节点(Node),并将其添加到等待队列的尾部。
    • 这里Node.EXCLUSIVE表示这是一个独占模式的节点。
  3. acquireQueued(Node node, int arg):
    • 该方法是AQS中的关键,负责管理线程的等待过程。如果线程无法立即获取锁,则它将在这里阻塞等待。
    • 线程在等待队列中自旋,尝试获取锁。如果获取成功,线程将被移出队列并继续执行。如果在等待过程中线程被中断,acquireQueued将返回true,表示线程在等待过程中被中断过。
  4. selfInterrupt():
    • 如果acquireQueued返回true(表示线程在等待获取锁的过程中被中断过),selfInterrupt方法将会被调用。
    • selfInterrupt实际上就是线程自我中断,通过调用Thread.currentThread().interrupt()来恢复中断状态。这样做的目的是让线程的中断状态被保留,以便调用者能够检测到中断发生并做出相应的处理。

我们一个个来看源码,并结合上面的两个线程来做场景分析。

# 5.1.1 tryAcquire

  • 该方法就是为了再次尝试获取锁
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread(); // 获取当前线程
    int c = getState(); // 获取当前锁的状态
    if (c == 0) { // 如果锁未被占用
        if (compareAndSetState(0, acquires)) { // 使用CAS尝试获取锁
            setExclusiveOwnerThread(current); // 设置当前线程为锁的拥有者
            return true; // 获取锁成功
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 如果锁已被当前线程占用
        int nextc = c + acquires; // 计算新的锁状态值
        if (nextc < 0) // 检查溢出
            throw new Error("Maximum lock count exceeded"); // 状态值溢出异常
        setState(nextc); // 设置新的锁状态
        return true; // 表示重入锁获取成功
    }
    return false; // 获取锁失败
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

当线程B进来后,nonfairTryAcquire方法首先会获取state的值,如果为0,则正常获取该锁,不为0的话判断是否是当前线程占用了,是的话就累加state的值,这里的累加也是为了配合释放锁时候的次数,从而实现可重入锁的效果。

当然,因为之前锁已经被线程A占领了,所以这时候tryAcquire会返回false,继续下面的流程。

# 5.1.2 addWaiter ;抢锁失败,CLH入队

private Node addWaiter(Node mode) {
    // 创建一个新的Node节点,代表当前线程
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速路径添加到队列尾部
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // CAS操作设置新的尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速路径失败,调用enq方法进行完整的入队操作
    enq(node);
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

这段代码的主要作用是将当前线程封装成一个Node节点,,并尝试将其加入到AQS的等待队列中。

当一个线程无法立即获取到同步状态并需要等待时,该线程会被封装成一个Node节点。这个Node代表了线程在等待队列中的存在,其中Node构成了一个双向链表结构。首先,代码尝试将这个新创建的节点添加到等待队列的末尾。这一操作首先检查当前的队列tail:

  • 如果tail不为null(即队列已经被初始化并且不为空),它尝试通过compareAndSetTail(pred, node)方法使用CAS操作安全地将新节点设置为尾节点。此时,新节点的prev指向原尾节点,而原尾节点的next指向新节点,从而完成了节点的插入。
  • 如果队列尚未初始化(tail是null),或者上述的快速路径插入失败,enq(node)方法被调用。enq方法负责在队列初始化后或在高并发场景下安全地完成节点的插入。它通过一个自旋循环,不断尝试将节点插入队列尾部,直到成功为止。
private Node enq(final Node node) {
 // CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) {
         // 队列为空,初始化一个Node结点作为Head结点,并将tail结点也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
         // 把当前结点插入队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

当前代码流程如下

  1. CAS自旋循环:方法使用了CAS自旋循环来确保节点能够被安全地加入到队列中。这种循环会一直执行,直到节点成功插入队列为止。
  2. 队列初始化:如果发现tail为null,表示队列尚未初始化,方法会尝试通过compareAndSetHead(new Node())创建一个新的空节点作为头节点,并设置tail指向这个新的头节点。这一步是对队列的初始化。
  3. 节点插入:
    • 在队列非空(即tail不为null)的情况下,当前节点的prev指向原tail节点,尝试通过compareAndSetTail(t, node)更新tail为当前节点,这样当前节点就成为了新的尾部节点。
    • compareAndSetTail(t, node)操作成功后,原tail节点(现在的prev节点)的next指向当前节点,完成了双向链接。
  4. 更新tail指针:在当前节点成功插入队列后,tail会指向这个新插入的节点,因为它现在是队列中的最后一个节点。

image-20240324144247255

如果此时有另一个线程C进来的话,发现锁已经被A拿走了,然后队列里已经有了线程B,那么线程C就只能乖乖排到线程B的后面去

img

# 5.1.3 acquireQueued

一旦加入同步队列,就需要使用该方法,自旋阻塞 唤醒来不断的尝试获取锁,直到被中断或获取到锁。

接着解读方法,通过tryAcquire()和addWaiter(),我们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么处理线程呢?其实答案也很简单,能做的事无非两个:

1、循环让线程再抢资源。但仔细一推敲就知道不合理,因为如果有多个线程都参与的话,你抢我也抢只会降低系统性能

2、进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源

毫无疑问,选择2更加靠谱,acquireQueued方法做的也是这样的处理:















 
 








 
















 




final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;  // 标记是否会被中断
        for (;;) {
            final Node p = node.predecessor(); // 获取当前节点的前驱节点
            // 如果前驱是头节点,并且尝试获取锁成功
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 将当前节点设置为头节点
                p.next = null; // 帮助GC,断开原头节点的next链接
                failed = false;
                return interrupted;
            }
            // 如果应该阻塞等待,则进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; // 如果在等待过程中被中断,记录中断状态
        }
    } finally {
        if (failed) // 如果获取锁失败,则取消获取锁的请求
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果前驱节点的状态为SIGNAL,表示当前节点可以安全地阻塞等待
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) { // 如果前驱节点被取消,向前循环直到找到一个未被取消的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点的状态设置为SIGNAL,以便于当前节点在后续被阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞当前线程
    return Thread.interrupted(); // 返回当前线程的中断状态,并清除中断标志
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

acquireQueued方法的流程是这样的:

  1. 循环尝试获取锁:acquireQueued方法通过CAS无限循环(自旋)尝试获取锁。
  2. 检查前驱是否为头节点:如果当前Node的前驱节点是头节点,则尝试通过tryAcquire方法获取锁。成功获取锁后,将当前Node节点设置为新的头节点,并清除原头节点的引用。
  3. 判断是否需要阻塞等待:如果当前Node节点的前驱节点不是头节点,或者尝试获取锁失败,调用shouldParkAfterFailedAcquire来判断当前Node节点是否应该被阻塞等待。该方法还负责将前驱节点的状态调整为合适的值以确保当前节点可以安全地被阻塞。
  4. 进入等待状态:如果决定阻塞当前线程,使用LockSupport.park挂起线程。如果线程被唤醒(由于unpark或被中断),检查线程的中断状态。
  5. 处理中断:如果线程在等待过程中被中断,循环继续,interrupted标志被设置为true。如果最终获取锁成功,方法返回时将返回interrupted标志,以便调用者可以进一步处理中断。
  6. 获取锁失败的清理:如果出于任何原因获取锁失败(例如抛出异常或其他情况),在finally块中调用cancelAcquire清理当前节点,避免内存泄露。

注意这个阶段线程A还没有释放锁,所以线程B还在等待队列中被阻塞挂起,图示如下:

img

到这里,加锁的流程就分析完了,其实整体来说也并不复杂,而且当你理解了独占模式加锁的过程,后面释放锁和共享模式的运行机制也没什么难懂的了,所以整个加锁的过程还是有必要多消化下的,也是AQS的重中之重。

为了方便你们更加清晰理解,我加多一张流程图吧

AQS

# 5.2 释放锁

说完了加锁,我们来看看释放锁是怎么做的,AQS中释放锁的方法是release(),当调用该方法时会释放指定量的资源 (也就是锁) ,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

还是一步步看源码吧








 



 





public void unlock() {
    // 调用AQS的release方法来释放锁
    sync.release(1);
}

public final boolean release(int arg) {
    // 尝试释放锁,具体逻辑由子类实现的tryRelease方法完成
    if (tryRelease(arg)) {  // tryRelease方法需要具体的子类去实现
        Node h = head;  // 获取等待队列的头节点
        if (h != null && h.waitStatus != 0)
            // 如果队列中有等待的线程,尝试唤醒队列中的后继线程
            unparkSuccessor(h);
        return true;  // 释放锁成功
    }
    return false;  // 释放锁失败
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 5.2.1 tryRelease

代码上可以看出,核心的逻辑都在tryRelease方法中,该方法的作用是释放资源,AQS里该方法没有具体的实现,需要由自定义的同步器去实现,我们看下ReentrantLock代码中对应方法的源码:

protected final boolean tryRelease(int releases) {
    // 计算释放锁后的state值
    int c = getState() - releases;
    // 检查当前线程是否持有独占锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();  // 如果不是,抛出异常
    boolean free = false;  // 标记锁是否完全释放
    if (c == 0) {  // 如果state为0,表示锁已完全释放
        free = true;  // 设置标记为true
        setExclusiveOwnerThread(null);  // 清除独占锁的持有线程
    }
    setState(c);  // 更新锁的state值
    return free;  // 返回锁是否完全释放的标记
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

释放锁的流程总结

  1. 调用unlock()方法:开始解锁流程,通常是在同步代码执行完毕后调用。
  2. 释放资源:通过调用AQS的release(int arg)方法尝试释放资源(锁)。这个过程中,核心逻辑是调用了tryRelease(arg)方法,由具体的同步器(如ReentrantLock)实现。
  3. tryRelease(int releases)逻辑:这个方法首先计算释放锁后的state值,判断是否完全释放了资源(state为0)。如果完全释放,清除持有独占锁的线程标记,并标记锁已完全释放。
  4. 唤醒等待队列中的线程:如果成功释放了锁,并且等待队列中存在等待的线程(waitStatus不为0),则调用unparkSuccessor(h)方法唤醒队列中的后继线程。
  5. 返回结果:如果锁被完全释放,release()方法返回true,否则返回false。

此时AQS中的数据就会变成这样:

图片

# 5.2.2 unparkSuccessor

当完全释放锁资源后,当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor(),

private void unparkSuccessor(Node node) {
    // 检查并尝试将节点的等待状态更新为0,以便于后续的唤醒操作
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 寻找需要被唤醒的后继节点,优先检查当前节点的直接后继
    Node s = node.next;
    // 如果直接后继节点不存在或已取消(waitStatus > 0),则从队列尾部向前查找第一个有效的后继节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0)
                s = t; // 找到了有效的后继节点
        }
    }
    // 如果找到有效的后继节点,则唤醒该节点所代表的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

方法的逻辑很简单,就是先将head的结点状态置为0,避免下面找结点的时候再找到head,然后找到队列中最前面的有效结点,然后唤醒,我们假设这个时候线程A已经释放锁,那么此时队列中排最前边竞争锁的线程B就会被唤醒。然后被唤醒的线程B就会尝试用CAS获取锁,回到acquireQueued方法的逻辑

for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前驱节点
// 如果前驱是头节点,并且尝试获取锁成功
if (p == head && tryAcquire(arg)) {
    setHead(node); // 将当前节点设置为头节点
    p.next = null; // 帮助GC,断开原头节点的next链接
    failed = false;
    return interrupted;
}
// 如果应该阻塞等待,则进入等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true; // 如果在等待过程中被中断,记录中断状态
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

当线程B获取锁之后,会把当前结点赋值给head,然后原先的前驱结点 (也就是原来的head结点) 去掉引用链,方便回收,这样一来,线程B获取锁的整个过程就完成了,此时AQS的数据就会变成这样;

img

到这里,我们已经分析完了AQS独占模式下加锁和释放锁的过程,也就是tryAccquire->tryRelease这一链条的逻辑,除此之外,AQS中还支持共享模式的同步,这种模式下关于锁的操作核心其实就是tryAcquireShared->tryReleaseShared这两个方法,我们可以简单看下

# 6. 共享模式 源码讲解

# 6.1 获取锁

AQS中,共享模式获取锁的顶层入口方法是acquireShared,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源。

public final void acquireShared(int arg) {
 if (tryAcquireShared(arg) < 0)
  doAcquireShared(arg);
}
1
2
3
4

该方法里包含了两个方法的调用,

tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。

doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。

# 6.1.1 tryAcquireShared

tryAcquireShared在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0代表获取成功,但没有剩余的资源,也就是state已经为0;
  • 正值代表获取成功,而且state还有剩余,其他线程可以继续领取

当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared方法

# 6.1.2 doAcquireShared

此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:

private void doAcquireShared(int arg) {
 // 加入队列尾部
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
  boolean interrupted = false;
  // CAS自旋
  for (;;) {
   final Node p = node.predecessor();
   // 判断前驱结点是否是head
   if (p == head) {
    // 尝试获取一定数量的锁
    int r = tryAcquireShared(arg);
    if (r >= 0) {
     // 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
     setHeadAndPropagate(node, r);
     // 让前驱结点去掉引用链,方便被GC
     p.next = null; // help GC
     if (interrupted)
      selfInterrupt();
     failed = false;
     return;
    }
   }
   // 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head指向自己
    setHead(node);
     // 如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

看到这里,你会不会一点熟悉的感觉,这个方法的逻辑怎么跟上面那个acquireQueued() 那么类似啊?对的,其实两个流程并没有太大的差别。只是doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完一定的资源后,发现还有剩余的资源,就继续唤醒下一个邻居线程,这才符合"共享"的思想嘛。

这里我们可以提出一个疑问,共享模式下,当前线程释放了一定数量的资源,但这部分资源满足不了下一个等待结点的需要的话,那么会怎么样?

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

接着往下说吧,唤醒下一个邻居线程的逻辑在doReleaseShared()中,我们放到下面的释放锁来解析。

# 6.2 释放锁

共享模式释放锁的顶层方法是releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {
  doReleaseShared();
  return true;
 }
 return false;
}
1
2
3
4
5
6
7

该方法同样包含两部分的逻辑:

tryReleaseShared:释放资源。

doAcquireShared:唤醒后继结点。

跟tryAcquireShared方法一样,tryReleaseShared在AQS中没有具体的实现,由子同步器自己去定义,但功能都一样,就是释放一定数量的资源。

释放完资源后,线程不会马上就收工,而是唤醒等待队列里最前排的等待结点。

# 6.2.1 doAcquireShared

唤醒后继结点的工作在doReleaseShared()方法中完成,我们可以看下它的源码:

private void doReleaseShared() {
 for (;;) {
  // 获取等待队列中的head结点
  Node h = head;
  if (h != null && h != tail) {
   int ws = h.waitStatus;
   // head结点waitStatus = -1,唤醒下一个结点对应的线程
   if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     continue;            // loop to recheck cases
    // 唤醒后继结点
    unparkSuccessor(h);
   }
   else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;                // loop on failed CAS
  }
  if (h == head)                   // loop if head changed
   break;
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

代码没什么特别的,就是如果等待队列head结点的waitStatus为-1的话,就直接唤醒后继结点,唤醒的方法unparkSuccessor()在上面已经讲过了,这里也没必要再复述。

总的来看,AQS共享模式的运作流程和独占模式很相似,只要掌握了独占模式的流程运转,共享模式什么的不就那样吗,没难度。这也是我为什么共享模式讲解中不画流程图的原因,没必要嘛。

# 7. Condition

介绍完了AQS的核心功能,我们再扩展一个知识点,在AQS中,除了提供独占/共享模式的加锁/解锁功能,它还对外提供了关于Condition的一些操作方法。

Condition是个接口,在jdk1.5版本后设计的,基本的方法就是await()和signal()方法,功能大概就对应Object的wait()和notify(),Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。

一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现 ,AQS中就定义了一个类ConditionObject来实现了这个接口。

image-20240323235644081

以下是 Condition 的基本使用方法:

  1. await():类似于 Object.wait(),使当前线程进入等待状态,并且释放其持有的锁,直到其他线程调用 Condition.signal() 或 Condition.signalAll() 方法,线程才有可能返回。
  2. signal():类似于 Object.notify(),随机唤醒一个等待在此 Condition 上的线程。
  3. signalAll():类似于 Object.notifyAll(),唤醒所有等待在此 Condition 上的线程。

笔记

ReentrantLock搭配Condition类使用时,虽然仍然不能指定唤醒特定的线程,但可以通过多个Condition对象实现对等待线程更细粒度的管理,允许分组唤醒等待线程,这在某种程度上提供了更高的灵活性和控制能力。

以下是一个使用 ReentrantLock 和 Condition 的示例:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    // 为不同的条件创建不同的Condition
    private final Condition conditionA = lock.newCondition();
    private final Condition conditionB = lock.newCondition();

    // 等待条件A
    public void awaitA() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ": 等待条件A");
            conditionA.await(); // 等待
            System.out.println(Thread.currentThread().getName() + ": 条件A满足,继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 等待条件B
    public void awaitB() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ": 等待条件B");
            conditionB.await(); // 等待
            System.out.println(Thread.currentThread().getName() + ": 条件B满足,继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 唤醒等待条件A的所有线程
    public void signalAllA() {
        lock.lock();
        try {
            System.out.println("唤醒所有等待条件A的线程");
            conditionA.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // 唤醒等待条件B的所有线程
    public void signalAllB() {
        lock.lock();
        try {
            System.out.println("唤醒所有等待条件B的线程");
            conditionB.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionExample example = new ConditionExample();
        new Thread(example::awaitA, "Thread-A1").start();
        new Thread(example::awaitB, "Thread-B1").start();

        Thread.sleep(3000); // 模拟其他操作

        example.signalAllA(); // 只唤醒等待条件A的线程
        example.signalAllB(); // 只唤醒等待条件B的线程
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

在这个示例中,通过使用两个不同的Condition实例conditionA和conditionB,可以精确控制哪一组线程被唤醒。比如,signalAllA方法只会唤醒那些等待conditionA条件的线程,而不会影响等待conditionB条件的线程。这种方式提供了比使用单一的Object监视器方法(wait, notify, notifyAll)更灵活的线程间协调机制。

翻看AQS的源码,我们会发现Condition中定义了两个属性firstWaiter和lastWaiter,前面说了,AQS中包含了一个FIFO的CLH等待队列,每个Conditon对象就包含这样一个等待队列,而这两个属性分别表示的是等待队列中的首尾结点,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
1
2
3
4

注意:Condition当中的等待队列和AQS主体的同步等待队列是分开的,两个队列虽然结构体相同,但是作用域是分开的

# 7.1 await

先看await()的源码:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程加入到等待队列中
    Node node = addConditionWaiter();
    // 完全释放占有的资源,并返回资源数
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环判断当前结点是不是在Condition的队列中,是的话挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

当一个线程调用Condition.await()方法,将会以当前线程构造结点,这个结点的waitStatus赋值为 Node.CONDITION,也就是-2,并将结点从尾部加入等待队列,然后尾部结点就会指向这个新增的结点。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

我们依然用上面的demo来演示,此时,线程A获取锁并调用**Condition.await()**方法后,AQS内部的数据结构会变成这样:

img

在Condition队列中插入对应的结点后,线程A会释放所持有的资源,走到while循环那层逻辑

while (!isOnSyncQueue(node)) {
 LockSupport.park(this);
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  break;
}
1
2
3
4
5

isOnSyncQueue方法的会判断当前的线程节点是不是在同步队列中,这个时候此结点还在Condition队列中,所以该方法返回false,这样的话循环会一直持续下去,线程被挂起,等待被唤醒,此时,线程A的流程暂时停止了。

当线程A调用await()方法挂起的时候,线程B获取到了线程A释放的资源,然后执行signal()方法:

# 7.2 signal

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
1
2
3
4
5
6
7

先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。接着调用doSignal()方法来唤醒线程。

private void doSignal(Node first) {
 // 循环,从队列一直往后找不为空的首结点
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 // CAS循环,将结点的waitStatus改为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 // 上面已经分析过,此方法会把当前结点加入到等待队列中,并返回前驱结点
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

从doSignal的代码中可以看出,这时候程序寻找的是Condition等待队列中首结点firstWaiter的结点,此时该结点指向的是线程A的结点,所以之后的流程作用的都是线程A的结点。

这里分析下transferForSignal方法,先通过CAS自旋将结点waitStatus改为0,然后就把结点放入到同步队列 (此队列不是Condition的等待队列) 中,然后再用CAS将同步队列中该结点的前驱结点waitStatus改为Node.SIGNAL,也就是-1,此时AQS的数据结构大概如下(少画了个箭头,大家就当head结点是线程A结点的前驱结点就好):

img

回到await()方法,当线程A的结点被加入同步队列中时,isOnSyncQueue()会返回true,跳出循环,

while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
1
2
3
4
5
6
7
8
9
10
11

接着执行acquireQueued()方法,这里就不用多说了吧,尝试重新获取锁,如果获取锁失败继续会被挂起,直到另外线程释放锁才被唤醒。

所以,当线程B释放完锁后,线程A被唤醒,继续尝试获取锁,至此流程结束。

对于这整个通信过程,我们可以画一张流程图展示下:

img

# 7.3 总结

说完了Condition的使用和底层运行机制,我们再来总结下它跟普通 wait/notify 的比较,一般这也是问的比较多的,Condition大概有以下两点优势:

  • Condition 需要结合 Lock 进行控制,使用的时候要注意一定要对应的unlock(),可以对多个不同条件进行控制,只要new 多个 Condition对象就可以为多个线程控制通信,wait/notify 只能和 synchronized 关键字一起使用,并且只能唤醒一个或者全部的等待队列;
  • Condition 有类似于 await 的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是 park/unpark 的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是 wait/notify 会产生先唤醒再挂起的死锁。

以上便是本文的全部内容,本人才疏学浅,文章有什么错误的地方,欢迎大佬们批评指正!我是scholar,一个在互联网行业的小白,立志成为更好的自己。

如果你想了解更多关于scholar (opens new window) (opens new window),可以关注公众号-书生带你学编程,后面文章会首先同步至公众号。

公众号封面

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
第三章 synchronized的总结
第五章 深入ReentrantReadWriteLock

← 第三章 synchronized的总结 第五章 深入ReentrantReadWriteLock→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式