多处理器编程的艺术(二)-并行程序设计

2023-11-20

当处理器的性能的发展受到各方面因素的限制的时候,计算机产业开始用多处理器结构实现并行计算来提高计算的效率。我们使用多处理器共享存储器的方式实现了多处理器编程,也就是多核编程。当然在这样的系统结构下我们面临着各种各样的挑战,例如如何协调各个处理器之间的数据调度以及现代计算机系统固有的异步特征等等。

在接下来的一系列文章中,我将会介绍一些基础的原理以及并行程序的设计和并发程序的设计及实现,写这篇文章是对近期学习课程的总结,方便自己温故时习,感谢USTC付明老师的《多核并行计算》课程,了解更多推荐《The Art of Multiprocessor Programming, Revised Reprint》。

实例:大数组元素的求和

思想:给出4个线程同时对数组的1/4求和。

  • 注意:这是一个低级的算法
  • 创建4个线程,每个线程负责部分的工作
  • 调用start(),启动每个线程并行的运行
  • 使用join()方法等待每个线程运行结束
  • 将4个结果相加在一起
class SumThread extends java.lang.Thread {
  int lo, int hi, int[] arr; // arguments
  int ans = 0; // result
  SumThread(int[] a, int l, int h) { … }
  public void run(){ … } // override
}
int sum(int[] arr){// can be a static method
  int len = arr.length;
  int ans = 0;
  SumThread[] ts = new SumThread[4];
  for(int i=0; i < 4; i++){// do parallel computations
    ts[i] = new SumThread(arr,i*len/4,(i+1)*len/4);
    ts[i].start(); //必须使用start(),而不是run()方法
  }
  for(int i=0; i < 4; i++) { // combine results
    ts[i].join(); // wait for helper to finish!
    ans += ts[i].ans;
  }
  return ans;
}

join()方法使得调用者阻塞,直到receiver 完成了执行;
线程的启动需要用start(),而不是run()

Fork-join framework

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

这里写图片描述

  • Fork-join 程序不需要关注线程间的共享内存

EX:求一个数组的最大值。

多线程方法:

public class MyThread implements Runnable {

    //下面两个静态成员变量需要通过用关键字synchronized修饰 的方法来访问
    private volatile static int max = Integer.MIN_VALUE;            //初始最大值

    //下面为成员变量,每个线程对象私有
    private int[] nums;            //待查找数组
    private int low;                  //当前线程对象需要处理的数组开始下标
    private int high;                 //当前线程对象需要处理的数组结束下标

    //构造方法,传入待查找数组、开始下标和结束下标
    public MyThread(int[] nums, int low, int high){
        this.nums = nums;
        this.low = low;
        this.high = high;
    }

    @Override
    public synchronized void run() {

        for(int i=low;i<=high;i++)
        {
            if(nums[i] > max)
            {
                MyThread.setMax(nums[i]);
            }
        }

    }

    public static synchronized int getMax() {
        return max;
    }

    public static synchronized void setMax(int max) {
        MyThread.max = max;
    }

}

Fork-join框架:

public class ForkThread extends RecursiveTask<Integer> {

    private static final int SEQUENTIAL_THRESHOLD = 5;//子数组到分割最终大小

    private final int[] data;
    private final int start;
    private final int end;

public ForkThread(int[] data, int start, int end) {
  this.data = data;
  this.start = start;
  this.end = end;
}

public ForkThread(int[] data) {
  this(data, 0, data.length);
}

    @Override
    protected Integer compute() {
        final int length = end - start;
        if (length < SEQUENTIAL_THRESHOLD) {
            return computeDirectly();
        }
        final int split = length / 2;
        final ForkThread left = new ForkThread(data, start, start + split);
        left.fork();
        final ForkThread right = new ForkThread(data, start + split, end);
        return Math.max(right.compute(), left.join());
    }

private Integer computeDirectly() {
  /*System.out.println(Thread.currentThread() + "computing: " + start
                     + " to " + end);*/
  int max = Integer.MIN_VALUE;
  for (int i = start; i < end; i++) {
    if (data[i] > max) {
      max = data[i];
    }
  }
  return max;
}

    public static void main(String[] args) {
        // create a random data set
        final int[] data = new int[10000];
        final Random random = new Random();
        for (int i = 0; i < data.length; i++) {
            data[i] = (int)Math.floor((random.nextDouble()*100000.0)); 
            System.out.println(data[i]);
        }
        /*int data[] = new int[10000];
        for(int i=0;i<10000;i++)
            data[i] =(int) Math.random()*10000;*/

        // submit the task to the pool
        final ForkJoinPool pool = new ForkJoinPool(4);
        final ForkThread finder = new ForkThread(data);
//      System.out.println(pool.invoke(finder));
    }
}

Main方法线程:

public class SearchMax {

    /**
     * @param args
     */
    //初始化数组
    public static int[] InitialArr(){
        final int[] data = new int[10000];
        final Random random = new Random();
        for (int i = 0; i < data.length; i++) {
            data[i] = (int)Math.floor((random.nextDouble()*100000.0)); 
        }
        return data;
    }
    //分治法
    public static long DivideMax(int arr[]){

        int size = arr.length;
        long startTime=System.nanoTime();
        Thread t1 = new Thread(new MyThread(arr,0,size/2));
        Thread t2 = new Thread(new MyThread(arr,size/2+1,size-1));

        t1.start();
        t2.start();

        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime=System.nanoTime();
        System.out.println( "分治法: "+MyThread.getMax());

        return (endTime-startTime);
    }


    //顺序查找
    public static long OrderMax(int arr[]){

        int size = arr.length;
        int ret = 0;
        long startTime=System.nanoTime();
        for(int i=0;i<size;i++){
            if(arr[i]>ret)
                ret = arr[i];
        }
        long endTime=System.nanoTime();
        System.out.println( "顺序查找: "+ret);
        return (endTime-startTime);
    }

    //Fork-Join框架
    public static long ForkMax(int arr[]){

        int ret = 0;
        long startTime=System.nanoTime();
        // submit the task to the pool
        final ForkJoinPool pool = new ForkJoinPool(4);
        final ForkThread finder = new ForkThread(arr);

        ret = pool.invoke(finder);
        long endTime=System.nanoTime();
        System.out.println( "Fork_join: "+ret);
        return (endTime-startTime);
    }

    public static void main(String[] args) {

        int data[] = InitialArr();
        System.out.println("分治法花费时间 "+DivideMax(data)+" ns");
        System.out.println("顺序查找花费时间 "+OrderMax(data)+" ns");
        System.out.println("Fork_Join花费时间 "+ForkMax(data)+" ns");

    }

}

运行结果:

这里写图片描述

设计一个Fork/Join框架的步骤:

第一步:分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割的子任务足够小;

第二步:执行任务并合并结果,分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里分别获取任务执行,子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join框架使用两个类来完成以上两件事:

(1)ForkJoinTask:我们要使用Fork Join框架,必须首先创建一个Fork Join任务,它提供在任务中执行fork() 和join()操作的机制,通常情况下我们不需要继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。

  • RecursiveTask:用于有返回结果的任务。

(2)ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

Fork/Join框架的异常处理:

Fork JoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以Fork JoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过Fork JoinTask的getException方法获取异常,使用如下代码:

if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。

EX:大数组求和

class SumArray extends RecursiveTask<Integer> {
  int lo; int hi; int[] arr; // arguments
  SumArray(int[] a, int l, int h) { … }
  protected Integer compute(){// return answer
    if(hi – lo < SEQUENTIAL_CUTOFF) {
      int ans = 0;
      for(int i=lo; i < hi; i++)
        ans += arr[i];
      return ans;
    } else {
      SumArray left = new SumArray(arr,lo,(hi+lo)/2);
      SumArray right= new SumArray(arr,(hi+lo)/2,hi);
      left.fork();
      int rightAns = right.compute();
      int leftAns  = left.join(); 
      return leftAns + rightAns;
    }
  }
}
static final ForkJoinPool fjPool = new ForkJoinPool();
int sum(int[] arr){
  return fjPool.invoke(new SumArray(arr,0,arr.length));
}

算法设计

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多处理器编程的艺术(二)-并行程序设计 的相关文章

随机推荐

  • 百度飞桨图像分割七日打卡营学习笔记

    百度飞桨图像分割七日打卡营学习笔记 来源 飞桨图像分割教程 课程链接https aistudio baidu com aistudio course introduce 1767 1 语义分割概念 图像分割是计算机视觉中除了分类和检测外的另
  • java项目抠图功能实现

    java项目抠图功能 项目中需要一个上传文字签名并且抠掉背景图的功能 当初第一次听到这个需求时 差点惊掉下巴 我压根都不会觉得java里能实现这功能 但是既然客户需要 那就照办吧 经过这次功能的实现 我也更加坚定了一个想法 再奇葩的需求 也
  • win 11bitlocker恢复密匙一般情况的解决方式(这里指的是你现在使用的微软账户一直没有变更过)

    本来没有打算写这一篇解决帖子的 但是最近好多微博的友友都在私我询问解决方法 孩子虽然很热心 但是真的回复不过来了 打字太累了 这里给大家简单指个路 希望能够对大家有所帮助 当时出现这个问题的时候 一搜网上的解决方法都是重装系统什么的 真是吓
  • flink学习之state

    state作用 保留当前key的历史状态 state用法 ListState
  • 关于Socket通信客户端是否需要绑定端口号

    参见http blog chinaunix net uid 23193900 id 3199173 html 无连接的socket的客户端和服务端以及面向连接socket的服务端通过调用bind函数来配置本地信息 使用bind函数时 通过将
  • C++输出二进制数

    示例 include
  • Lattice Diamond安装

    1 下载 到Lattice官网 http www latticesemi com 注册一个lattice的账号后就可以去下载Diamond 登陆后如下图 根据自己系统情况选择对应的版本 我用的是32位win8 Diamond软件安装包和La
  • DenseFusion复现-可以在30系/40系显卡运行

    笔者电脑显卡为4060 因为使用DenseFusion作者pytorch1 0的代码没有成功 发现很多人在30系显卡上复现失败 经过查资料后发现是因为cuda版本与显卡算力不匹配 需要提高cuda版本 因此也需要提高pytorch版本 后来
  • vue动态添加style样式

    注意 凡是有 的style属性名都要变成驼峰式 比如font size要变成fontSize 除了绑定值 其他的属性名的值要用引号括起来 比如backgroundColor 00a2ff 而不是 backgroundColor 00a2ff
  • 数据库服务器操作系统升级方案,PostgreSQL 数据库跨版本升级常用方案解析

    大家好 我是只谈技术不剪发的 Tony 老师 对于企业而言 将数据库系统升级到新版本通常可以获得更好的性能 更多的功能 最新的安全补丁和错误修复等 因此 本文就来介绍一下 PostgreSQL 数据库版本升级的 3 种常用方案 升级方案概述
  • Element Plus 实例详解(五)___Scrollbar 滚动条

    Element Plus 实例详解 五 Scrollbar 滚动条 本文目录 一 前言 二 搭建Element Plus试用环境 1 搭建Vue3项目 基于Vite Vue 2 安装Element Plus 三 Element Plus S
  • 讲解+可执行完整代码 C++单链表(2)查找、插入、删除元素

    目录 一 查找元素 代码部分 核心代码 完整代码 二 插入元素 核心思路 代码部分 核心代码 完整代码 编辑 三 删除元素 核心思路 代码部分 核心代码 完整代码 一 查找元素 此段代码仅实现查找元素的功能 代码部分 核心代码 node l
  • 探究 Nginx 中 reload 流程的真相

    点击上方 程序员小乐 关注 星标或置顶一起成长 每天凌晨00点00分 第一时间与你相约 每日英文 Try to hold the right hand with your left hand and gave yourself most s
  • web前端技术笔记(二)html 表单 和页面嵌套

    1 相对路径和绝对路径 2 有序和无序列表 3 表格 4 注册表单 表单用于搜集不同类型的用户输入 表单由不同类型的标签组成 相关标签及属性用法如下 1
  • 基于 Zipkin的链路追踪

    Zipkin介绍 Zipkin 是 Twitter 的一个开源项目 它基于 Google Dapper 实现 它致力于收集服务的定时数据 以 解决微服务架构中的延迟问题 包括数据的收集 存储 查找和展现 我们可以使用它来收集各个服务器 上请
  • 每日一练-仓库日志

    仓库日志 题目描述 解题思路 Python源码 Summary Date 2023年1月9日 Author 小 y 同 学 Classify 蓝桥杯每日一练 Language Python 题目描述 题意 M海运公司最近要对旗下仓库的货物进
  • ST公司 Lis2dh12 三轴加速度传感器,计算加速度值转成角度值

    目录 概述 项目上使用了一款Lis2dh12三轴加速度传感器 开发前要准备的工作 1 原理图 1 1 创建lis2dh12 c文件 1 2 在此重点说明 如果想调传感器的中断灵敏度 注意 关注1 INT1 THS 32h 2 INT1 DU
  • 华为OD机试真题- 任务混部【2023Q1】【JAVA、Python、C++】

    题目描述 公司创新实验室正在研究如何最小化资源成本 最大化资源利用率 请你设计算法帮他们解决一个任务混部问题 有taskNum项任务 每个任务有开始时间 startTime 结束时间 endTime 并行度 parallelism 三个属性
  • 什么是分布式架构

    一 分布式架构定义 什么是分布式架构 分布式系统 distributed system 是建立在网络之上的软件系统 内聚性 是指每一个数据库分布节点高度自治 有本地的数据库管理系统 透明性 是指每一个数据库分布节点对用户的应用来说都是透明的
  • 多处理器编程的艺术(二)-并行程序设计

    当处理器的性能的发展受到各方面因素的限制的时候 计算机产业开始用多处理器结构实现并行计算来提高计算的效率 我们使用多处理器共享存储器的方式实现了多处理器编程 也就是多核编程 当然在这样的系统结构下我们面临着各种各样的挑战 例如如何协调各个处