跳转到内容

消息传递接口

25% developed
来自维基教科书,开放的世界开放的书籍

本指南假设您具备 C 编程的先前知识,并将通过几个示例向您介绍 消息传递接口 (MPI)。

什么是 MPI ?

[编辑 | 编辑源代码]

MPI 是一个标准化的、可移植的消息传递系统。消息传递系统特别用于具有独立内存的分布式机器上,用于执行并行应用程序。使用此系统,每个执行的进程将通过发送和接收消息与其他进程通信和共享其数据。MPI 是由 MPI 论坛产生的规范,该论坛涉及多个组织设计可移植系统(允许程序在异构网络上运行)。

由于数据只能通过交换消息来共享,因此此标准不打算用于共享内存系统,例如多处理器计算机(虽然它可以工作,但这不是主要目标,并且存在更强大的替代方案,例如 OpenMP)。基本上,MPI 包括点对点通信、集体通信(在进程网络上)、进程组、Fortran 和 C 语言绑定以及其他高级功能。另一方面,该标准没有指定明确的共享内存操作、调试工具、对线程的明确支持、I/O 函数。

当前版本是 2.0,尽管 1.1 仍在使用。

建议使用 William Gropp、Ewing Lusk 和 Anthony Skjellum 撰写的《使用 MPI:消息传递接口的可移植并行编程》作为 MPI 入门介绍。有关更完整的信息,请阅读 Snir、Otto、Huss-Lederman、Walker 和 Dongarra 撰写的 MPI:完整参考。此外,该标准本身可以在 [1][2] 中找到。有关其他互联网参考资料,例如教程,请参阅 开放目录项目阿贡国家实验室的教程列表

目前,有两个主要的实现:MPICH 和 LAM。后者的官方网站 ([3]) 包含关于 LAM 和 MPI 的大量信息。

补充库

[编辑 | 编辑源代码]

此外,还存在用于解决数值问题或在分布式机器上使用 I/O 函数的库。我们可以提到:ScaLAPACK 库、FFTW(一个可移植的 C 语言 FFT 库)、PETSc 科学计算库。可以在 [4] 网站上找到更完整的列表。

开发工具

[编辑 | 编辑源代码]

编译器

[编辑 | 编辑源代码]

每个实现都提供一个编译器,或者至少提供一个现有编译器的前端。

由于 MPI 没有指定调试工具,因此可以使用一些程序来完成此目的:XMPI 是 LAM/MPI 的运行/调试 GUI;MPI-CHECK 是 Fortran 的调试器;Etnus, Inc. 提供支持 MPICH(以及其他一些 MPI 实现)的 Totalview 调试器的版本;并且,Streamline Computing 的分布式调试工具与多个 MPI 实现一起工作,包括 MPICH 和 LAM/MPI。

基准测试

[编辑 | 编辑源代码]

Mpptest、SKaMPI 和 MPBench 测量 MPI 实现的性能。它们可用于确定使用哪种实现、如何实现可移植且高效的 MPI 程序,以及预测 MPI 程序的性能。

著名的流行 MPI 基准测试包括 NASA 并行基准测试、SpecMPI 2007 和 HPCC 基准测试套件。

它可以在大型异构 parc 上使用。

数组求和

[编辑 | 编辑源代码]

此第一个简单示例的目的是计算存储在数组中的所有值的总和。在 C 编程中,顺序版本如下所示

 
 /* Sum of an array - Sequential version */
 #include <stdio.h>
 #define N 100000
 
 int main (void) {
   float array[N];
   double mysum = 0;
   unsigned long long i;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //Calculation of the sum
   for (i = 0; i < N; i++)
     mysum += array[i];
   printf ("%lf\n", mysum);
   return 0;
 }

第一个版本 - 基本点对点通信

[编辑 | 编辑源代码]

MPI 实现允许用户启动多个进程,每个进程都有一个定义的数字标识符,即其。按照惯例,我们将认为主进程是秩为 '0' 的进程。所有其他进程都是从进程。在这个程序中,主进程将数组分成子数组并将其发送给每个从进程。然后,每个从进程将计算其子数组的总和。在数组大小不是从进程数量的偶数倍的情况下,主进程将通过计算数组最后几个值的总和来完成剩余的工作(见下图)。

从数组到进程的数据分配

对于每个进程,程序的源代码是相同的,因此可执行代码也是相同的。因此,代码必须包含从进程和主进程部分。将执行的部分取决于进程的秩。

在这个第一个版本中,工作被分成两个部分

 /* Sum of an array - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   //Initialization of MPI
   MPI_Init (&argc, &argv);
   //myrank will contain the rank of the process
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   //The part of the program which will be executed is decided
   if (myrank == 0)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum = 0, tmpsum;
   unsigned long long i;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //The array is divided by two and each part is sent to the slaves
   MPI_Send (array, N / 2, MPI_FLOAT, 1, MSG_DATA, MPI_COMM_WORLD);
   MPI_Send (array + N / 2, N / 2, MPI_FLOAT, 2, MSG_DATA, MPI_COMM_WORLD);
   //The master receive the result from the slaves
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 1, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 2, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   MPI_Status status;
   //The slave receives the array from the master
   MPI_Recv (array, N / 2, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < N / 2; i++)
     sum += array[i];
   //The result is send to the master
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

这个程序将逐步解释。

首先,为了使用 MPI 函数,我们必须包含文件mpi.h,它包含所需函数的原型。数组大小由N定义。MSG_DATAMSG_RESULT将被MPI_SendMPI_RECV函数使用。声明了两个函数原型。这些函数分别包含主进程和从进程的代码。

主函数选择它应该执行主进程部分(如果进程秩为 0)还是从进程部分(对于任何其他进程秩)。该函数从MPI_INIT例程开始,该例程必须在任何其他 MPI 函数之前调用。这个例程执行各种初始化函数。接下来,通过调用MPI_COMM_RANK函数获得进程的秩,并根据结果执行主进程部分或从进程部分。在计算结束时,调用最终例程MPI_FINALIZE。它执行与结束使用 MPI 相关的各种管理任务。

在程序的顺序版本中没有找到的两个声明包含在主进程中:tmpsum变量,它将存储两个从进程中的每个结果,以及status变量(其类型由 MPI 定义),它由MPI_RECV函数需要。

计算部分现在被对MPI_SENDMPI_RECV的两次调用替换。MPI_SEND函数的参数是第一个要发送的元素的地址,要发送的元素数量,它们的类型(最常见的 MPI 类型是MPI_INTMPI_CHARMPI_FLOATMPI_DOUBLE,...),接收者的秩,消息的标记,以及通信器。在最简单的用法中,通信器将是MPI_COMM_WORLD,它包含所有参与程序执行的进程。MPI_RECV函数的参数是接收缓冲区的地址,要接收的最大元素数量,它们的类型,发送者的秩,消息的标记,通信器,以及状态。对于通用消息,类型、标记和通信器必须相同(见下图)。

同一消息的MPI_SendMPI_RECV函数的参数

数组的前一半被发送到第一个从进程,数组的后一半被发送到第二个从进程。在每种情况下,子数组的大小都是N/2。接下来,从从进程接收结果,存储并添加到最终总和中。

然而,在它可以从从进程接收计算后的总和之前,每个从进程必须首先对它从主进程接收到的子数组中的元素求和。然后,从进程将结果发送回主进程。

当多个消息以任意顺序发送到同一个进程时,标记参数允许该进程区分这些消息并接收它们。

要使用 lam 实现执行此程序,必须启动 lam 守护程序

$ lamboot

接下来,mpicc 和 mpirun 用于编译和执行

$ mpicc ./source.c
$ mpirun -np 3 ./a.out

选项 -np 指定要启动的进程数量。在这个例子中,它必须是三个(一个主进程,两个从进程)。程序mpirun创建一个 MPI 环境,将a.out复制到所有三个处理器并运行每个处理器,并设置适当的处理器编号。

第二个版本 - 适应进程数量

[edit | edit source]

这个基本程序有两个问题

  • 程序需要两个从进程才能正确运行,并且无法自行适应可用的进程数量
  • 主进程以预定义的顺序接收数据(第一个从进程,然后是第二个从进程)。但是,第二个从进程可能比第一个从进程先完成

这导致了这个第二个版本

 /* Sum of an array - Second parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum, tmpsum;
   unsigned long long step, i;
   int size;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   if (size != 1)
     step = N / (size - 1);
   //The array is divided by the number of slaves
   for (i = 0; i < size - 1; i++)
     MPI_Send (array + i * step, step, MPI_FLOAT, i + 1, MSG_DATA, MPI_COMM_WORLD);
   //The master completes the work if necessary
   for (i = (size - 1) * step, mysum = 0; i < N; i++)
     mysum += array[i];
   //The master receives the results in any order
   for (i = 1; i < size; mysum += tmpsum, i++)
     MPI_Recv (&tmpsum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   int count;
   MPI_Status status;
   MPI_Recv (array, N, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   //The slave finds the size of the array
   MPI_Get_count (&status, MPI_FLOAT, &count);
   for (i = 0, sum = 0; i < count; i++)
     sum += array[i];
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

对于第一个问题,主进程需要进程数量来确定子数组的大小(它将存储在 step 变量中)。MPI_COMM_SIZE函数在 size 变量中给出这个指示,因此N被从进程数量(即size−1)除。接下来,数组被发送到从进程,并且当从进程计算时,主进程将在必要时完成工作。然后,它将接收结果并将其添加到最终总和中。为了解决第二个问题,主进程将从任何源接收结果。这导致在MPI_Recv函数中使用 MPI_ANY_SOURCE 来代替进程的秩。

从进程不需要知道进程数量,但需要知道它接收的子数组的大小。为此,我们将使用 status 参数。实际上,status 是一个包含三个数据的结构:消息的来源(如果使用MPI_ANY_SOURCE,这很有用),消息的标记(如果使用MPI_ANY_TAG,这很有用),以及错误代码。此外,我们可以使用MPI_Get_count函数和 status 参数访问接收数据的实际长度。count 变量是子数组的长度。

这个版本的程序可以用任何数量的进程使用,甚至可以用 1 个进程。在这种特殊情况下,主进程将执行所有计算。

不幸的是,这个程序不适合 MPI,因为,存在大量数据传输和少量计算。虽然 MPI 是为非共享内存操作而设计的,但为了在消息中尽可能减少时间损失,计算必须比数据传输多。我们可以通过只发送第一个数字和最后一个数字来改进这个程序(并添加这些数字之间的所有整数),但在这种情况下,只能计算几何总和。

通过辛普森方法进行积分

[edit | edit source]

这个程序的目的是通过辛普森方法计算给定函数的积分。这个例子更适合 MPI,因为它比以前需要更多的计算和更少的数据传输。数学方程是

在 C 编程中,顺序版本是

 /* Integration - Simpson method - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define n 10000 //Must be even
 
 double f (double x);
 
 int main (void) {
   double integration,step;
   int i;
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   step = (b - a) / n;
   for (i = 1, integration = 0; i < n / 2; i++)
     integration += 2*f(a+(2*i-1)*step) + f(a+2*i*step);
   integration *= 2;
   integration += f(a) + f(b) + 4*f(b - step);
   integration *= step / 3;
   printf ("%.15lf\n", integration);
   return 0;
 }
 
 double f (double x) {
   return exp(-1*pow(x,2));
 }

第一个版本 - 用户定义的类型

[edit | edit source]

要完成的工作就像前面的例子一样(一个数组,每个盒子都有不同的系数)。工作将按以下图所示的方式划分。

从总和到进程的数据分配。

从属进程会将总和进行分割并计算,主进程则计算其他值,并在必要时像之前一样完成工作。

总和的域将被发送到每个从属进程,它们将评估函数的多个返回值并将其求和。将发送的信息包括总和的起始位置(begin)、每个值之间的差值(step)以及必须评估函数的值的数量(两个双精度数和一个整数)。但是,MPI_Send 例程发送的数据必须具有相同的类型。然后,为了最小化发送的消息数量,从而优化程序,将创建并使用 MPI 类型来发送这些数据。

通常,为了减少通信中浪费的时间(发送最少的消息,并使用最少的数据),需要对要发送的数据进行分组并最小化。因此,需要将两个整数存储在数组中并发送到一条唯一的消息中,而不是分别发送到两个不同的消息中。

由于工作分割的速度快于工作本身,因此,在使用少数进程时,主进程将等待较长时间才能获得从属进程的结果。在本程序中,如果进程数少于定义的进程数(LIMIT_PROC),则主进程将参与计算(即,工作将根据进程数进行分割,而不是根据从属进程数)。

 /* Integration - Simpson method - First parallel version */
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000 //Must be even
 #define LIMIT_PROC 4 //Number of process for which the master stop to work and only divide the work
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 struct domain_of_sum {
   double begin;
   double step;
   int number; //Number of iteration
 };
 
 double f (double x);
 MPI_Datatype Init_Type_Domain (void); //Declaration of a MPI type which will contain the domain
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[3] = { MPI_DOUBLE, MPI_DOUBLE, MPI_INT };
   int blocklen[3] = { 1, 1, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[3] = { 0, ext, 2 * ext };
   MPI_Type_struct (3, blocklen, disp, type, &Domain);
   return Domain;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size;
   struct domain_of_sum mydomain; //The C structure which will contain the domain
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type
   MPI_Type_commit (&Domain);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain.step = (b - a) / n; //step of the integration
   //The work is divided
   if (size > 1) {
     if (size > LIMIT_PROC)
       mydomain.number = (n / 2 - 1) / (size - 1);
     else
       mydomain.number = (n / 2 - 1) / size;
   }
   mydomain.begin = a + mydomain.step;
   //Each domain are sent
   for (i = 0; i < size - 1; i++) {
     mydomain.begin = a + (2 * i * mydomain.number + 1) * mydomain.step;
     MPI_Send (&mydomain, 1, Domain, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * mydomain.number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain.step) + f (a + 2 * i * mydomain.step);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain.step);
   integration *= mydomain.step / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i;
   struct domain_of_sum mydomain;
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type 
   MPI_Type_commit (&Domain);
   MPI_Recv (&mydomain, 1, Domain, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < mydomain.number; i++)
     sum += 2*f(mydomain.begin+(2*i)*mydomain.step) + f(mydomain.begin+(2*i+1)*mydomain.step);
   MPI_Send (&sum, 1, MPI_LONG_LONG_INT, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

用于创建 MPI 类型的全部例程都被重新组合到 Init_Type_Domain 函数中,以简化代码。MPI_TYPE_STRUCT 函数用于创建新类型并将其存储在 Domain 中。此函数需要 5 个参数:新类型将包含的块数(在本例中为两个双精度数和一个整数);这些块中每个元素在数组中的数量(一个双精度数、另一个双精度数和一个整数);每个块在消息中的偏移量数组;这些类型的数组;以及将包含该结构的变量的地址。每个块的偏移量是通过使用 MPI_TYPE_EXTENT 函数获得的,该函数提供了 MPI_DOUBLE 的长度。它等效于 C 中的 sizeof() 函数。MPI 类型的长度存储在 MPI_Aint 变量中。

另一种可能性是声明一个结构,该结构包含两个双精度数数组和一个整数。

 struct domain_of_sum {
   double array[2]; //Contains begin and step
   int number;
 };
 
 //The new MPI type
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[2] = { MPI_DOUBLE, MPI_INT };
   int blocklen[2] = { 2, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[2] = { 0, 2 * ext };
   MPI_Type_struct (2, blocklen, disp, type, &Domain);
   return Domain;
 }

第二版

[edit | edit source]

已使用两种不同的方法来确定从属进程必须实现的操作数量。在第一个示例中,从属进程使用 MPI_Recv 函数的 status 参数。在第二个示例中,此数字直接与数据一起发送。另一种方法是通过与主进程设置此数据相同的方式获得此数字,即使用进程数。为了最小化数据传输量,我们将使用最后一种方法(此版本只是一个优化)。

 /* Integration - Simpson method - Second parallel version*/
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 int Get_the_number (int size);  // Function which gives the number of iterations
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = (n / 2 - 1) / (size - 1);
     else
       number = (n / 2 - 1) / size;
   }
   return number;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain[1] = (b - a) / n;
   //The work is divided
   mydomain[0] = a + mydomain[1];
   for (i = 0; i < size - 1; i++) {
     mydomain[0] = a + (2 * i * number + 1) * mydomain[1];
     MPI_Send (mydomain, 2, MPI_DOUBLE, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain[1]) + f (a + 2 * i * mydomain[1]);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain[1]);
   integration *= mydomain[1] / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   MPI_Recv (mydomain, 2, MPI_DOUBLE, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < number; i++)
     sum+=2*f(mydomain[0]+(2*i)*mydomain[1])+f(mydomain[0]+(2*i+1)*mydomain[1]);
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

向量和矩阵的乘积

[edit | edit source]

本程序的目的是计算向量和矩阵的乘积。在 C 编程中,顺序版本是

 /* Product of a vector by a matrix - Sequential version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 
 void writecolumn (float *A);
 
 int main (int argc, char **argv) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Product of the matrix and the vector
   for (i = 0; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   writecolumn(C);
   free (A);
   return 0;
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

第一版 - 广播

[edit | edit source]

为了并行化此程序,将把矩阵分成几组行,并将每一部分发送到不同的从属进程,同时将向量发送到每个从属进程(见下图)。

将数据从矩阵分配到进程

可以使用 MPI_Bcast 函数将相同的数据发送到每个从属进程,该函数必须由所有进程调用。

 /* Product of a vector by a matrix - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 int Get_the_number (int size);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = N / (size - 1);
     else
       number = N / size;
   }
   return number;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N], *buff;
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   //Broadcast of the vector
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   for (i = 1; i < size; i++)
     MPI_Send (A + (i - 1) * step * N, N * step, MPI_FLOAT, i, MSG_DATA, MPI_COMM_WORLD);
   //Finishing the work
   for (i = (size - 1) * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   buff = (float *) malloc (step * sizeof (float));
   //Receive and reorder
   for (i = 1; i < size; i++) {
     MPI_Recv (buff, step, MPI_FLOAT, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     for (j = 0; j < step; j++)
       C[j + (status.MPI_SOURCE - 1) * step] = buff[j];
   }
   writecolumn(C);
   free (A);
   free (buff);
 }
 
 void slave (void) {
   float Vect[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Recv (partmat, N * step, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   MPI_Send (result, step, MPI_FLOAT, 0, MSG_RESULT, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }

MPI_Bcast 函数所需的参数比 MPI_SendMPI_Recv 函数少。MPI_Bcast 函数需要包含或将要包含广播数据的缓冲区的地址、缓冲区中的条目数、这些类型、广播根节点的等级(将发送数据的进程)以及通信器。没有标签或状态参数,因为它们用于区分消息并提供有关接收数据的的信息,在本例中,所有进程都参与了广播消息(因此,无需区分消息),并且状态中存储的信息是已知的(源是广播根节点,长度是条目数,没有标签)。

当主进程收到数据时,它必须重新排序数据,因为接收顺序未指定(MPI_ANY_SOURCE)。为此,状态结构包含消息的源,该源用于重新排序和组合向量。

在本例中,矩阵存储在一维数组中,而不是传统的二维数组。可以使用 MPI 发送多维数组,但必须仔细分配。也就是说,数组必须连续存储,因为发送函数的第一个参数是起始缓冲区的地址,第二个参数指定将发送的后续值的数目。然后,静态和动态分配是

 /* Static allocation */
 int array[rows][columns];

 /* Dynamic allocation */
 int **array;
 array = (int**) malloc (rows * sizeof(int*));
 if (array == NULL) exit (-1);
 
 array[0] = (int*) malloc (rows * columns * sizeof(int));
 if (array[0] == NULL) exit (-1);
 
 for (i=1; i<rows; i++)
   array[i] = array[0] + i * columns;
 
 /* Send either kind of array */
 MPI_Send (array[0],rows*columns,MPI_INT,0,0,MPI_COMM_WORLD);

第二版 - 散布和收集

[edit | edit source]

MPI 规范提供了集体通信函数,MPI_Bcast 函数是其中之一。本程序的下一个版本将使用“散布”和“收集”函数,它们分别将不同的数据发送给每个人,并从每个人接收不同的数据(见下图)。

散布和收集函数的原理

因此,工作将以略微不同的方式进行分割,也就是说,主进程也将参与计算。

 /* Product of a vector by a matrix - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 5000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of the work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Sending the data
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Each processes will receive a different part of the matrix
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //The master make its part (the same code is used for the slaves)
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += *(partmat + i * N + j) * B[j];
   //It finish the work
   for (i = size * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   //Receiving the data in the good order
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   writecolumn(C);
   free (partmat);
   free (result);
   free (A);
 }
 
 void slave (void) {
   float Vect[N], *A, *C; //C and A are not used by slaves but must be declared
   unsigned int i, j, step;
   int size;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Receiving the data
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Computation
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   //Sending the result
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

对函数 MPI_SCATTER 和 MPI_GATHER 的调用对主进程和从属进程来说是相同的(强调的参数被从属进程忽略)。参数如下:

MPI_Scatter MPI_Gather
发送缓冲区的地址 发送缓冲区的地址
每个进程发送的元素数 发送缓冲区中的元素数
发送缓冲区元素的数据类型 发送缓冲区元素的数据类型
接收缓冲区的地址 接收缓冲区的地址
接收缓冲区中发送的元素数 任何单个接收的元素数
接收缓冲区元素的数据类型 接收缓冲区元素的数据类型
发送进程的等级 接收进程的等级
通信器 通信器

请注意,在第一种情况下,主进程将数据发送给自己,在第二种情况下,主进程从自己接收数据。在本例中,矩阵按进程数进行分割,主进程将第一部分发送给自己,将第二部分发送给第一个从属进程,等等。之后,主进程从自己接收结果向量的第一部分,从第一个从属进程接收第二部分,等等。

元胞自动机

[edit | edit source]

在本例中,我们将编写一个元胞自动机。元胞自动机是对矩阵的演化,我们对其执行定义的函数,直到收敛或出现循环现象。该函数根据其四个邻居的值,为矩阵的每个点返回一个值。矩阵的边界将与其相对边的边界具有相同的值。生成的矩阵将是一个环面。在 C 编程中,顺序版本是

 /* Cellular automata - Sequential version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #define N 62
 #define MAX_ITERATION N //Number of iteration
 
 void init_mat (int *A, int length);
 void writemat (int *A);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc(N*N*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int i,j,k=0;
   init_mat (A,N);
   while(k++<MAX_ITERATION) {
   //Calculation of the new matrix in a temporary matrix
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     //Copy of the new matrix
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     //Update of the border
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
   }
   writemat(A);
   return 0;
 }
 
 void init_mat (int *A, int length) {
   int i,j;
   for (i=0;i<length;i++)
     for (j=0;j<length;j++)
       *(A+i*length+j)=i+j>length/2&&i+j<length*3/2;
   //Each border of the matrix will have the same value than the border of his opposite side
   for (i=1;i<length-1;i++) {
     *(A+i)=*(A+(length-2)*length+i);
     *(A+(length-1)*length+i)=*(A+length+i);
     *(A+i*length)=*(A+i*length+length-2);
     *(A+i*length+length-1)=*(A+i*length+1);
   }
 }
 
 void writemat (int *A) {
   int i,j;
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       printf ("%d ", *(A + i*N +j));
     printf("\n");
   }
 }
 
 int f (int x1,int x2,int x3,int x4) {
   return x1+x2+x3+x4>2;
 }

第一版 - 死锁

[edit | edit source]

矩阵将按行组进行分割,如图 [fig:vect] 所示。为了计算矩阵的新部分,应用的函数需要属于矩阵其他部分的点的值,然后,每个进程将计算新的矩阵并将第一行和最后一行与它的两个邻居交换(见下图)。

矩阵的划分和进程之间的通信

在进程共享数据时,可能会发生死锁。死锁是程序的阻塞状态,原因是由于一个进程正在等待另一个进程执行特定的操作,但后者进程正在等待前者进程执行特定的操作。这种现象会导致永久等待状态(见下图)。

死锁示例

这段代码可能会出现死锁,因为 MPI_SEND 和 MPI_RECV 函数会阻塞程序,直到它们完成,也就是说,直到接收方收到发送的消息或发送方将要接收的消息发送出去。如果所有进程执行相同的代码(排除主进程),进程一将向进程二发送数据,进程二将向进程三发送数据,等等。进程一将等待进程二接收其数据,但这永远不会发生,因为进程二也在等待进程三接收其数据。由于通信是循环的,程序被阻塞。作为第一个解决方案,我们可以在每个偶数进程上更改发送和接收的顺序。

 /* Cellular automata - First parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes (which must be > 1)
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   //Initialization by the master
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The next and the previous processes are determined
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The order of exchanging the data is decided
     if (myrank%2) {
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
     } else {
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     }
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The master recompose the matrix and print it
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

在这个例子中,主进程和从进程的代码几乎相同,所以没有必要将代码分成两个函数(主进程特有的代码只需在等级条件前加上即可)。

当主进程等待进程 1 的数据时,进程 1 将数据发送给主进程。然后它等待主进程的数据,而主进程将数据发送给它。代码对于所有进程将具有相同的行为,因此是安全的。

第二版 - 非阻塞发送和接收

[编辑 | 编辑源代码]

第二个解决方案是使用非阻塞发送和接收:这些函数被调用并立即将控制权交还给进程。然后,进程可以进行一些计算,当需要访问消息的数据时,它可以通过调用一些合适的函数来完成它。使用非阻塞通信函数的程序可以更快。在这个第二个版本中,所有函数将在所有发送和所有接收开始后立即完成。

在前面的例子中,矩阵的维度应该是进程数量的倍数,因为剩余的行没有发送到任何进程。在这个版本中,主进程将这些行发送到最后一个进程,并分别接收结果行。

 /* Cellular automata - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Request request[4];
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //Code allowing any dimension of the matrix
   if (myrank==0&&(N-2)%size) MPI_Send (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_DATA,MPI_COMM_WORLD);
   else if (myrank==size-1&&(N-2)%size) {
     MPI_Recv (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_DATA,MPI_COMM_WORLD,&status);
     step+=N-2-step*size;
   }
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The data begin to be shared
     MPI_Isend (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request);
     MPI_Irecv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+1);
     MPI_Isend (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+2);
     MPI_Irecv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request+3);
     //The transfer of data are completed
     for (i=0;i<4;i++)
       MPI_Wait(request+i,&status);
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   if (myrank==size-1&&(N-2)%size) {
     step=(N-2)/size;
     MPI_Send (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_RESULT,MPI_COMM_WORLD);
   } else if (myrank==0&&(N-2)%size)
     MPI_Recv (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_RESULT,MPI_COMM_WORLD,&status);
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

MPI_Isend 和 MPI_Irecv(I 代表立即)函数需要一个额外的参数:一个 MPI_Request 变量的地址,该变量将标识以后完成的请求。状态参数将传递给完成函数。MPI_WAIT 用于完成对先前函数的调用(由第一个参数标识),并将等待直到函数真正完成。

第三版 - Sendrecv

[编辑 | 编辑源代码]

下一个版本使用一个 send-receive 函数:它等效于并行调用 send 和 receive 函数。只有 while 循环不同。

 while(k++<MAX_ITERATION) {
   //The transfers are simultaneous
   MPI_Sendrecv(A+N,N,MPI_INT,left,MSG_DATA,A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
   MPI_Sendrecv(A+(step)*N,N,MPI_INT,right,MSG_DATA,A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   for (i=1;i<step-1;i++) {
     *(A+i*N)=*(A+i*N+N-2);
     *(A+i*N+N-1)=*(A+i*N+1);
   }
   for (i=0;i<step;i++)
     for (j=0;j<N-2;j++)
       *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
   for (i=1;i<step+1;i++)
     for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
 }

MPI_SENDRECV 函数需要参数,这些参数本质上与 MPI_SEND 和 MPI_RECV 函数所需的相同。

第四版 - 环形拓扑

[编辑 | 编辑源代码]

最后一个版本与其他版本有根本的不同,它使用的是环形拓扑。一个新的矩阵在进程 0 上计算,然后发送给进程 1。经过一次迭代,该进程将其发送给进程 2,进程 2 计算新的矩阵并将其再次发送给下一个进程。最后一个进程将矩阵返回给主进程,循环将继续,直到迭代次数达到定义的常量(参见下图)。

环形拓扑中的交互

使用这种拓扑,可以同时计算几个不同的数据。虽然在这个例子中对数据应用的操作是相同的,但进程可以执行不同类型的操作。只有一个矩阵将在环中发送。然而,可以(这也是这种结构的优点)在发送第一个矩阵后立即发送第二个矩阵。

 /* Cellular automata - Parallel version using a ring topology */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62
 #define MAX_ITERATION N
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc((N*N+1)*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   right=(myrank+1)%size; left=(myrank-1+size)%size;
   if (!myrank) init_mat(A,N);
   else MPI_Recv (A,N*N+1,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   while (*(A+N*N)<MAX_ITERATION) {
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     //The number of iteration is incremented
     (*(A+N*N))++;
     //The matrix is sent and another is receive
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
   }
   //When the number of iteration reach the maximum, the program stop
   if ((*(A+N*N))++==MAX_ITERATION) {
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
     writemat(A,N);
   } else MPI_Send (A,N*N+1,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     MPI_Finalize ();
   return 0;
 }

迭代次数与矩阵一起存储在一个额外的元素中。它将用于确定何时停止循环。当一个进程达到最大迭代次数时,矩阵将被传输到所有其他进程。这实际上是必要的,因为如果它们不停止,它们将继续循环,永远不会停止。

MPI_SENDRECV_REPLACE 函数类似于 MPI_SENDRECV 函数,但它使用相同的缓冲区来发送和接收数据。

该程序将使用有限元方法计算函数在多个点处的导数的数值。公式是:

然而,在数值上很难除以 0,并且 h 不应任意选择(参见下图)。

有限元方法的精度随 h 变化的示意图

h 的最佳值是未知的,因此,算法将减少 h 的初始值,直到精度开始下降(使用不同的 h 计算的两个导数之间的差异必须下降)。在 C 编程中,顺序版本是

 /* Derivate - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define H 1 //first h
 #define DEC 1.4 //decrease
 #define N 500
 
 double f (double x);
 
 int main (int argc, char **argv) {
   double x,deriv,diffnew,diffold,h;
   double result;
   int i;
   for (i=0,x=i;i<N;x=++i) {
     h=H;
     deriv=(f(x+h)-f(x-h))/(2*h);
     diffnew=100;
     diffold=2*diffnew;
     while (fabs(diffnew)<fabs(diffold)) {
       h/=DEC;
       result=deriv;
       diffold=diffnew;
       deriv=(f(x+h)-f(x-h))/(2*h);
       diffnew=deriv-result;
     }
   printf ("%lf: %.15lf\n",x,result);
   }
   return 0;
 }
 
 double f (double x) {
   return x*x*x+x*x;
 }

第一版 - 农场拓扑

[编辑 | 编辑源代码]

由于循环次数是可变的,因此执行时间无法预测。该程序将一个要计算的导数发送给每个进程,当一个进程完成(不一定是最先完成的)时,它会立即将数据重新发送给该进程。这些数据和结果被收集到两个数组中(deriv[] 和 result[])。

 /* Derivate - Parallel version using farming topology */
 
 #include <stdio.h>
 #include <mpi.h>
 #include <math.h>
 
 #define H 1 //first h
 #define DEC 1.4 //decrease (1.4)
 #define N 500
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 void init_deriv(double *deriv, double x);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void init_deriv(double *deriv, double x) {
   deriv[0]=x;
   deriv[4]=H;
   deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
   deriv[2]=100;
   deriv[3]=2*deriv[2];
 }
 
 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //Sending data to all processes
   for (i=1;i<size;i++) {
     init_deriv(deriv,1);
     MPI_Send(deriv,5,MPI_DOUBLE,i,MSG_DATA,MPI_COMM_WORLD);
   }
   //When a result is received, another task is sent
   for (;i<N;++i) {
     init_deriv(deriv,1);
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   deriv[4]=10*H;
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }
 
 void slave(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   MPI_Status status;
   MPI_Recv(deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   while (deriv[4]<5*H) {
     while (fabs(deriv[2])<fabs(deriv[3])) {
       deriv[4]/=DEC;
       result[1]=deriv[1];
       deriv[3]=deriv[2];
       deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
       deriv[2]=deriv[1]-result[1];
     }
   result[0]=deriv[0];
   MPI_Send (result,2,MPI_DOUBLE,0,MSG_RESULT,MPI_COMM_WORLD);
   MPI_Recv (deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   }
 }

第二版 - 发送和接收的完成

[编辑 | 编辑源代码]

第二种拓扑看起来像环形拓扑,并引入了新的例程:MPI_REQUEST_FREE。主进程将始终将数据发送给进程 1,并从任何其他进程接收结果(参见下图)。

使用拓扑的示意图

由于从进程的代码类似于元胞自动机的代码(在使用环形拓扑的版本中),因此只描述主进程函数。为了避免死锁,只使用非阻塞发送(非阻塞接收在这里用于提高性能)。

 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Request request[N],req;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //The master fill the ring by sending enough data
   for (i=1;i<size;i++) {
     init_deriv(deriv,i);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
   }
   //When a result is received, another data is sent in the ring
   for (;i<N;++i) {
     MPI_Irecv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&req);
     init_deriv(deriv,i);
     MPI_Wait (&req,&status);
     MPI_Request_free (request+(int)result[0]);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Request_free (request+(int)result[0]);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }

当主进程完成发送足够的数据(对于所有进程)时,它执行一个新的循环。它接收一个结果,并同时初始化一个新的数据,然后它完成发送导致它刚刚接收的结果的数据,并将一个新的数据重新发送到环中。实际上,如果主进程接收数据的结果,这意味着该数据的发送已经完成,不需要再完成发送。MPI_REQUEST_FREE 函数允许释放缓冲的请求,而无需检查该请求何时逻辑上完成。

通信器和组

[编辑 | 编辑源代码]

集体通信(如 MPI_BCAST、MPI_SCATTER 和 MPI_GATHER)将数据发送给属于同一组的进程。MPI 提供管理组和通信器的函数。这允许在指定进程上使用集体通信,从而将网络分成可以执行不同任务的组。

例如,可以在矩阵上执行不同的操作(迹、转置、求逆、对角化)。对于每个操作,将定义一个组,每个进程将使用集体通信与其组中的其他进程通信,而不会干扰其他组。

通信器是一个对象,它指定了通信域。它与一个组相关联。一个组是从现有的组创建的,初始组是包含所有进程的通信器 MPI_COMM_WORLD 的组。

第一版 - 从组创建通信器

[编辑 | 编辑源代码]

创建新通信器的第一种方法是从该第一个组创建组,然后创建这些组的通信器。

 /* Example of use of group */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   int rank[50],size,i,myrank;
   char data[10]="";
   MPI_Group group,newgroup;
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   //group of MPI_COMM_WORLD which contains all the processes
   MPI_Comm_group (MPI_COMM_WORLD,&group);
   MPI_Group_size (group,&size);
   //Creation of the communicator containing even-ranked processes
   for (i=0;i<size/2;i++)
     rank[i]=2*i; if (size%2) rank[size/2]=size-1;
   MPI_Group_incl (group,(size+1)/2,rank,&newgroup);
   MPI_Comm_create (MPI_COMM_WORLD,newgroup,&newcomm);
   //Only the processes belonging to the group of newcomm will participate to the broadcast on this communicator
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Group_free (&group);
   MPI_Finalize ();
   return 0;
 }

在这个程序中,数据只在偶数等级的进程上广播。这是通过创建一个包含偶数等级进程组的通信器来实现的。首先,由 MPI_COMM_GROUP 例程给出 MPI_COMM_WORLD 的初始组。新创建的组包含 个在初始组中具有偶数等级的进程。MPI_GROUP_INCL 函数需要一个数组,该数组包含将属于新组的第一个组中的进程的等级。然后,MPI_COMM_CREATE 创建新组的通信器,如果进程具有奇数等级,它将参与广播。该组由 MPI_GROUP_FREE 释放。

MPI_GROUP_SIZE 和 MPI_COMM_RANK 函数类似于 MPI_COMM_SIZE 和 MPI_GROUP_RANK。这些只是快捷方式

MPI_Comm_size (comm, &size); MPI_Comm_rank (comm, &rank);
MPI_Comm_group(comm, &group); MPI_Comm_group(comm, &group);
MPI_Group_size(group, &size); MPI_Group_rank(group, &rank);
MPI_Group_free(&group); MPI_Group_free(&group);

但是,如果调用 MPI_GROUP_RANK 的进程不属于函数参数中指定的组,则 rank 中返回的值将是 MPI_UNDEFINED,而调用 MPI_COMM_RANK 将无法工作,并将停止程序。

第二版 - 从通信器创建通信器

[编辑 | 编辑源代码]

也可以不使用组来创建通信器,而是直接通过拆分现有通信器来创建通信器。

 int main (int argc, char *argv[]) {
   int color,i,myrank;
   char data[10]="";
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   //Creation of the communicator containing even-ranked processes
   if (!(myrank%2)) color=0;
   else color=1;
   MPI_Comm_split (MPI_COMM_WORLD,color,1,&newcomm);
   //Only the processes having even rank will participate to the broadcast on this communicator
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Finalize ();
   return 0;
 }

所有进程将被分类到一个新的通信器中,这取决于其颜色。也就是说,如果两个进程的颜色等于 1,而一个进程的颜色等于 2,那么将创建两个通信器,每个组一个。MPI_COMM_SPLIT 所需的第三个参数用于在新通信器中对进程进行排序(这将改变新进程的等级)。在这里,所有进程都使用相同的 value。

环境管理

[编辑 | 编辑源代码]

最后,环境管理函数用于获取进程运行的节点的名称或测量程序的一部分所需的时间(例如)。

 /* Example of use of environmental function */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   double starttime,endtime;
   int namelen;
   char processor_name[MPI_MAX_PROCESSOR_NAME];
 
   MPI_Init(&argc,&argv);
   MPI_Get_processor_name(processor_name,&namelen);
   printf ("Process %s\n", processor_name);
 
   starttime = MPI_Wtime();
   //Here some computation or communication
   endtime = MPI_Wtime();
   printf ("Time: %lf\n",endtime-starttime);
 
   MPI_Finalize ();
   return 0;
 }

其他可能性

[编辑 | 编辑源代码]

还有许多未提及的其他函数:存在不同类型的点对点通信函数;MPI 允许用户通过许多不同的函数来定义数据类型;还有其他集体通信和归约函数;以及更多用于使用通信器和组的函数。

MPI 定义了许多有用的常量,其中包括 MPI_ANY_SOURCEMPI_INTMPI_UNDEFINED 等。

MPI 也可以以高级方式用于管理进程拓扑。也就是说,MPI 实现可以通过例如优先在最快的连接上传输消息来利用物理网络的特性(对于异构网络)。

基本函数的原型

[编辑 | 编辑源代码]

有关这些 MPI 函数的更多信息,请参阅手册页。

int MPI_Init(int *argc, char ***argv);
int MPI_Finalize(void);
int MPI_Comm_rank(MPI_Comm comm, int *rank);
int MPI_Comm_size(MPI_Comm comm, int *size);
int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);
int MPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent);
int MPI_Type_struct(int count, int *array_of_blocklengths, MPI_Aint *array_of_displacements, MPI_Datatype *array_of_types, MPI_Datatype *newtype);
int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvcount, int root, MPI_Comm comm);
int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_ISend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype datatype, int dest, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dest, int sendtag, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Request_free(MPI_Request *request);
int MPI_Group_rank(MPI_Group group, int *rank);
int MPI_Group_size(MPI_Group group, int *size);
int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
int MPI_Group_free(MPI_Group *group);
int MPI_Group_incl(MPI_Group *group, int n, int *ranks, MPI_Group *newgroup);
int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newgroup);
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm);
int MPI_Wtime(void);
int MPI_Get_processor_name(char *name, int *resultlen);

与 C 编程语言中的常见做法一样,“数组的地址”实际上是指数组第一个元素的地址。

先决条件

[编辑 | 编辑源代码]

进一步阅读

[编辑 | 编辑源代码]
华夏公益教科书