Java并发工具之CyclicBarrier

2023-10-31

一、简介

摘自《Java并发编程的艺术》一书中

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行

CyclicBarrier,一个同步辅助类,在API中是这么介绍的: 它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。 通俗点讲就是:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活

举个栗子,想必很多小伙伴都会玩英雄联盟或者农药手游,大家在选完英雄的时候会需要进行等待加载,等到10位玩家加载准备完成之后才能正式开始游戏。这个10位玩家可以理解为10个线程,在加载过程中,10个线程互相等待,直到最后一位玩家加载完成,即所有线程都达到某一个屏障,此时被等待的线程才能继续执行,即大家才能开始happy起来。

二、类总览

1. 类的继承关系

CyclicBarrier没有显示继承哪个父类或者实现哪个父接口

public class CyclicBarrier {}

CyclicBarrier中有个内部类Generation,定义如下

private static class Generation {
    //表示当前屏障是否被损坏,默认为false
    boolean broken = false;
}

这个内部类从字面意思理解是“”的意思,为什么要有这个内部类呢?我们知道CyclicBarrier可重复使用的,每次重复使用都会新建一个Generation,它的broken属性默认为false。举个栗子,很多人都玩过过山车吧。假设一个过山车是20个座位,工作人员一般会等到栏杆外排队够20人才会打开栏杆让这20人通过;然后就会将栏杆重新关闭,后面新来的继续等待。这里的前面已经通过的人就是一“代”,后面再继续等待的20人就是另外一“代”。每次栏杆打开关闭一次,就会产生新的一“代”CyclicBarrier,开启新的一代使用的是nextGeneration方法,定义如下

private void nextGeneration() {
    // 唤醒当前这一代中所有等待在条件队列里的线程
    trip.signalAll();
    // 重置count值
    count = parties;
    //新建Generation,开启新的一代
    generation = new Generation();
}

该方法用于开启新的一代,通常是被最后一个调用await方法的线程调用。在该方法中,我们的主要工作就是唤醒当前这一代中所有等待在条件队列里的线程,将count的值恢复为parties,以及开启新的一代

2. 核心属性

//可重入独占锁
private final ReentrantLock lock = new ReentrantLock();
//Condition实例,表示条件队列
private final Condition trip = lock.newCondition();
//参与线程的总数
private final int parties;
//代表了一个任务,表示当barrier开启时就会执行这个对象的run方法
//这个参数不是必须的,如果不需要执行,可以为null
private final Runnable barrierCommand;
//generation实例,表示当前代
private Generation generation = new Generation();
//还需要等待的线程数,初始值为parties
private int count;

属性包括ReentrantLockCondition,即CyclicBarrier基于独占锁ReentrantLock和条件队列实现的所有相互等待的线程都会在同样的条件队列trip上挂起,被唤醒后将会被添加到sync queue中去争取独占锁lock,获得锁的线程将继续往下执行

3. 构造方法

CyclicBarrier两个构造函数

  • 指定parties参数

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    

    调用第二个构造方法,传递barrierAction参数为null

  • 指定partiesbarrierAction参数

    public CyclicBarrier(int parties, Runnable barrierAction) {
        //parties即参与的线程数必须大于0
        if (parties <= 0) throw new IllegalArgumentException();
        //初始化parties、count和barrierCommand属性
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

三、使用案例

假设有一家公司要全体员工进行团建活动,活动内容为翻越三个障碍物,参与活动的一共有五名员工,要求所有人在翻越当前障碍物之后再开始翻越下一个障碍物,代码如下(混个脸熟,先学会使用,原理后面讲解

public static void main(String[] args) {
    //参与的线程数
    int threadNum = 5;
    //创建cyclicBarrier实例,定义barrierAction
    CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> System.out.println(
        "所有员工通过当前屏障,继续前进!"));
    //创建线程开始执行
    for (int i = 1; i <= threadNum; i++) {
        new Thread(() -> {
            for (int j = 1; j <= 3; j++) {
                try {
                    Random rand = new Random();
                    //产生1000到3000之间的随机整数,模拟跨越障碍的耗时
                    int randomNum = rand.nextInt((3000 - 1000) + 1) + 1000;
                    Thread.sleep(randomNum);
                    System.out.println(Thread.currentThread().getName() + ", 通过了第" + j +
                                       "个障碍物, " +
                                       "使用了 " + ((double) randomNum / 1000) + "s");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }, i + "号员工").start();
    }
}

执行结果如下

1号员工, 通过了第1个障碍物, 使用了 1.046s
3号员工, 通过了第1个障碍物, 使用了 1.276s
2号员工, 通过了第1个障碍物, 使用了 2.298s
4号员工, 通过了第1个障碍物, 使用了 2.394s
5号员工, 通过了第1个障碍物, 使用了 2.818s
所有员工通过当前屏障,继续前进!
4号员工, 通过了第2个障碍物, 使用了 1.021s
2号员工, 通过了第2个障碍物, 使用了 2.014s
5号员工, 通过了第2个障碍物, 使用了 2.335s
3号员工, 通过了第2个障碍物, 使用了 2.557s
1号员工, 通过了第2个障碍物, 使用了 2.573s
所有员工通过当前屏障,继续前进!
4号员工, 通过了第3个障碍物, 使用了 1.46s
5号员工, 通过了第3个障碍物, 使用了 2.098s
3号员工, 通过了第3个障碍物, 使用了 2.66s
2号员工, 通过了第3个障碍物, 使用了 2.796s
1号员工, 通过了第3个障碍物, 使用了 2.896s
所有员工通过当前屏障,继续前进!

这里每个员工相当于每个参与的线程,每个线程执行完当前任务时会调用await方法,该方法的作用是如果存在没有到达Barrier的线程就会自我阻塞;如果不存在则会唤醒所有阻塞的线程,同时执行barrierAction的run方法。同时我们看到CyclicBarrier可以重复使用,印证了它的循环屏障的含义。

四、核心方法

根据上面的分析我们知道CyclicBarrier的使用只需要它的一个await方法即可完成它强大的功能,下面我们来分析一下它的await方法。

1. 辅助方法

在介绍await方法前,我们先来了解一下一些辅助方法

1.1 breakBarrier方法

方法定义如下

private void breakBarrier() {
    //设置broken状态为true,即表示屏障被打破
    generation.broken = true;
    //重置count值
    count = parties;
    //唤醒当前这一代中所有等待在条件队列里的线程(因为栅栏已经打破了)
    trip.signalAll();
}

继续拿上面的过山车的例子来说明,假如当天是工作日,游客较少,有时候很难凑够20人,为了避免当前等待的游客着急,这个时候工作人员也会打开栏杆(此时人数不够20人)让当前等待的游客通过。这个工作人员的行为就相当于调用方法breakBarrier,因为此时不是在凑够20人的情况下打开屏障,所以我们把这一代的broken状态设置为true,表示屏障被打破

1.2 reset方法

reset方法用于将barrier恢复成初始的状态,它的内部就是简单地调用了breakBarrier方法和nextGeneration方法

public void reset() {
    final ReentrantLock lock = this.lock;
    //需要先获取锁
    lock.lock();
    try {
        //打破当前屏障
        breakBarrier();
        //开启新的一代
        nextGeneration();
    } finally {
        lock.unlock();
    }
}

注意:如果在我们执行该方法时有线程正等待在barrier上,则它将立即返回并抛出BrokenBarrierException异常

2. await

先看方法定义

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

await方法内部调用dowait方法

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    //获取重入锁
    //所有执行await方法的线程必须是已经持有了锁,所以这里必须先获取锁
    lock.lock();
    try {
        final Generation g = generation;
        //前面介绍到调用breakBarrier方法会将当前代的broken属性设置为true,表示当前屏障被打破了
		//如果发现当前的barrier已经被打破了,则直接抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
        
		//如果当前线程被中断了,则需要将屏障打破,再抛出中断异常
        //这里这么做的原因是:由于在barrier上的线程是互相等待的,如果其中一个被中断了,那么其他的就不用再等待了
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
		
        //当前线程到达屏障出,将等待的线程数减一
        int index = --count;
        //如果等待的线程数为0,表示所有的线程都到齐了,则可以唤醒所有等待的线程,同时重置屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                //如果有设置barrierCommand属性,则会调用它的run方法
                if (command != null)
                    command.run();
                ranAction = true;
                //唤醒所有线程,开启新的一代
                nextGeneration();
                return 0;
            } finally {
                //这里是防止barrierCommand的run方法执行出了异常,导致无法唤醒其余等待的线程,这里做一下兜底,直接打破屏障
                if (!ranAction)
                    breakBarrier();
            }
        }

        //能执行到这说明此时等待的线程数还不为0,需要将线程挂起
        for (;;) {
            try {
                //如果没有设置超时机制,直接调用Condition的await方法
                if (!timed)
                    trip.await();
                //否则,则等待指定的时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } 
            //如果在等待的过程中线程被中断了,执行下面代码
            catch (InterruptedException ie) {
                //如果线程处于当前这一“代”,并且当前这一代还没有被broken,则先打破栅栏
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    //重新抛出异常
                    throw ie;
                } 
                //否则无需处理,直接恢复中断即可
                // 注意来到这里有两种情况
                // 一种是g!=generation,说明新的一代已经产生了,所以我们没有必要处理这个中断,只要再自我中断一下就好,交给后续的人处理
               // 一种是g.broken = true, 说明中断前栅栏已经被打破了,既然中断发生时栅栏已经被打破了,也没有必要再处理这个中断了
                else {
                    //自我中断
                    Thread.currentThread().interrupt();
                }
            }

            //能够执行到此处说明线程被唤醒了
            //这里检测一下broken状态是否为true,如果是抛出异常
            //能使broken状态变为true的,只有调用breakBarrier()方法
            if (g.broken)
                //BrokenBarrierException异常一般表示某个线程在等待某个处于“打破”状态的barrier
                throw new BrokenBarrierException();

            //如果线程被唤醒时,新一代已经被开启了,说明一切正常,直接返回
            if (g != generation)
                return index;
			//如果是超时等待且已经超时,则打破屏障,抛出超时异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

正常情况下的await的逻辑很简单,就是线程间互相等待,知道所有线程都到达屏障后,屏障打开,各线程继续执行。但是await方法的难点在于屏障被打破的情况下的处理。我们知道如果在参与者(线程)在调用await方法的过程中,barrier被破坏,就会抛出BrokenBarrierException异常。在await方法中,抛出该异常的代码只有一种方式

//该判断存在await方法两处
//1)第一处是刚开始获取锁之后的判断,即await方法的最前端处,也可以理解为到达屏障前的位置
//2)第二次是唤醒之后的判断
if (g.broken)
    throw new BrokenBarrierException();

即当generation实例中的broken标识为true时,才会抛出异常。但是generation实例中的broken标识默认为false,只有当调用breakBarrier方法才会修改标识为true,因此得出结论:当前线程如果刚开始执行await方法或者唤醒之后发现自己等待的屏障已经被打破了,会直接抛出BrokenBarrierException异常。

下面我们来分析一下breakBarrier方法调用的几种情况。

  • 第一种情况:当前线程达到屏障前发现自己被中断了

    这种情况下,意味着后续的线程以及等待的线程再也不可能达到屏障开启的条件了,所以当前线程会主动打破屏障,唤醒等待的线程,避免等待的线程一直等待

  • 第二种情况:最后一个到达的线程在执行barrierCommand的run方法时发生了错误

    该情况下,首先所有的线程肯定都已经就位了,只是在执行用户自定义的屏障处的执行方法时报错,为了避免报错导致所有等待的线程没有人去唤醒,会主动去唤醒。

  • 第三种情况:线程在调用Condition.await方法的时候发现自己被中断了,会抛出中断异常,此时如果当前代没有更新为下一代,且当前代没有被打破

    会调用breakBarrier方法主动打破屏障

  • 第四种情况:reset方法被调用

    reset方法的作用是重置CyclicBarrier,类似清除历史重新来,这个方法JDK内部不会调用,可能是用户代码调用

上面前三张种情况我们在dowait方法中标注一下

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        //1)第一种情况:当前线程达到屏障前发现自己被中断了
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
		
        //到达屏障的位置
        int index = --count;
        if (index == 0) {
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    //第二种情况:最后一个到达的线程在执行barrierCommand的run方法时发生了错误
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    //第三种情况:挂起的过程中发生了中断
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

总结一下:CyclicBarrier使用了“all-or-none breakage model”,所有互相等待的线程,要么一起通过barrier,要么一个都不要通过,如果有一个线程因为中断,失败或者超时而过早的离开了barrier,则该barrier会被broken掉,所有等待在该barrier上的线程都会抛出BrokenBarrierException(或者InterruptedException

五、CyclicBarrier与CountdownLatch的区别

CyclicBarrier的功能与CountDownLatch类似,它可以使得一组线程之间相互等待,直到所有的线程都到齐了之后再继续往下执行。但是二者还是存在些许区别。

  • CyclicBarrier基于条件队列和独占锁来实现;而CountDownLatch基于共享锁实现
  • CyclicBarrier可以重复使用,当所有线程就位完成时,会开启下一代;而CountDownLatch一次性的,不可重复使用
  • CyclicBarrier加计数CountDownLatch减计数
  • CyclicBarrier操作计数和阻塞的是同一个线程,调用方法只有一个await方法;而CountDownLatch操作计数和阻塞等待是两个线程,控制计数调用方法countDown,且不会被阻塞挂起,阻塞等待调用方法await方法,会根据计数值选择是否阻塞等待。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java并发工具之CyclicBarrier 的相关文章

随机推荐

  • Qt 获取顶级窗口(top-level window)

    在Qt中 可以使用 QWidget window 函数来获取窗口的顶级窗口 top level window 顶级窗口是指没有父窗口的窗口 通常是应用程序的主窗口或独立的对话框窗口 include
  • The type org.springframework.dao.DataAccessException cannot be resolved. It is indirectly referenced

    今天使用Spring Cloud Mybatis Plus3 x 搭建微服务项目时 提示如下错误信息 The type org springframework dao DataAccessException cannot be resolv
  • vue-cli3搭建多入口应用项目搭建以及webpack配置

    我们平时开发 vue项目的时候 就有一种感觉就是 vue就像是专门为单页应用而诞生的 因为人家的官方文档中也说了 其实不是的 因为vue在工程化开发的时候依赖于 webpack 而webpack是将所有的资源整合到一块后形成一个html文件
  • Python基础(三)_函数和代码复用

    三 函数和代码复用 一 函数的基本使用 1 函数的定义 函数是一段具有特定功能的 可重用的语句组 用函数名来表示并通过函数名进行功能调用 函数也可以看作是一段具有名字的子程序 可以在需要的地方调用执行 不需要在每个执行的地方重复编写这些语句
  • HJ32 密码截取(java详解)(动态规划)

    hello world 你好 世界 想要了解这题的动态规划 提议先了解这题的 中心扩散法 解题思路 最长回文子串的中心扩散法 遍历每个字符作为中间位 进行左右比较 算法流程 从右到左 对每个字符进行遍历处理 并且每个字符要处理两次 因为回文
  • 二手打印机如何挑选?

    打印机作为生产力工具 最重要的是 稳定性 可靠性 以及使用成本 常用的打印机分为三种 分别是 激光打印机 喷墨打印机 针式打印机 不管你是去网店还是实体店铺购买打印机 首先你要了解自己的需求 打印机作为商品 没有好与不好 只有适不适合你 一
  • Python编程基础题(20-宇宙无敌加法器)

    Description Input 输入首先在第一行给出一个 N 位的进制表 0 lt N 20 以回车结束 随后两行 每行给出一个不超过 N 位的非负的 PAT 数 Output 在一行中输出两个 PAT 数之和 Sample Input
  • 点位运动

    梯形速度规划是最简单的速度规划方法 加速度是常数 规划过程中只需要控制速度和位移与时间的关系 如图所示 整个过程分为 加速段 匀速段 减速段 每一个轴在规划静止时都可以设置为点位运动 在点位运动模式下 各轴可以独立设置目标位置 目标速度 加
  • linux/windows下查看目标文件.a/.lib的函数符号名称

    1 linux下 objdump t 查看对象文件所有的符号列表 例如 objdump t libtest o 2 nm列出目标文件 o 的符号清单 例如 nm s filename a filename o a out 3 列出所有定义的
  • jq中快速返回祖先元素

    div class one div class two div class three div class focus 我是这个div div div div div
  • 解决页面favicon.ico文件不存在提示404问题

    所谓favicon 即Favorites Icon的缩写 顾名思义 便是其可以让浏览器的收藏夹中除显示相应的标题外 还以图标的方式区别不同的网站 当然 这不是Favicon的全部 根据浏览器的不同 Favicon显示也有所区别 在大多数主流
  • 逗号和分号

    上面的程序
  • 将python代码打包成可执行文件

    文章目录 打包工具 使用 pyinstaller 安装pyinstaller库 打包 Python是一种高级编程语言 它具有易学易用 跨平台等优点 因此在开发中得到了广泛的应用 然而 Python代码需要在Python解释器中运行 这对于一
  • UML类图几种关系的总结

    UML类图几种关系的总结 转载链接 http blog csdn net sunboy 2050 article details 9211457 UML类图 描述对象和类之间相互关系的方式包括 依赖 Dependency 关联 Associ
  • mac生成树形结构

    第一步 安装tree brew install tree 第二步 在要展示树结构的文件里面打开终端 运行命令 tree d 只显示文件夹 tree L n 显示项目的层级 n表示层级数 比如想要显示项目三层结构 可以用tree l 3 tr
  • firefox安装selenium插件

    1 目前新版类似Firefox58不兼容 打开 https addons mozilla org en US firefox addon selenium ide 网址 显示add to firefox为灰色 下载Firefox48即可 2
  • R:RStudio和RStudio Server

    RStudio是R语言开发中的利器 是最好用的R语言IDE集成环境 RStudio Server更是利器中的神器 不仅提供了Web的功能 可以安装到远程服务器上 通过Web进行访问 还支持多用户的协作开发 RStudio 是一个强大的 免费
  • IDEA——手把手教你mybatis的使用(新手教程)

    说到Mybatis 很多人不知道这是用来干什么的 简单来说就是用来优化JDBC的使用 我们可以理解为一个这样的流程 数据库 gt JDBC gt MyBatis gt Java 今天来教一下简单的mybatis使用方法 对于初学者很友好 目
  • C++基础(11)类模板

    1 类模板 类模板和函数模板的定义和使用类似 我们已经进行了介绍 有时 有两个或多个类 其功能是相同的 仅仅是数据类型不同 类模板用于实现类所需数据的类型参数化 include
  • Java并发工具之CyclicBarrier

    一 简介 摘自 Java并发编程的艺术 一书中 CyclicBarrier的字面意思是可循环使用 Cyclic 的屏障 Barrier 它要做的事情是 让一组线程到达一个屏障 也可以叫同步点 时被阻塞 直到最后一个线程到达屏障时 屏障才会开