程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • JUC并发编程

    • 第一章 线程的基础概念
    • 第二章 并发编程的三大特性
    • 第三章 synchronized的总结
    • 第四章 深入ReentrantLock
    • 第五章 深入ReentrantReadWriteLock
    • 第六章 阻塞队列
    • 第七章 线程池
    • 第八章 并发集合
    • 第九章 并发工具
      • 一、CountDownLatch(倒计数器)
        • 1.1 CountDownLatch介绍
        • 1.2 CountDownLatch应用
        • 1.3 CountDownLatch源码分析
        • 1.3.1 有参构造
        • 1.3.2 await方法
        • 1.3.3 countDown方法
      • 二、CyclicBarrier(循环屏障)
        • 2.1 CyclicBarrier介绍
        • 2.2 CyclicBarrier应用
        • 2.3 CyclicBarrier源码分析
        • 2.3.1 CyclicBarrier的核心属性
        • 2.3.2 CyclicBarrier的有参构造
        • 2.3.3 CyclicBarrier中的await方法
      • 三、Semaphone(信号量/许可证)
        • 3.1 Semaphore介绍
        • 3.2 Semaphore应用
        • 3.3 Semaphore源码分析
        • 3.3.1 Semaphore的整体结构
        • 3.3.2 Semaphore的非公平的获取资源
        • 3.3.3 Semaphore公平实现
        • 3.3.4 Semaphore释放资源
        • 3.4 AQS中PROPAGATE节点
        • 3.4.1 掌握JDK1.5-Semaphore执行流程图
        • 3.4.2 分析JDK1.8的变化
      • 四、线程间通信
        • 4.1 synchronized 方案
        • 4.2 Lock 方案
        • 4.3 三个线程交替打印 ABC
        • 1. 使用Object的wait和notifyAll方法
        • 2. 使用ReentrantLock和Condition
        • 3. 使用Semaphore(信号量)
        • 4. 使用LockSupport
        • 4.4 线程间定制化通信
    • 第十章 异步编程
  • JavaEE
  • JUC并发编程
scholar
2024-03-14
目录

第九章 并发工具

# 一、CountDownLatch(倒计数器)

# 1.1 CountDownLatch介绍

前言

CountDownLatch 的适用场景通常是多线程环境下,某些任务需要等待其他任务完成后才能开始执行。例如,一个主线程需要等待所有子线程执行完毕后再进行下一步操作,或者一个线程池需要等待所有任务都执行完毕后再进行资源回收等操作。

方法 作用
CountDownLatch(int count) 在构造的时候,传入一个计数的个数
await() 等待计数器为0在继续下面操作
await(long timeout, TimeUnit unit) 在给定的时间内等待计数器达到0,如果超时则不会等待
countDown() 计数器减1
getCount() 返回当前计数器的计数

CountDownLatch确实是Java并发包java.util.concurrent中提供的一个同步辅助类,用来协调多个线程之间的同步,或者在某些操作之前在多个线程之间等待。

  • 初始化:CountDownLatch的构造函数接收一个整型的参数count,这个参数是计数器的初始值,表示需要等待的事件数量。
  • await()方法:调用await()方法的线程会被阻塞,直到计数器的值变为0。这使得一个或多个线程可以等待其他线程完成各自的工作。
  • countDown()方法:每当一个事件完成了,就可以调用countDown()方法使计数器的值减1。当计数器的值减到0时,所有因调用await()方法而等待的线程会被唤醒并继续执行。

注意事项

CountDownLatch是一次性的,计数器的值只能在构造时设置一次,而且CountDownLatch没有提供任何机制来重置计数器的值。

场景: 6 个同学陆续离开教室后值班同学才可以关门。

public class CountDownLatchDemo {
    //6个同学陆续离开教室之后,班长锁门
    public static void main(String[] args) throws InterruptedException {

        //创建CountDownLatch对象,设置初始值
        CountDownLatch countDownLatch = new CountDownLatch(6);

        //6个同学陆续离开教室之后
        for (int i = 1; i <=6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");

                //计数  -1
                countDownLatch.countDown();

            },String.valueOf(i)).start();
        }

        //主线程等待
        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

image-20240316145529701

# 1.2 CountDownLatch应用

  • 如果有三个业务需要并行处理,并且需要知道三个业务全部都处理完毕了。需要一个并发安全的计数器来操作。

  • CountDownLatch就可以实现。给CountDownLatch设置一个数值。可以设置3。

  • 每个业务处理完毕之后,执行一次countDown方法,指定的3每次在执行countDown方法时,对3进行-1。

  • 主线程可以在业务处理时,执行await,主线程会阻塞等待任务处理完毕。

  • 当设置的3基于countDown方法减为0之后,主线程就会被唤醒,继续处理后续业务。

image-20240316144742153

当咱们的业务中,出现2个以上允许并行处理的任务,并且需要在任务都处理完毕后,再做其他处理时,可以采用CountDownLatch去实现这个功能。

public class CompanyTest {

    // 创建固定大小为3的线程池,用于并发执行任务
    static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

    // 使用CountDownLatch等待3个任务完成
    static CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {
        System.out.println("主业务开始执行");
        sleep(1000); // 模拟主业务执行前的准备工作

        // 提交三个并行任务到线程池执行
        executor.execute(CompanyTest::a);
        executor.execute(CompanyTest::b);
        executor.execute(CompanyTest::c);
        System.out.println("三个任务并行执行,主业务线程等待");

        // 使用CountDownLatch等待所有任务完成,或超时
        if (countDownLatch.await(10, TimeUnit.SECONDS)) {
            // 所有任务在规定时间内完成
            System.out.println("三个任务处理完毕,主业务线程继续执行");
        } else {
            // 任务没有在规定时间内全部完成
            System.out.println("三个任务没有全部处理完毕,执行其他的操作");
        }
    }

    // 任务A的具体执行逻辑
    private static void a() {
        System.out.println("A任务开始");
        sleep(1000); // 模拟任务执行耗时
        System.out.println("A任务结束");
        countDownLatch.countDown(); // 任务完成,计数器减1
    }

    // 任务B的具体执行逻辑
    private static void b() {
        System.out.println("B任务开始");
        sleep(1500); // 模拟任务执行耗时
        System.out.println("B任务结束");
        countDownLatch.countDown(); // 任务完成,计数器减1
    }

    // 任务C的具体执行逻辑
    private static void c() {
        System.out.println("C任务开始");
        sleep(2000); // 模拟任务执行耗时
        System.out.println("C任务结束");
        countDownLatch.countDown(); // 任务完成,计数器减1
    }

    // 模拟任务执行过程中的延时
    private static void sleep(long timeout){
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

image-20240316145457256

# 1.3 CountDownLatch源码分析

保证CountDownLatch就是一个计数器,没有什么特殊的功能,查看源码也只是查看计数器实现的方式

发现CountDownLatch的内部类Sync继承了AQS,CountDownLatch就是基于AQS实现的计数器。

AQS就是一个state属性,以及AQS双向链表

猜测计数器的数值实现就是基于state去玩的。

主线程阻塞的方式,也是阻塞在了AQS双向链表中。

# 1.3.1 有参构造

就是构建内部类Sync,并且给AQS中的state赋值

// CountDownLatch的有参构造
public CountDownLatch(int count) {
    // 健壮性校验
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 构建内部类,Sync传入count
    this.sync = new Sync(count);
}

// AQS子类,Sync的有参构造
Sync(int count) {
    // 就是给AQS中的state赋值
    setState(count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 1.3.2 await方法

await方法就时判断当前CountDownLatch中的state是否为0,如果为0,直接正常执行后续任务

如果不为0,以共享锁的方式,插入到AQS的双向链表,并且挂起线程

// 一般主线程await的方法,阻塞主线程,等待state为0
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 执行了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程是否中断,如果中断标记位是true,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 共享锁挂起的操作
        doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared在CountDownLatch中的实现
protected int tryAcquireShared(int acquires) {
    // 查看state是否为0,如果为0,返回1,不为0,返回-1
    return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 封装当前先成为Node,属性为共享锁
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 在这,就需要挂起当前线程。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 1.3.3 countDown方法

countDown方法本质就是对state - 1,如果state - 1后变为0,需要去AQS的链表中唤醒挂起的节点

// countDown对计数器-1
public void countDown() {
    // 是-1。
    sync.releaseShared(1);
}

// AQS提供的功能
public final boolean releaseShared(int arg) {
    // 对state - 1
    if (tryReleaseShared(arg)) {
        // state - 1后,变为0,执行doReleaseShared
        doReleaseShared();
        return true;
    }
    return false;
}
// CountDownLatch的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
    // 死循环是为了避免CAS并发问题
    for (;;) {
        // 获取state
        int c = getState();
        // state已经为0,直接返回false
        if (c == 0)
            return false;
        // 对获取到的state - 1
        int nextc = c-1;
        // 基于CAS的方式,将值赋值给state
        if (compareAndSetState(c, nextc))
            // 赋值完,发现state为0了。此时可能会有线程在await方法处挂起,那边挂起,需要这边唤醒
            return nextc == 0;
    }
}

// 如何唤醒在await方法处挂起的线程
private void doReleaseShared() {
    // 死循环
    for (;;) {
        // 拿到head
        Node h = head;
        // head不为null,有值,并且head != tail,代表至少2个节点
        // 一个虚拟的head,加上一个实质性的Node
        if (h != null && h != tail) {
            // 说明AQS队列中有节点
            int ws = h.waitStatus;
            // 如果head节点的状态为 -1.
            if (ws == Node.SIGNAL) {
                // 先对head节点将状态从-1,修改为0,避免重复唤醒的情况
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;  
                // 正常唤醒节点即可,先看head.next,能唤醒就唤醒,如果head.next有问题,从后往前找有效节点
                unparkSuccessor(h);
            }
            // 会在Semaphore中谈到这个位置
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;  
        }
        // 会在Semaphore中谈到这个位置
        if (h == head)  
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 二、CyclicBarrier(循环屏障)

# 2.1 CyclicBarrier介绍

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,达到一个共同点后再一起继续执行。 CyclicBarrier还支持一个可选的Runnable参数,当所有线程都到达屏障后,在屏障打开之前,由最后一个到达屏障的线程执行完Runnable 任务以后才会打开屏障。

方法 参数 返回类型 描述
CyclicBarrier(int parties) int parties - 构造一个CyclicBarrier实例,指定通过屏障需要的线程数量。
CyclicBarrier(int parties, Runnable barrierAction) int parties, Runnable barrierAction - 构造一个CyclicBarrier实例,指定通过屏障需要的线程数量,并在所有线程到达屏障时执行一个预定义的任务。
await() - int 调用线程在所有参与者线程都到达屏障之前,进行等待。
await(long timeout, TimeUnit unit) long timeout, TimeUnit unit int 调用线程在所有参与者线程都到达屏障之前进行等待,或者直到超出指定的等待时间。
getParties() - int 获取需要多少个线程到达才能触发屏障线程总数。
getNumberWaiting() - int 获取当前在屏障前等待的线程数量。
isBroken() - boolean 检查屏障是否因为其中一个参与线程的中断或超时而进入损坏状态。
reset() - void 重置屏障到其初始状态。

CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数 parties 就表示被屏障拦截的线程数量,每个线程执行完各自的逻辑后可以调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程就会被阻塞。直到抵达屏障的数量达到 parties,屏障打开,被阻塞的线程才可以继续往下执行。

CyclicBarrier 适用的场景

当一组线程需要互相等待至某个状态之后才能各自继续执行,而且这种等待-执行的过程可能不止一次,这时候就可以使用 CyclicBarrier。

下面是演示使用CyclicBarrier,模拟了多个线程(代表召唤者)集合召唤龙珠的场景。当所有召唤者都集齐龙珠后(即所有线程到达屏障点),屏障触发并执行召唤神龙的任务:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class DragonBallSummonExample {
    // 假设需要7个召唤者(线程)集齐7颗龙珠
    private static final int SUMMONERS = 7;
    // 创建CyclicBarrier对象,并设置当屏障触发时执行的任务,即召唤神龙
    private static CyclicBarrier barrier = new CyclicBarrier(SUMMONERS, new Runnable() {
        @Override
        public void run() {
            // 所有召唤者都集齐龙珠后,执行这里的代码召唤神龙
            System.out.println("所有龙珠已集齐,现在召唤神龙!");
        }
    });

    public static void main(String[] args) {
        // 启动7个线程,每个线程代表一个召唤者
        for (int i = 1; i <= SUMMONERS; i++) {
            final int dragonBall = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("召唤者" + Thread.currentThread().getId() + "已经收集到第" + dragonBall + "颗龙珠");
                    try {
                        // 调用await方法等待其他召唤者集齐龙珠
                        barrier.await();
                        // 当Runnable任务执行完成以后,所有召唤者下线
                        System.out.println("召唤者" + Thread.currentThread().getId() + "下线");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

CyclicBarrier 的原理是什么?

CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

CyclicBarrier对比CountDownLatch区别

  • 底层实现不同:CyclicBarrier基于ReentrantLock做的。CountDownLatch直接基于AQS做的。
  • 应用场景不同:CountDownLatch的计数器只能使用一次。而CyclicBarrier在计数器达到0之后,可以通过重置计数器。CyclicBarrier可以实现相比CountDownLatch更复杂的业务,执行业务时出现了错误,可以重置CyclicBarrier计数器,再次执行一次。
  • CyclicBarrier还提供了很多其他的功能:
    • 通过getNumberWaiting()方法可以获取到正在阻塞的线程有多少。
    • 在线程互相等待时,如果有等待的线程中断,可以isBroken()方法检查并抛出异常,避免无限等待的问题。
  • CountDownLatch一般是让主线程等待,让子线程对计数器--。CyclicBarrier更多的让子线程也一起计数和等待,等待的线程达到数值后,再统一唤醒。

# 2.2 CyclicBarrier应用

CyclicBarrier使用的几个注意事项

  1. 指定等待时间:线程在等待屏障解除时,可以设置一个超时时间。如果在这段时间内屏障没有解除,线程会继续执行。
  2. 线程中断处理:如果一个等待中的线程被中断,CyclicBarrier 会唤醒所有等待的线程。这一点确保了即使出现中断,线程也不会无限期地等待。
  3. 自动重置:与 CountDownLatch 不同,当所有等待线程都达到屏障后,CyclicBarrier 会自动重置其计数器,可以被再次使用。这使得 CyclicBarrier 适合重复使用的场景,如循环的屏障等待。
  4. 中断后的重置:如果等待期间有线程被中断,CyclicBarrier 变为不可用状态。如果希望继续使用,需要调用 reset() 方法来重置 CyclicBarrier,恢复其为初始状态。

下面使用CyclicBarrier来模拟一群人(Tom、Jack、Rose)出国旅游的场景。在所有人都到达集合点之后,导游会发放护照和签证,然后大家一起出发。如果在等待的过程中,任何一个人未能到达(例如因为中断),则会标记为悲剧,表示人没齐,然后提前退出等待。

public class TravelExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建CyclicBarrier对象,设置屏障点为3,以及屏障点达到后执行的任务
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("等到各位大佬都到位之后,分发护照和签证等内容!");
        });

        // Tom到位的线程
        new Thread(() -> {
            System.out.println("Tom到位!!!");
            try {
                barrier.await(); // 等待其他人到位
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Tom出发!!!"); // 所有人都到齐后继续执行
        }).start();

        Thread.sleep(100); // 确保线程按顺序启动

        // Jack到位的线程
        new Thread(() -> {
            System.out.println("Jack到位!!!");
            try {
                barrier.await(); // 等待其他人到位
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Jack出发!!!"); // 所有人都到齐后继续执行
        }).start();

        Thread.sleep(100); // 确保线程按顺序启动

        // Rose到位的线程
        new Thread(() -> {
            System.out.println("Rose到位!!!");
            try {
                barrier.await(); // 等待其他人到位
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Rose出发!!!"); // 所有人都到齐后继续执行
        }).start();
        // 输出预期顺序:Tom到位,Jack到位,Rose到位 -> 导游分发护照和签证 -> Tom出发,Jack出发,Rose出发
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 2.3 CyclicBarrier源码分析

分成两块内容去查看,首先查看CyclicBarrier的一些核心属性,然后再查看CyclicBarrier的核心方法

# 2.3.1 CyclicBarrier的核心属性

public class CyclicBarrier {
   // 这个静态内部类是用来标记是否中断的
    private static class Generation {
        boolean broken = false;
    }

    /** CyclicBarrier是基于ReentrantLock实现的互斥操作,以及计数原子性操作 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 基于当前的Condition实现线程的挂起和唤醒 */
    private final Condition trip = lock.newCondition();
    /** 记录有参构造传入的屏障数值,不会对这个数值做操作 */
    private final int parties;
    /** 当屏障数值达到0之后,优先执行当前任务  */
    private final Runnable barrierCommand;
    /** 初始化默认的Generation,用来标记线程中断情况 */
    private Generation generation = new Generation();
    /** 每来一个线程等待,就对count进行-- */
    private int count;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 2.3.2 CyclicBarrier的有参构造

掌握构建CyclicBarrier之后,内部属性的情况

// 这个是CyclicBarrier的有参构造
// 在内部传入了parties,屏障点的数值
// 还传入了barrierAction,屏障点的数值达到0,优先执行barrierAction任务
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 健壮性判
    if (parties <= 0) throw new IllegalArgumentException();
    // 当前类中的属性parties是保存屏障点数值的
    this.parties = parties;
    // 将parties赋值给属性count,每来一个线程,继续count做-1操作。
    this.count = parties;
    // 优先执行的任务
    this.barrierCommand = barrierAction;
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.3.3 CyclicBarrier中的await方法

在CyclicBarrier中,提供了2个await方法

  • 第一个是无参的方式,线程要死等,直屏障点数值为0,或者有线程中断
  • 第二个是有参方式,传入等待的时间,要么时间到位了,要不就是直屏障点数值为0,或者有线程中断

无论是哪种await方法,核心都在于内部调用的dowait方法

dowait方法主要包含了线程互相等待的逻辑,以及屏障点数值到达0之后的操作

// 包含了线程互相等到的逻辑,以及屏障点数值到达0后的操作
private int dowait(boolean timed, long nanos)throws 
            // 当前新编程中断,抛出这个异常
            InterruptedException, 
            // 其他线程中断,当前线程抛出这个异常
            BrokenBarrierException,
            // await时间到位,抛出这个异常
            TimeoutException {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 拿到Generation对象的引用
        final Generation g = generation;

        // 判断下线程中断了么?如果中断了,直接抛出异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 当前线程中断了么?
        if (Thread.interrupted()) {
            // 做了三个实现,
            // 设置broken为true,将count重置,唤醒其他等待的线程
            breakBarrier();
            // 抛出异常
            throw new InterruptedException();
        }

        // 屏障点做--
        int index = --count;
        // 如果屏障点为0,打开屏障啦!!
        if (index == 0) {  
            // 标记
            boolean ranAction = false;
            try {
                // 拿到有参构造中传递的任务
                final Runnable command = barrierCommand;
                // 任务不为null,优先执行当前任务
                if (command != null)
                    command.run();
                // 上述任务执行没问题,标记位设置为true
                ranAction = true;
                // 执行nextGeneration
                // 唤醒所有线程,重置count,重置generation
                nextGeneration();
                return 0;
            } finally {
                // 如果优先执行的任务出了问题i,就直接抛出异常
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 死循环
        for (;;) {
            try {
                //  如果调用await方法,死等
                if (!timed)
                    trip.await();
                // 如果调用await(time,unit),基于设置的nans时长决定await的时长
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 到这,说明线程被中断了
                // 查看generation有没有被重置。
                // 并且当前broken为false,需要做线程中断后的操作。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            // 是否是中断唤醒,是就抛异常。
            if (g.broken)
                throw new BrokenBarrierException();
            // 说明被reset了,返回index的数值。或者任务完毕也会被重置
            if (g != generation)
                return index;
            // 指定了等待的时间内,没有等到所有线程都到位
            if (timed && nanos <= 0L) {
                // 中断任务
                breakBarrier();
                // 抛出异常
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

# 三、Semaphone(信号量/许可证)

# 3.1 Semaphore介绍

  • sync,ReentrantLock是互斥锁,保证一个资源同一时间只允许被一个线程访问
  • Semaphore(信号量)保证1个或多个资源可以被指定数量的线程同时访问

Semaphore 维护了一个许可集,我们在初始化 Semaphore 时需要为这个许可集传入一个数量值,该数量值代表同一时间能访问共享资源的线程数量。

  • 线程可以通过 acquire() 方法获取到一个许可,然后对共享资源进行操作。注意如果许可集已分配完了,那么线程将进入等待状态,直到其他线程释放许可才有机会再获取许可。
  • 当线程完成对共享资源的访问后,通过 release() 方法释放它持有的许可,“许可” 将被归还给许可集。

注意即使Semaphore初始化时没有许可证,通过多次调用release()方法可以增加许可证数量,允许线程动态获取执行权限。

假设有一个停车场只有3个停车位,现在有5辆车(5个线程)想要停车,我们可以用Semaphore来模拟这个停车场的停车过程。









 



 














public class Main {
    public static void main(String[] args) {
        //三个停车位
        Semaphore semaphore = new Semaphore(3);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();  // 获取一个许可
                    System.out.println(Thread.currentThread().getName() + " 占据了停车位");
                    // 模拟车辆停留时间
                    Thread.sleep((long) (Math.random() * 20000));
                    semaphore.release(); // 释放一个许可
                    System.out.println(Thread.currentThread().getName() + " 离开了停车位");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        //假设5个线程相当于5辆车
        for (int i = 1; i <= 5; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Semaphore底层实现是基于AQS去做的。

Semaphore底层也是基于AQS的state属性做一个计数器的维护。state的值就代表当前共享资源的个数。如果一个线程需要获取的1或多个资源,直接查看state的标识的资源个数是否足够,如果足够的,直接对state - 1拿到当前资源。如果资源不够,当前线程就需要挂起等待。知道持有资源的线程释放资源后,会归还给Semaphore中的state属性,挂起的线程就可以被唤醒。

Semaphore也分为公平和非公平的概念。

使用场景:连接池对象就可以基础信号量去实现管理。在一些流量控制上,也可以采用信号量去实现。再比如去迪士尼或者是环球影城,每天接受的人流量是固定的,指定一个具体的人流量,可能接受10000人,每有一个人购票后,就对信号量进行--操作,如果信号量已经达到了0,或者是资源不足,此时就不能买票。

# 3.2 Semaphore应用

以环球影城每日人流量为例子去测试一下。

public static void main(String[] args) throws InterruptedException {
    // 初始化一个信号量,设置10个许可,模拟环球影城最多接纳10个人
    Semaphore semaphore = new Semaphore(10);

    // 模拟一家三口来到环球影城
    new Thread(() -> {
        System.out.println("一家三口来到环球影城~~");
        try {
            // 尝试获取3个许可,模拟一家三口占用3个名额
            semaphore.acquire(3);
            System.out.println("一家三口进去了~~~");
            Thread.sleep(10000); // 模拟游玩时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            System.out.println("一家三口走了~~~");
            // 释放3个许可,让其他游客能够进入
            semaphore.release(3);
        }
    }).start();

    // 模拟其他7个游客陆续到达环球影城
    for (int i = 0; i < 7; i++) {
        int j = i;
        new Thread(() -> {
            System.out.println(j + "大哥来了。");
            try {
                // 每个游客尝试获取1个许可
                semaphore.acquire();
                System.out.println(j + "大哥进去了~~~");
                Thread.sleep(10000); // 模拟游玩时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println(j + "大哥走了~~~");
                // 释放1个许可
                semaphore.release();
            }
        }).start();
    }

    Thread.sleep(10);

    // 主线程模拟额外的游客尝试进入环球影城
    System.out.println("main大哥来了。");
    // 尝试获取1个许可,如果获取失败,则表示没有足够的许可
    if (semaphore.tryAcquire()) {
        System.out.println("main大哥进来了。");
    }else{
        System.out.println("资源不够,main大哥没能进来。"); // 注意这里的文本更正
    }
    Thread.sleep(10000); // 模拟等待时间

    System.out.println("main大哥又来了。");
    // 再次尝试获取1个许可
    if (semaphore.tryAcquire()) {
        System.out.println("main大哥进来了。");
        // 释放1个许可
        semaphore.release();
    }else{
        System.out.println("资源不够,main大哥没能进来。"); // 注意这里的文本更正
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

其实Semaphore整体就是对构建Semaphore时,指定的资源数的获取和释放操作

获取资源方式:

  • acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
  • acquire(int):获取指定个数资源,资源不够,或者没有资源就挂起等待,如果中断,直接抛异常
  • tryAcquire():获取一个资源,没有资源返回false,有资源返回true
  • tryAcquire(int):获取指定个数资源,没有资源返回false,有资源返回true
  • tryAcquire(time,unit):获取一个资源,如果没有资源,等待time.unit,如果还没有,就返回false
  • tryAcquire(int,time,unit):获取指定个数资源,如果没有资源,等待time.unit,如果还没有,就返回false
  • acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等
  • acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等

归还资源方式:

  • release():归还一个资源
  • release(int):归还指定个数资源

# 3.3 Semaphore源码分析

先查看Semaphore的整体结构,然后基于获取资源,以及归还资源的方式去查看源码

# 3.3.1 Semaphore的整体结构

Semaphore内部有3个静态内类。

首先是向上抽取的Sync

其次还有两个Sync的子类NonFairSync以及FairSync两个静态内部类

Sync内部主要提供了一些公共的方法,并且将有参构造传入的资源个数,直接基于AQS提供的setState方法设置了state属性。

NonFairSync以及FairSync区别就是tryAcquireShared方法的实现是不一样。

# 3.3.2 Semaphore的非公平的获取资源

在构建Semaphore的时候,如果只设置资源个数,默认情况下是非公平。

如果在构建Semaphore,传入了资源个数以及一个boolean时,可以选择非公平还是公平。

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
1
2
3

从非公平的acquire方法入手

首先确认默认获取资源数是1个,并且acquire是允许中断线程时,抛出异常的。获取资源的方式,就是直接用state - 需要的资源数,只要资源足够,就CAS的将state做修改。如果没有拿到锁资源,就基于共享锁的方式去将当前线程挂起在AQS双向链表中。如果基于doAcquireSharedInterruptibly拿锁成功,会做一个事情。会执行setHeadAndPropagate方法。一会说

// 信号量的获取资源方法(默认获取一个资源)
public void acquire() throws InterruptedException {
    // 跳转到了AQS中提供共享锁的方法
    sync.acquireSharedInterruptibly(1);
}

// AQS提供的
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程的中断标记位,如果已经中断,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先看非公平的tryAcquireShared实现。
    // tryAcquireShared:
    //     返回小于0,代表获取资源失败,需要排队。
    //     返回大于等于0,代表获取资源成功,直接执行业务代码
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// 信号量的非公平获取资源方法
final int nonfairTryAcquireShared(int acquires) {
    // 死循环。
    for (;;) {
        // 获取state的数值,剩余的资源个数
        int available = getState();
        // 剩余的资源个数 - 需要的资源个数
        int remaining = available - acquires;
        // 如果-完后,资源个数小于0,直接返回这个负数
        if (remaining < 0 ||
            // 说明资源足够,基于CAS的方式,将state从原值,改为remaining
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 获取资源失败,资源不够,当前线程需要挂起等待
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 构建Node节点,线程和共享锁标记,并且到AQS双向链表中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 拿到上一个节点
            final Node p = node.predecessor();
            // 如果是head.next,就抢一手
            if (p == head) {
                // 再次基于非公平的方式去获取一次资源
                int r = tryAcquireShared(arg);
                // 到这,说明拿到了锁资源
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; 
                    failed = false;
                    return;
                }
            }
            // 如果上面没拿到,或者不是head的next节点,将前继节点的状态改为-1,并挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 如果线程中断会抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

acquire()以及acquire(int)的方式,都是执行acquireSharedInterruptibly方法去尝试获取资源,区别只在于是否传入了需要获取的资源个数。

tryAcquire()以及tryAcquire(int因为这两种方法是直接执行tryAcquire,只使用非公平的实现,只有非公平的情况下,才有可能在有线程排队的时候获取到资源

但是tryAcquire(int,time,unit)这种方法是正常走的AQS提供的acquire。因为这个tryAcquire可以排队一会,即便是公平锁也有可能拿到资源。这里的挂起和acquire挂起的区别仅仅是挂起的时间问题。

  • acquire是一直挂起直到线程中断,或者线程被唤醒。
  • tryAcquire(int,time,unit)是挂起一段时间,直到线程中断,要么线程被唤醒,要么阻塞时间到了

还有acquireUninterruptibly()以及acquireUninterruptibly(int)只是在挂起线程后,不会因为线程的中断而去抛出异常

# 3.3.3 Semaphore公平实现

公平与非公平只是差了一个方法的实现tryAcquireShared实现

这个方法的实现中,如果是公平实现,需要先查看AQS中排队的情况

// 信号量公平实现
protected int tryAcquireShared(int acquires) {
    // 死循环。
    for (;;) {
        // 公平实现在走下述逻辑前,先判断队列中排队的情况
        // 如果没有排队的节点,直接不走if逻辑
        // 如果有排队的节点,发现当前节点处在head.next位置,直接不走if逻辑
        if (hasQueuedPredecessors())
            return -1;

        // 下面这套逻辑和公平实现是一模一样的。
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 3.3.4 Semaphore释放资源

因为信号量从头到尾都是共享锁的实现……

释放资源操作,不区分公平和非公平

// 信号量释放资源的方法入口
public void release() {
    sync.releaseShared(1);
}

// 释放资源不分公平和非公平,都走AQS的releaseShared
public final boolean releaseShared(int arg) {
    // 优先查看tryReleaseShared,这个方法是信号量自行实现的。
    if (tryReleaseShared(arg)) {
        // 只要释放资源成功,执行doReleaseShared,唤醒AQS中排队的线程,去竞争Semaphore的资源
        doReleaseShared();
        return true;
    }
    return false;
}

// 信号量实现的释放资源方法
protected final boolean tryReleaseShared(int releases) {
    // 死循环
    for (;;) {
        // 拿到当前的state
        int current = getState();
        // 将state + 归还的资源个数,新的state要被设置为next
        int next = current + releases;
        // 如果归还后的资源个数,小于之前的资源数。
        // 避免出现归还资源后,导致next为负数,需要做健壮性判断
        if (next < current) 
            throw new Error("Maximum permit count exceeded");
        // CAS操作,保证原子性,只会有一个线程成功的就之前的state修改为next
        if (compareAndSetState(current, next))
            return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 3.4 AQS中PROPAGATE节点

为了更好的了解PROPAGATE节点状态的意义,优先从JDK1.5去分析一下释放资源以及排队后获取资源的后置操作

# 3.4.1 掌握JDK1.5-Semaphore执行流程图

首先查看4个线程获取信号量资源的情况

image.png

往下查看释放资源的过程会触发什么问题

首先t1释放资源,做了进一步处理

image.png

当线程3获取锁资源后,线程2再次释放资源,因为执行点问题,导致线程4无法被唤醒

# 3.4.2 分析JDK1.8的变化

image.png

====================================JDK1.5实现============================================.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0) 
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next; 
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}

====================================JDK1.8实现============================================.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
private void doReleaseShared() {
    for (;;) {
        // 拿到head节点
        Node h = head;
        // 判断AQS中有排队的Node节点
        if (h != null && h != tail) {
            // 拿到head节点的状态
            int ws = h.waitStatus;
            // 状态为-1
            if (ws == Node.SIGNAL) {
                // 将head节点的状态从-1,改为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;  
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 发现head状态为0,将head状态从0改为-3,目的是为了往后面传播
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 没有并发的时候。head节点没变化,正常完成释放排队的线程
        if (h == head)  
            break;
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    // 拿到head
    Node h = head; 
    // 将线程3的Node设置为新的head
    setHead(node);
    // 如果propagate 大于0,代表还有剩余资源,直接唤醒后续节点,如果不满足,也需要继续往后判断看下是否需要传播
    // h == null:看成健壮性判断即可
    // 之前的head节点状态为负数,说明并发情况下,可能还有资源,需要继续向后唤醒Node
    // 如果当前新head节点的状态为负数,继续释放后续节点
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        // 唤醒当前节点的后继节点
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

# 四、线程间通信

线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析

场景一

两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间通信

# 4.1 synchronized 方案

public class TestVolatile {

    public static void main(String[] args){
        DemoClass demoClass = new DemoClass();
        // 启动第一个线程执行加法操作
        new Thread(() ->{
            for (int i = 0; i < 5; i++) {
                demoClass.increment(); // 调用增加方法
            }
        }, "线程 A").start();
        // 启动第二个线程执行减法操作
        new Thread(() ->{
            for (int i = 0; i < 5; i++) {
                demoClass.decrement(); // 调用减少方法
            }
        }, "线程 B").start();
    }
}

class DemoClass{
    private int number = 0; // 定义一个变量作为加减对象

    /**
     * 加 1 方法
     */
    public synchronized void increment() {
        try {
            // 如果number不为0,则当前线程等待
            while (number != 0){
                this.wait();
            }
            number++; // 执行加一操作
            System.out.println("--------" + Thread.currentThread().getName() + "加一成功----------,值为:" + number);
            notifyAll(); // 唤醒所有等待线程
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     * 减一方法
     */
    public synchronized void decrement(){
        try {
            // 如果number为0,则当前线程等待
            while (number == 0){
                this.wait();
            }
            number--; // 执行减一操作
            System.out.println("--------" + Thread.currentThread().getName() + "减一成功----------,值为:" + number);
            notifyAll(); // 唤醒所有等待线程
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 4.2 Lock 方案

class DemoClass {
    private int number = 0; // 定义一个共享变量用于加减操作
    private Lock lock = new ReentrantLock(); // 创建锁对象
    private Condition condition = lock.newCondition(); // 获取条件变量,用于控制加减

    /**
     * 加 1 操作
     */
    public void increment() {
        try {
            lock.lock(); // 获取锁
            while (number != 0) {
                condition.await(); // 当number不为0时,当前线程等待
            }
            number++; // 执行加 1 操作
            System.out.println(Thread.currentThread().getName() + " 加一成功,值为: " + number); // 打印操作结果
            condition.signalAll(); // 唤醒所有等待的线程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    /**
     * 减 1 操作
     */
    public void decrement() {
        try {
            lock.lock(); // 获取锁
            while (number == 0) {
                condition.await(); // 当number为0时,当前线程等待
            }
            number--; // 执行减 1 操作
            System.out.println(Thread.currentThread().getName() + " 减一成功,值为: " + number); // 打印操作结果
            condition.signalAll(); // 唤醒所有等待的线程
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    public static void main(String[] args) {
        DemoClass demoClass = new DemoClass();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                demoClass.increment(); // 线程A进行加 1 操作
            }
        }, "线程 A").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                demoClass.decrement(); // 线程B进行减 1 操作
            }
        }, "线程 B").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 4.3 三个线程交替打印 ABC

实现三个线程交替打印ABC的方案有多种,这里介绍几种典型的实现方法:

# 1. 使用Object的wait和notifyAll方法

这种方法利用Object类的等待/通知机制。三个线程共享一个打印位置标记,根据标记判断是否轮到自己打印,并使用wait方法等待或notifyAll唤醒其他线程。

  • state变量:用来记录当前应该打印哪个字母。它通过余数的方式与每个字母的targetState关联,例如当state % 3 == 0时,表示应该打印A;state % 3 == 1时,表示应该打印B;state % 3 == 2时,表示应该打印C。

  • printLetter方法:这个方法是同步的(synchronized),确保了在任何时刻只有一个线程可以执行这个方法里的代码。它首先检查当前线程是否应该打印字母(通过state % 3与targetState比较)。如果不是,当前线程通过wait()方法等待。当state值与当前线程应该打印的字母对应时,线程打印字母,state值加1,并通过notifyAll()唤醒所有正在等待的线程。

  • 循环打印:每个线程都会尝试打印字母10次(由times变量控制)。打印一次后,线程会增加state值并唤醒其他线程,然后根据state值判断是否继续等待或是继续打印。

public class PrintABCUsingWaitNotify {
    private int state = 0; // 用来确定打印哪个字母
    private final int times = 10; // 打印次数

    // 打印指定字母的方法
    public synchronized void printLetter(String name, int targetState) {
        for (int i = 0; i < times;) {
            // 如果state不是此字母对应的状态,当前线程等待
            while (state % 3 != targetState) {
                try {
                    wait(); // 当前线程等待,直到其他线程调用notifyAll()
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 打印字母,增加state,唤醒所有等待的线程
            state++;
            i++; // 记录已打印次数
            System.out.print(name);
            notifyAll(); // 唤醒所有等待的线程
        }
    }

    public static void main(String[] args) {
        PrintABCUsingWaitNotify printABC = new PrintABCUsingWaitNotify();
        // 创建三个线程分别打印A,B,C
        new Thread(() -> printABC.printLetter("A", 0), "A").start();
        new Thread(() -> printABC.printLetter("B", 1), "B").start();
        new Thread(() -> printABC.printLetter("C", 2), "C").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 2. 使用ReentrantLock和Condition

这种方法通过ReentrantLock提供的条件变量Condition实现,更加灵活地控制线程间的协调。

public class LockCondition {
    private final ReentrantLock lock = new ReentrantLock();
    // 使用一个条件变量
    private final Condition condition = lock.newCondition();
    // 状态变量来控制打印顺序
    private int state = 0;
    private final int times = 10; // 打印次数

    public void printLetter(String name, int targetState) {
        for (int i = 0; i < times;) {
            lock.lock();
            try {
                // 循环检查当前状态是否符合当前线程打印条件
                while (state % 3 != targetState) {
                    condition.await();
                }
                // 打印字母并更新状态和打印次数
                System.out.print(name);
                state++;
                i++;
                // 唤醒所有等待线程
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        LockCondition printABC = new LockCondition();
        // 创建并启动三个线程
        new Thread(() -> printABC.printLetter("A", 0), "A").start();
        new Thread(() -> printABC.printLetter("B", 1), "B").start();
        new Thread(() -> printABC.printLetter("C", 2), "C").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 3. 使用Semaphore(信号量)

这种方法通过控制信号量的许可,来实现线程间的同步。

public class PrintABCUsingSemaphore {
    // 初始化信号量,semA允许一个许可,即首先允许打印A
    private final Semaphore semA = new Semaphore(1);
    // semB和semC初始化时不允许任何许可,这样就阻止了B和C的打印
    private final Semaphore semB = new Semaphore(0);
    private final Semaphore semC = new Semaphore(0);

    public void printA() {
        try {
            for (int i = 0; i < 10; i++) {
                semA.acquire(); // 获取A的打印许可
                System.out.print("A");
                semB.release(); // 释放B的打印许可
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void printB() {
        try {
            for (int i = 0; i < 10; i++) {
                semB.acquire(); // 获取B的打印许可
                System.out.print("B");
                semC.release(); // 释放C的打印许可
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void printC() {
        try {
            for (int i = 0; i < 10; i++) {
                semC.acquire(); // 获取C的打印许可
                System.out.print("C");
                semA.release(); // 释放A的打印许可,使得下一轮循环可以继续进行
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        PrintABCUsingSemaphore printABC = new PrintABCUsingSemaphore();
        // 分别启动三个线程负责打印A、B和C
        new Thread(printABC::printA).start();
        new Thread(printABC::printB).start();
        new Thread(printABC::printC).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 4. 使用LockSupport

LockSupport提供了基于线程的park和unpark操作,可以精确地控制线程的挂起和唤醒。使用LockSupport可以精确地指定哪个线程被唤醒。

public class AlternatePrintABCDemo {

	private static Thread threadA, threadB, threadC;

	public static void main(String[] args) {
		threadA = new Thread(() -> printLetter("A", threadB), "Thread-A");
		threadB = new Thread(() -> printLetter("B", threadC), "Thread-B");
		threadC = new Thread(() -> printLetter("C", threadA), "Thread-C");

		threadA.start();
		threadB.start();
		threadC.start();

		// 初始唤醒线程A开始打印
		LockSupport.unpark(threadA);

	}

	private static void printLetter(String letter, Thread nextThread) {
		for (int i = 0; i < 3; i++) {
			try {
				// 当前线程阻塞,等待被唤醒
				LockSupport.park();
				System.out.println(letter);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			// 唤醒下一个线程
			LockSupport.unpark(nextThread);
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 4.4 线程间定制化通信

问题: A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C,按照 此顺序循环 10 轮

public class DemoClass {
    // 控制打印顺序的标志
    private int state = 0;
    private Lock lock = new ReentrantLock();
    // 创建条件变量,分别控制打印A、B、C
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    // 打印A的方法
    public void printA(int round) {
        lock.lock();
        try {
            // 当不是A打印的轮次时,A等待
            while (state != 0) {
                conditionA.await();
            }
            System.out.println(Thread.currentThread().getName() + "输出A, 第" + round + "轮开始");
            // 打印5次A
            for (int i = 0; i < 5; i++) {
                System.out.println("A");
            }
            // 更改轮次,唤醒打印B的线程
            state = 1;
            conditionB.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 打印B的方法
    public void printB(int round) {
        lock.lock();
        try {
            // 当不是B打印的轮次时,B等待
            while (state != 1) {
                conditionB.await();
            }
            System.out.println(Thread.currentThread().getName() + "输出B, 第" + round + "轮开始");
            // 打印10次B
            for (int i = 0; i < 10; i++) {
                System.out.println("B");
            }
            // 更改轮次,唤醒打印C的线程
            state = 2;
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 打印C的方法
    public void printC(int round) {
        lock.lock();
        try {
            // 当不是C打印的轮次时,C等待
            while (state != 2) {
                conditionC.await();
            }
            System.out.println(Thread.currentThread().getName() + "输出C, 第" + round + "轮开始");
            // 打印15次C
            for (int i = 0; i < 15; i++) {
                System.out.println("C");
            }
            // 打印结束,开始下一轮,唤醒打印A的线程
            System.out.println("-----------------------------------------");
            state = 0;
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        DemoClass demoClass = new DemoClass();
        // 创建三个线程分别执行打印A、B、C的任务
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                demoClass.printA(i);
            }
        }, "线程A").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                demoClass.printB(i);
            }
        }, "线程B").start();

        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                demoClass.printC(i);
            }
        }, "线程C").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

以上便是本文的全部内容,本人才疏学浅,文章有什么错误的地方,欢迎大佬们批评指正!我是scholar,一个在互联网行业的小白,立志成为更好的自己。

如果你想了解更多关于scholar (opens new window) (opens new window),可以关注公众号-书生带你学编程,后面文章会首先同步至公众号。

公众号封面

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
第八章 并发集合
第十章 异步编程

← 第八章 并发集合 第十章 异步编程→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式