JUC中对线程的协同合作控制

2023-05-16

线程的协同合作控制

    • CountDownLatch
      • 使用
      • 代码演示
      • 小结
    • Semaphore
      • 使用
      • 小结
    • Condition
      • 使用
      • 代码
    • CyclicBarrier
      • 使用:
      • 代码演示
    • 小结

在使用多线程的时候,我们可以使用一些工具来达到对资源的访问量控制线程之间的相互等待线程之间的通信唤醒

控制线程的合作并发,主要有四大工具;CountDownLatch,Semaphore,Condition以及CyclicBarrier。下面对这四个工具进行逐一介绍。

CountDownLatch

CountDownLatch主要是用于线程之间的等待协作。可以实现多等一,也可以实现一等多

例如,我们需要启动多个线程进行资源加载,等资源加载完成后主线程才能继续执行;这就可以用CountDownLatch的一等多的机制来完成;或者我们想自实现并发情况可以通过多等一的方式来使线程阻塞,等主线程经过一系列操作(休眠,或者准备其他资源)后,进行放行

使用

CountDownLatch在构造的时候需要传入一个参数,这个参数就是需要等待线程的个数,并且提供了一个阻塞的方法await()来让线程进行阻塞,只有当等待数减为0的时候,被阻塞的线程才能往下执行

而等待数可以通过CountDownLatch的countdown()来进行减一的操作。

代码演示

一等多代码示例:
可用于等待资源

public class CountDownLatchTest {

    // 构造一个倒数器
    static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        // 启动两个线程来加载资源
        new Thread(new TaskOne()).start();
        new Thread(new TaskTwo()).start();
        System.out.println("资源加载线程启动完成,Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
        countDownLatch.await();
        System.out.println("主线程执行完成,Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));

    }
}

// 任务二
class TaskOne implements  Runnable{
    @Override
    public void run() {
        // 模拟处理资源要5秒
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("TaskOne Finish");
        // 完成工作,线程数减一
        CountDownLatchTest.countDownLatch.countDown();
    }
}

// 任务一
class TaskTwo implements  Runnable{
    @Override
    public void run() {
        // 模拟处理资源要10秒
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 完成工作,线程数减一
        CountDownLatchTest.countDownLatch.countDown();
        System.out.println("TaskTwo Finish");

    }
}

结果演示:
在这里插入图片描述
可以看到,主线程确实等待了子线程完成了对应的工作才能继续走下去。否则,会一直阻塞在当前状态下等待

多等一代码示例:
可用于模拟并发的场景

public class CountDownLatchMuch {
    // 构造一个倒数器
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws InterruptedException {
        // 用newCachedThreadPool 开多个任务线程(这里可以先了解一下JDK提供的线程池)
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100 ; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("线程已到达 , Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
                        // 先让所有线程在这里阻塞
                        countDownLatch.await();
                        System.out.println("线程出发 , Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
                        //TODO 这里可以调用任意接口HttpClient来模拟高并发
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // 休眠5秒,让所有线程都准备好
        Thread.sleep(5000);
        // 放行所有的线程
        countDownLatch.countDown();
    }
}

执行结果:
在这里插入图片描述

我们可以看到,子线程在我们休眠五秒后,主线程调用countdown()方法,然后同一时间被放行的

小结

CountDownLatch通过我们实现定义好的计数器,当我们达到某种条件后可让计数器-·;最终当计数器为0时,激活调用await()方法的线程进行下一步操作。

Semaphore

Semaphore可以简单的理解为许可证

只有拿到对应数量许可证的人才能通行,其他拿不到对应数量的人统统会被阻塞住;并且,还要等别人释放了许可证,自己才有机会拿到。

这就好比,有一些大的资源要限制更少的人拿去,有一些较小的资源允许多人拿去,并且这两个资源共用一种许可证。

使用

Semaphore提供了许多方法可以使用:
在这里插入图片描述
常用的

  • acquire():拿取一个许可证
  • acquire(int permits):拿取permits个许可证
  • release():释放一个许可证
  • release(int permits):释放permits个许可证
  • tryAcquire():尝试拿取一个许可证
  • tryAcquire(int permits):尝试拿取permits个许可证
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout unit 内拿取一个许可证;unit表示时间单位
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout unit 内拿取permits个许可证
  • 带有try的获取许可证的方法都是尝试去拿,会立马返回布尔值类型的结果;而不带try的则会阻塞等待别人释放,在去拿。

代码演示:

不释放许可证

public class SemaphoreOneTest {
    // 允许开放两个人去操作
    static Semaphore semaphore = new Semaphore(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new SemaTaskOne()).start();
        new Thread(new SemaTaskTwo()).start();
        Thread.sleep(500);
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName()+"拿到了许可证");
        System.out.println("Main Finish");
    }
}

class SemaTaskOne implements Runnable{

    @Override
    public void run() {
        try {
            SemaphoreOneTest.semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"拿到了许可证");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class SemaTaskTwo implements Runnable{

    @Override
    public void run() {
        try {
            SemaphoreOneTest.semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"拿到了许可证");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述
我们看到如果不释放许可证的话,即使子线程执行完,主线程仍然拿不到许可证;如果主线程有机会先拿到许可证,即使主线程执行完了,程序仍然处于运行状态。因为拿到许可证的线程都没有释放,仍有子线程处于阻塞状态。

增加释放:

class SemaTaskOne implements Runnable{
    @Override
    public void run() {
        try {
            SemaphoreOneTest.semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"拿到了许可证");
            Thread.sleep(5000);
            
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
       		SemaphoreOneTest.semaphore.release();
        }
    }
}

信号的释放必须放在finally块中,否则程序发生异常不会释放许可证

正确释放许可证结果:
在这里插入图片描述

错误释放许可证结果:
在这里插入图片描述

带有参数的许可证演示:

public class SemaphoreOneTest {
    // 允许开放两个人去操作
    static Semaphore semaphore = new Semaphore(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new SemaTaskOne()).start();
        new Thread(new SemaTaskTwo()).start();
        System.out.println(Thread.currentThread().getName()+"等待中,Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
        semaphore.acquire(2);
        System.out.println(Thread.currentThread().getName()+"拿到了许可证"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
        Thread.sleep(4000);
        semaphore.release(2);
        System.out.println("Main Finish");
    }
}

class SemaTaskOne implements Runnable{
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName()+"等待中,Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
            SemaphoreOneTest.semaphore.acquire(2);
            System.out.println(Thread.currentThread().getName()+"拿到了许可证"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
            Thread.sleep(4000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            SemaphoreOneTest.semaphore.release(2);
        }
    }
}

class SemaTaskTwo implements Runnable{

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName()+"等待中,Time:"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
            SemaphoreOneTest.semaphore.acquire(2);
            System.out.println(Thread.currentThread().getName()+"拿到了许可证"+new SimpleDateFormat("hh:mm:ss").format(new Date()));
            Thread.sleep(4000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            SemaphoreOneTest.semaphore.release(2);
        }
    }
}

演示结果:
在这里插入图片描述

小结

非阻塞的尝试拿取就不做演示了,大体上差不多。只不过不会阻塞,也可以设置超时时间的拿,如果在这段时间拿不到的话线程就继续往下执行

Semaphore在使用的过程中,一定要保证许可证能正确的释放,否则会出现死锁的问题导致程序无法运行下去。

Condition

Condition本质上是一个接口,所以只能实例化其子类或者根据需求重写对应的方法

使用

其实跟Object的wait和notify方法类似,Condition提供了await()signal()。在await()中提供了可设置超时时间的重载方法。

代码

下面用Condition来演示一个生产者和消费者模式


public class ConditionTest {

    static ReentrantLock reentrantLock = new ReentrantLock();
    // 生产者锁
    static Condition conditionProvier =  reentrantLock.newCondition();
    // 消费者锁
    static Condition conditionConsumer =  reentrantLock.newCondition();
    static Queue arrayList = new ArrayBlockingQueue(10);

    public static void main(String[] args) {

        new Thread(new ConditionTask()).start();
        new Thread(new ConditionTaskTwo()).start();
    }
}

// 消费者
class ConditionTask implements Runnable{
    @Override
    public void run() {
        while(true) {
            try {
                ConditionTest.reentrantLock.lock();
                if (ConditionTest.arrayList.size() <= 0) {
                    System.out.println("无了,先不拿");
                    ConditionTest.conditionConsumer.await();
                }
                int poll = (int) ConditionTest.arrayList.poll();
                System.out.println("弹出元素," + poll);
                ConditionTest.conditionProvier.signalAll();
            } catch (Exception e) {
            } finally {
                ConditionTest.reentrantLock.unlock();
            }
        }
    }
}

// 生产者
class ConditionTaskTwo implements Runnable{
    int i = 0;
    @Override
    public void run() {
        while(true) {
            try {
                ConditionTest.reentrantLock.lock();
                //Thread.sleep((long) (Math.random() * 10000));
                if (ConditionTest.arrayList.size() >= 10) {
                    System.out.println("满了,暂停一下");
                    ConditionTest.conditionProvier.await();
                }
                System.out.println("插入元素");
                ConditionTest.arrayList.offer(i++);
                ConditionTest.conditionConsumer.signalAll();
            } catch (Exception e) {
            } finally {
                ConditionTest.reentrantLock.unlock();
            }
        }
    }
}

结果演示:
在这里插入图片描述

CyclicBarrier

CyclicBarrier比较容易理解,凑够了一拨人就出发

使用:

  • 只需要要在构造参数中传入要凑够多少人(线程),具体线程调用await()方法,如果凑够了指定数的线程就一起出发。
  • 也可以传入凑够的人数和执行线程;这个执行线程是凑够人数后,await()线程会继续往下执行,当前线程开启执行线程,相当于多一个线程用于提醒或执行其他操作
  • 后面凑不够指定的也会全部出发了

代码演示

无执行线程

public class CyclicBarrierOneTask {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
		// 一次凑够5个人出发
        CyclicBarrier cyclicBarrierWithOutRunnable = new CyclicBarrier(5);
		// 开10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new CyclicBarrierSmailTask(cyclicBarrierWithOutRunnable)).start();
        }
    }
}

class CyclicBarrierSmailTask implements Runnable{
    private  CyclicBarrier cyclicBarrier;

    public CyclicBarrierSmailTask(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            // 让线程随机休眠,显示出先来后到的凑够一拨人
            long time = (long) (Math.random() * 10000);
            Thread.sleep(time);
            System.out.println(Thread.currentThread().getName()+"等待中");
            cyclicBarrier.await();
            System.out.println(Thread.currentThread().getName()+"出发了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

在这里插入图片描述

增加执行线程

public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new Runnable(){
            @Override
            public void run() {
                System.out.println("凑够了一拨人,出发");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new CyclicBarrierSmailTask(cyclicBarrier)).start();
        }
    }

结果演示:
在这里插入图片描述

当调用await()的线程数达到指定数时,就会出发执行线程。并且之前调用await()的线程会一起运行,不在阻塞。

小结

合理运用好线程控制工具可以很好的帮助我们使用线程的合作,如果需要搞懂内部原理的话需要对AQS进一步掌握。

因为里面的控制数就是通过AQS中的state类变量来计数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JUC中对线程的协同合作控制 的相关文章

  • 11-JUC中的Condition对象

    文章目录 ConditionCondition常用方法总结参考 Condition 任何一个java对象都天然继承于Object类 xff0c 在线程间实现通信的往往会应用到Object的几个方法 xff0c 比如wait wait lon
  • JUC中对线程的协同合作控制

    线程的协同合作控制 CountDownLatch使用代码演示小结 Semaphore使用小结 Condition使用代码 CyclicBarrier使用 xff1a 代码演示 小结 在使用多线程的时候 xff0c 我们可以使用一些工具来达到
  • JUC快速学习笔记

    JUC快速学习笔记 狂神说JUC 个人学习笔记 介绍 JUC是指javaUtil包中的三个操作线程的包 并发操作 不加锁 方法 属性 方法 private int number 50 买票的方式 public void norSale if
  • JUC源码分析2-原子变量-AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray

    JUC针对数组元素的原子封装 先看AtomicIntegerArray private static final Unsafe unsafe Unsafe getUnsafe arrayBaseOffset获取数组首个元素地址偏移 priv
  • 【源码】走一遍源码弄清ArrayList容器的扩容机制

    源码 走一遍源码弄清ArrayList容器的扩容机制 首先我们来看看ArraysList容器在整个Java集合框架中所处的位置 由此可见ArrayList是Java集合框架中 两大派系中Collection接口的子接口List的实现类 我们
  • 并发容器(一):普通容器&&同步容器&&并发容器

    前言 之前我们学习过了集合 并发编程 现在我们来学习并发容器 在并发编程中 经常听到Java集合类 同步容器 并发容器 那么他们之间有哪些分类 优劣呢 我们先把这个框架给分清楚了 这样后面学习的时候不会乱 集合容器 大家熟知的集合类Arra
  • ReentrantLock 源码解析

    前言 注 本文的源码来自 JDK11 ReentrantLock 是 Java 中的一个可重入锁 它可以用于替代传统的 synchronized 关键字来进行线程同步 下面是与 synchronized 关键字的一些对比 名称 实现 重入性
  • 详解ThreadLocal

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 1 ThreadLocal介绍 1 1 官方介绍 1 2 基本用法 1 2 1 常用方法 1 2 2 使用案例 1 3 ThreadLocal与synchroniz
  • ThreadLocal从变量副本的角度解决多线程并发安全问题

    ThreadLocal从变量副本的角度解决多线程并发安全问题 之前我们讲的高并发场景下的线程安全问题 可以使用Synchronized同步关键字 Lock手动加锁的方式去解决 什么轻量级锁 偏向锁 重量级锁 可重入锁等等 实际上本质都是控制
  • AQS相关工实现类的使用及其原理

    文章目录 1 AQS 1 1 概述 1 2 自定义不可重入锁 2 ReentrantLock 2 1 非公平锁 2 1 1 加锁解锁流程 2 1 1 1 加锁失败 2 1 1 2 解锁竞争 2 2 可重入原理 2 3 可打断原理 2 3 1
  • juc并发包整理

    目录 JUC提供了java并发编程需要的类 主要分几个大模块 1 原子类操作 2 锁 3 阻塞队列 4 并发集合 5 同步器 6 线程池 7组合式异步编程 JUC的作者Doug Lea神一样的人物 其中以上很多类的实现底层实现都是基于AQS
  • Java线程池源码解析及使用

    1 线程池的用处 Java 引入 Excutor 框架将任务的提交和执行进行解耦 只需要定义好任务 然后提交给线程池即可 使用线程池的时机 单个任务处理时间比较短 需要处理的任务数量很大 线程池的优点 降低资源消耗 通过重复利用已创建的线程
  • JUC并发编程之AQS原理

    1 AQS 原理 1 1 概述 全称是 AbstractQueuedSynchronizer 是阻塞式锁和相关的同步器工具的框架 特点 用 state 属性来表示资源的状态 分独占模式和共享模式 子类需要定义如何维护这个生态 控制如何获取锁
  • Java JUC概述

    Java JUC Java Util Concurrent 是 Java 平台提供的并发编程工具包 它提供了一系列的工具类和接口 用于简化多线程编程 JUC 中的类和接口都是基于 Java 平台的底层并发原语 如锁 信号量 原子变量等 实现
  • 黑马并发编程(AQS源码分析、线程池)

    AQS源码分析 线程池 8 线程池 1 自定义线程池 阻塞队列 优化队列 线程池 执行和线程处理设计 线程池执行的整个思路 阻塞添加 拒绝策略 2 ThreadExecutor 线程池状态 线程池参数 拒绝策略 newFixedThread
  • JUC(2): 阻塞队列+线程池(重点)+新时代程序员必会

    一 阻塞队列 ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列 LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列 PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列 D
  • wait notify正确使用方式

    wait notify正确使用方式 假设 当线程 Thread03 在1 100中找出77并输出后 Thread01 输出所有1 100中的奇数 当线程 Thread03 在1 100中找出88并输出后 Thread02 输出所有1 100
  • 线程安全分析

    1 成员变量和静态变量是否线程安全 如果它们没有被共享 则线程安全 如果它们被共享了 根据它们的状态是否能够改变 又分两种情况 如果只有读操作 则线程安全 如果有读写操作 则这段代码是临界区 需要考虑线程安全 2 局部变量是否线程安全 局部
  • 第5节 实现Callable 接口

    Java 5 0 在java util concurrent 提供了一个新的创建执行 线程的方式 Callable 接口 Callable 接口类似于Runnable 两者都是为那些其实例可能被另一个线程执行的类设计的 但是 Runnabl
  • 万文详解JUC(超详细)

    生命无罪 健康万岁 我是laity 我曾七次鄙视自己的灵魂 第一次 当它本可进取时 却故作谦卑 第二次 当它在空虚时 用爱欲来填充 第三次 在困难和容易之间 它选择了容易 第四次 它犯了错 却借由别人也会犯错来宽慰自己 第五次 它自由软弱

随机推荐