这似乎是一个常见问题,没有简单的答案。其他两个答案都存在可扩展性问题。环形通信方式的通信成本是线性的,而在单向通信方式中MPI_Win
-解决方案,单个进程将受到所有工作线程的内存请求的影响。这对于低数量的排名来说可能没问题,但在增加排名数量时会带来问题。
非阻塞集体可以提供更具可扩展性的更好的解决方案。主要思想是发布一个MPI_Ibarrier
除了一个指定的根之外的所有等级。该根将通过以下方式侦听点对点停止消息MPI_Irecv
并完成MPI_Ibarrier
一旦它收到它。
棘手的部分是需要处理四种不同的情况“{root,non-root} x {found,not-found}”。也可能发生多个队列发送停止消息,需要在根上进行未知数量的匹配接收。这可以通过额外减少来解决,该减少计算发送停止请求的等级数量。
下面是一个在 C 语言中的示例:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
const int iter_max = 10000;
const int difficulty = 20000;
int find_stuff()
{
int num_iters = rand() % iter_max;
for (int i = 0; i < num_iters; i++) {
if (rand() % difficulty == 0) {
return 1;
}
}
return 0;
}
const int stop_tag = 42;
const int root = 0;
int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
int flag;
MPI_Status status;
if (found_count == 0) {
MPI_Test(root_recv_stop, &flag, &status);
} else {
// If we find something on the root, we actually wait until we receive our own message.
MPI_Wait(root_recv_stop, &status);
flag = 1;
}
if (flag) {
printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
// We must post some additional receives if multiple ranks found something at the same time
MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
for (found_count--; found_count > 0; found_count--) {
MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
printf("Additional stop from: %d\n", status.MPI_SOURCE);
}
return 1;
}
return 0;
}
int main()
{
MPI_Init(NULL, NULL);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srand(rank);
MPI_Request root_recv_stop;
MPI_Request all_recv_stop;
if (rank == root) {
MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
} else {
// You may want to use an extra communicator here, to avoid messing with other barriers
MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
}
while (1) {
int found = find_stuff();
if (found) {
printf("Rank %d found something.\n", rank);
// Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
MPI_Request req;
MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
if (rank != root) {
// We know that we are going to receive our own stop signal.
// This avoids running another useless iteration
MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
MPI_Wait(&req, MPI_STATUS_IGNORE);
break;
}
MPI_Wait(&req, MPI_STATUS_IGNORE);
}
if (rank == root) {
if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
break;
}
} else {
int stop_signal;
MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
if (stop_signal)
{
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
printf("Rank %d stopping after receiving signal.\n", rank);
break;
}
}
};
MPI_Finalize();
}
虽然这不是最简单的代码,但它应该:
- 不引入额外的阻塞
- 通过实施屏障(通常
O(log N)
)
- 最坏情况下的延迟发现一个, to all stop2 * 循环时间(+ 1 p2p + 1 障碍 + 1 减少)。
- 如果许多/所有等级同时找到解决方案,它仍然有效,但可能效率较低。