使用 MPI 在 C 中发送二维数组块

2024-03-06

如何将二维数组块发送到不同的处理器?假设 2D 数组大小为 400x400,我想将大小为 100X100 的块发送到不同的处理器。这个想法是每个处理器将在其单独的块上执行计算并将其结果发送回第一个处理器以获得最终结果。
我在 C 程序中使用 MPI。


首先我要说的是,您通常并不真正想要这样做 - 从某个“主”进程中分散和收集大量数据。通常,您希望每个任务都能解决自己的难题,并且您的目标应该是永远不要让一个处理器需要整个数据的“全局视图”;一旦你需要这样做,你就限制了可扩展性和问题的规模。如果您正在为 I/O 执行此操作 - 一个进程读取数据,然后将其分散,然后将其收集回来进行写入,那么您最终将需要研究 MPI-IO。

不过,回到你的问题,MPI 有非常好的方法从内存中提取任意数据,并将其分散到一组处理器或从一组处理器收集数据。不幸的是,这需要相当多的 MPI 概念——MPI 类型、范围和集体操作。这个问题的答案中讨论了很多基本想法——MPI_Type_create_subarray 和 MPI_Gather https://stackoverflow.com/questions/5585630/mpi-type-create-subarray-and-mpi-gather .

Update- 在寒冷的日子里,这是很多代码,而不是很多解释。让我扩展一下。

考虑任务 0 拥有的一维整数全局数组,您希望将其分配给多个 MPI 任务,以便每个任务在其本地数组中获得一个片段。假设你有 4 个任务,全局数组是[01234567]。您可以让任务 0 发送四条消息(其中一条发送给它自己)来分发此消息,并在需要重新组装时接收四条消息以将其捆绑在一起;但这显然会在大量进程中变得非常耗时。对于此类操作(分散/聚集操作)有优化的例程。所以在这种 1d 情况下,你会做这样的事情:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

之后,处理器的数据将如下所示

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

也就是说,分散操作采用全局数组并将连续的 2-int 块发送到所有处理器。

为了重新组装数组,我们使用MPI_Gather()操作,其工作原理完全相同,但方向相反:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

现在数据看起来像

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather 将所有数据带回来,这里 a 是 10,因为在开始这个示例时我认为我的格式化不够仔细。

如果数据点的数量不能均匀划分进程的数量,并且我们需要向每个进程发送不同数量的项目,会发生什么情况?然后你需要一个通用版本的分散,MPI_Scatterv(),它允许您指定每个的计数 处理器和位移——该数据在全局数组中的起始位置。假设你有一个字符数组[abcdefghi]有 9 个字符,您将为每个进程分配两个字符,除了最后一个,它有 3 个。那么你需要

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

现在数据看起来像

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

您现在已经使用 scatterv 来分发不规则数量的数据。每种情况下的位移都是从数组开头开始的二*秩(以字符为单位测量;位移以分散发送的类型或聚集接收的类型为单位;通常不是字节或其他单位),并且计数为 {2,2,2,3}。如果它是我们想要拥有 3 个字符的第一个处理器,我们将设置 counts={3,2,2,2} 且位移将为 {0,3,5,7}。 Gatherv 的工作原理完全相同,但方向相反; counts 和 displs 数组将保持不变。

现在,对于 2D,这有点棘手。如果我们想发送二维数组的二维子块,我们现在发送的数据不再是连续的。如果我们将 6x6 数组的 3x3 子块发送到 4 个处理器,则我们发送的数据中存在漏洞:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(请注意,所有高性能计算都归结为理解内存中数据的布局。)

如果我们要将标记为“1”的数据发送到任务1,我们需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。第二个复杂问题是子区域在哪里停止和开始;请注意,区域“1”并非从区域“0”停止的地方开始;在区域“0”的最后一个元素之后,内存中的下一个位置是区域“1”的中途。

让我们首先解决第一个布局问题 - 如何仅提取我们想要发送的数据。我们总是可以将所有“0”区域数据复制到另一个连续数组中,然后发送;如果我们计划得足够仔细,我们甚至可以用这样的方式来做到这一点:MPI_Scatter关于结果。但我们宁愿不必以这种方式转置整个主要数据结构。

到目前为止,我们使用的所有 MPI 数据类型都是简单的 - MPI_INT 指定(例如)连续 4 个字节。但是,MPI 允许您创建自己的数据类型来描述内存中任意复杂的数据布局。这种情况——数组的矩形子区域——很常见,因此有一个特定的调用。对于二维 我们上面描述的情况,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

这将创建一个从全局数组中仅挑选出区域“0”的类型;我们可以 现在只将该数据发送到另一个处理器

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

接收进程可以将其接收到本地数组中。请注意,如果接收进程仅将其接收到 3x3 数组中,则可以not描述它所接收的内容的类型newtype;不再描述内存布局。相反,它只是接收一个 3*3 = 9 个整数块:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

请注意,我们也可以通过创建不同的类型(具有不同的startarray)用于其他块,或者仅通过在特定块的起点发送:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

最后,请注意,这里我们要求全局和局部是连续的内存块;那是,&(global[0][0]) and &(local[0][0])(或者,等价地,*global and *local指向连续的 6*6 和 3*3 内存块;分配动态多维数组的常用方法不能保证这一点。下面展示了如何执行此操作。

现在我们了解了如何指定子区域,在使用分散/聚集操作之前只剩下一件事需要讨论,那就是这些类型的“大小”。我们不能只使用MPI_Scatter()(甚至是 scatterv)这些类型,因为这些类型的范围为 16 个整数;也就是说,它们结束的位置是它们开始后的 16 个整数 - 并且它们结束的位置与下一个块的开始位置不能很好地对齐,所以我们不能只使用 scatter - 它会选择错误的位置来开始发送数据到下一个处理器。

当然,我们可以使用MPI_Scatterv()并自己指定位移,这就是我们要做的 - 除了位移以发送类型大小为单位,这对我们也没有帮助;这些块从距全局数组开头的 (0,3,18,21) 个整数的偏移量开始,并且块以距其起始位置 16 个整数结束的事实根本不允许我们以整数倍的形式表示这些位移。

为了解决这个问题,MPI 允许您设置类型的范围以进行这些计算。它不会截断类型;它只是用于确定给定最后一个元素的下一个元素的开始位置。对于此类带有孔的类型,将范围设置为小于内存中到类型实际末尾的距离通常很方便。

我们可以将范围设置为任何对我们方便的值。我们可以将范围设置为整数,然后以整数为单位设置位移。不过,在这种情况下,我喜欢将范围设置为 3 个整数(子行的大小),这样,块“1”在块“0”之后立即开始,块“3”在块“之后立即开始” 2”。不幸的是,当从块“2”跳转到块“3”时,它并不能很好地工作,但这也是无济于事的。

因此,为了在这种情况下分散子块,我们将执行以下操作:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

在这里,我们创建了与之前相同的块类型,但我们调整了它的大小;我们没有改变类型“开始”的位置(0),但我们改变了它“结束”的位置(3 个整数)。我们之前没有提到过这一点,但是MPI_Type_commit需要能够使用该类型;但您只需要提交实际使用的最终类型,而不需要任何中间步骤。你用MPI_Type_free完成后释放类型。

现在,我们终于可以分散块了:上面的数据操作有点复杂,但是一旦完成,分散看起来就像以前一样:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

在简要介绍了分散、聚集和 MPI 派生类型之后,现在我们已经完成了。

下面是一个示例代码,显示了使用字符数组的聚集和分散操作。运行程序:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

代码如下。

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 MPI 在 C 中发送二维数组块 的相关文章

随机推荐