

假设我有很多东西,我必须对所有这些东西进行一些操作。 如果一个元素的操作失败,我想停止整个阵列的工作[这项工作分布在多个处理器上]。

我想实现这一目标,同时将发送/接收的消息数量保持在最低限度。 另外,如果没有必要,我不想阻止处理器。

我该如何使用 MPI 来做到这一点?



棘手的部分是需要处理四种不同的情况“{root,non-root} x {found,not-found}”。也可能发生多个队列发送停止消息,需要在根上进行未知数量的匹配接收。这可以通过额外减少来解决,该减少计算发送停止请求的等级数量。

下面是一个在 C 语言中的示例:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

const int iter_max = 10000;
const int difficulty = 20000;

int find_stuff()
    int num_iters = rand() % iter_max;
    for (int i = 0; i < num_iters; i++) {
        if (rand() % difficulty == 0) {
            return 1;
    return 0;

const int stop_tag = 42;
const int root = 0;

int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
    int flag;
    MPI_Status status;
    if (found_count == 0) {
        MPI_Test(root_recv_stop, &flag, &status);
    } else {
        // If we find something on the root, we actually wait until we receive our own message.
        MPI_Wait(root_recv_stop, &status);
        flag = 1;
    if (flag) {
        printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
        MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
        MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
        // We must post some additional receives if multiple ranks found something at the same time
        MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
        for (found_count--; found_count > 0; found_count--) {
            MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
            printf("Additional stop from: %d\n", status.MPI_SOURCE);
        return 1;
    return 0;

int main()
    MPI_Init(NULL, NULL);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    MPI_Request root_recv_stop;
    MPI_Request all_recv_stop;
    if (rank == root) {
        MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
    } else {
        // You may want to use an extra communicator here, to avoid messing with other barriers
        MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);

    while (1) {
        int found = find_stuff();
        if (found) {
            printf("Rank %d found something.\n", rank);
            // Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
            MPI_Request req;
            MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
            if (rank != root) {
                // We know that we are going to receive our own stop signal.
                // This avoids running another useless iteration
                MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                MPI_Wait(&req, MPI_STATUS_IGNORE);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        if (rank == root) {
            if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
        } else {
            int stop_signal;
            MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
            if (stop_signal)
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                printf("Rank %d stopping after receiving signal.\n", rank);



  • 不引入额外的阻塞
  • 通过实施屏障(通常O(log N))
  • 最坏情况下的延迟发现一个, to all stop2 * 循环时间(+ 1 p2p + 1 障碍 + 1 减少)。
  • 如果许多/所有等级同时找到解决方案,它仍然有效,但可能效率较低。

