在不阻塞的情况下“通知”处理器的正确方法是什么?

2024-04-20

假设我有很多东西,我必须对所有这些东西进行一些操作。 如果一个元素的操作失败,我想停止整个阵列的工作[这项工作分布在多个处理器上]。

我想实现这一目标,同时将发送/接收的消息数量保持在最低限度。 另外,如果没有必要,我不想阻止处理器。

我该如何使用 MPI 来做到这一点?


这似乎是一个常见问题,没有简单的答案。其他两个答案都存在可扩展性问题。环形通信方式的通信成本是线性的,而在单向通信方式中MPI_Win-解决方案,单个进程将受到所有工作线程的内存请求的影响。这对于低数量的排名来说可能没问题,但在增加排名数量时会带来问题。

非阻塞集体可以提供更具可扩展性的更好的解决方案。主要思想是发布一个MPI_Ibarrier除了一个指定的根之外的所有等级。该根将通过以下方式侦听点对点停止消息MPI_Irecv并完成MPI_Ibarrier一旦它收到它。

棘手的部分是需要处理四种不同的情况“{root,non-root} x {found,not-found}”。也可能发生多个队列发送停止消息,需要在根上进行未知数量的匹配接收。这可以通过额外减少来解决,该减少计算发送停止请求的等级数量。

下面是一个在 C 语言中的示例:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

const int iter_max = 10000;
const int difficulty = 20000;

int find_stuff()
{
    int num_iters = rand() % iter_max;
    for (int i = 0; i < num_iters; i++) {
        if (rand() % difficulty == 0) {
            return 1;
        }
    }
    return 0;
}

const int stop_tag = 42;
const int root = 0;

int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
    int flag;
    MPI_Status status;
    if (found_count == 0) {
        MPI_Test(root_recv_stop, &flag, &status);
    } else {
        // If we find something on the root, we actually wait until we receive our own message.
        MPI_Wait(root_recv_stop, &status);
        flag = 1;
    }
    if (flag) {
        printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
        MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
        MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
        // We must post some additional receives if multiple ranks found something at the same time
        MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
        for (found_count--; found_count > 0; found_count--) {
            MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
            printf("Additional stop from: %d\n", status.MPI_SOURCE);
        }
        return 1;
    }
    return 0;
}

int main()
{
    MPI_Init(NULL, NULL);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank);

    MPI_Request root_recv_stop;
    MPI_Request all_recv_stop;
    if (rank == root) {
        MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
    } else {
        // You may want to use an extra communicator here, to avoid messing with other barriers
        MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
    }

    while (1) {
        int found = find_stuff();
        if (found) {
            printf("Rank %d found something.\n", rank);
            // Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
            MPI_Request req;
            MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
            if (rank != root) {
                // We know that we are going to receive our own stop signal.
                // This avoids running another useless iteration
                MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                MPI_Wait(&req, MPI_STATUS_IGNORE);
                break;
            }
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
        if (rank == root) {
            if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
                break;
            }
        } else {
            int stop_signal;
            MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
            if (stop_signal)
            {
                MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
                printf("Rank %d stopping after receiving signal.\n", rank);
                break;
            }
        }
    };

    MPI_Finalize();
}

虽然这不是最简单的代码,但它应该:

  • 不引入额外的阻塞
  • 通过实施屏障(通常O(log N))
  • 最坏情况下的延迟发现一个, to all stop2 * 循环时间(+ 1 p2p + 1 障碍 + 1 减少)。
  • 如果许多/所有等级同时找到解决方案,它仍然有效,但可能效率较低。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在不阻塞的情况下“通知”处理器的正确方法是什么? 的相关文章

  • 带有可运行对象和结果的 FutureTask

    我用谷歌搜索了这个 但仍然无法得到充分的理解 我找不到任何使用的特定示例FutureTask Runnable runnable V result 构造函数 Java 文档说 未来提交 可运行任务 T结果 提交一个 Runnable 任务来

随机推荐