跳转到内容

消息传递接口

25% developed
来自维基教科书,开放世界中的开放书籍

本指南假设您之前了解 C 编程,并通过几个示例为您介绍消息传递接口 (MPI)。

什么是 MPI ?

[编辑 | 编辑源代码]

MPI 是一个标准化且可移植的消息传递系统。消息传递系统主要用于具有独立内存的分布式机器上以执行并行应用程序。使用此系统,每个正在执行的进程将通过发送和接收消息来与其他进程通信并共享其数据。MPI 是 MPI 论坛的结果规范,该论坛涉及多个组织设计一个可移植系统(允许程序在异构网络上工作)。

由于数据只能通过交换消息来共享,因此该标准不适用于共享内存系统,例如多处理器计算机(尽管它可以工作,但这不是主要目标,并且存在更强大的替代方案,例如OpenMP)。基本上,MPI 包括点对点通信、集体通信(跨进程网络)、进程组、Fortran 和 C 绑定以及其他高级功能。另一方面,该标准没有指定显式共享内存操作、调试工具、对线程的显式支持、I/O 函数。

当前版本为 2.0,尽管 1.1 仍在使用。

建议使用威廉·格罗普、尤因·拉斯克和安东尼·斯杰勒姆合著的 Using MPI: Portable Parallel Programming with the Message-Passing Interface 作为 MPI 入门。有关更完整的信息,请阅读MPI: The Complete Reference,作者为 Snir、Otto、Huss-Lederman、Walker 和 Dongarra。此外,标准本身可以在[1][2]找到。有关其他互联网参考,例如教程,请参阅开放目录项目或位于阿贡国家实验室的教程列表

目前,有两个主要实现:MPICH 和 LAM。后者的官方网站 ([3]) 包含有关 LAM 和 MPI 的许多信息。

补充库

[编辑 | 编辑源代码]

此外,还存在用于解决数值问题或在分布式机器上使用 I/O 函数的库。我们可以提到:ScaLAPACK 库、FFTW(一个可移植的 C FFT 库)、PETSc 科学计算库。可以在网站[4]上找到更完整的列表。

开发工具

[编辑 | 编辑源代码]

编译器

[编辑 | 编辑源代码]

每个实现都提供一个编译器,或者至少提供一个现有编译器的前端。

由于 MPI 没有指定调试工具,因此可以使用一些程序来实现此目的:XMPI 是 LAM/MPI 的运行/调试 GUI;MPI-CHECK 是 Fortran 的调试器;Etnus, Inc. 提供了 Totalview 调试器的版本,该版本支持 MPICH(以及其他一些 MPI 实现);Streamline Computing 的 Distributed Debugging Tool 与多个 MPI 实现一起使用,包括 MPICH 和 LAM/MPI。

基准测试

[编辑 | 编辑源代码]

Mpptest、SKaMPI 和 MPBench 测量 MPI 实现的性能。它们可用于确定使用哪个实现、如何实现可移植且高效的 MPI 程序,以及预测 MPI 程序的性能。

著名的流行 MPI 基准测试包括 NASA 并行基准测试、SpecMPI 2007 和 HPCC 基准测试套件。

它可以在大型异构 parc 上使用。

数组求和

[编辑 | 编辑源代码]

第一个简单示例的目的是计算数组中所有存储值的总和。在 C 编程中,顺序版本将如下所示

 
 /* Sum of an array - Sequential version */
 #include <stdio.h>
 #define N 100000
 
 int main (void) {
   float array[N];
   double mysum = 0;
   unsigned long long i;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //Calculation of the sum
   for (i = 0; i < N; i++)
     mysum += array[i];
   printf ("%lf\n", mysum);
   return 0;
 }

第一个版本 - 基本点对点通信

[编辑 | 编辑源代码]

MPI 实现允许用户启动多个进程,每个进程都有一个定义的数字标识符,即其“秩”。按照惯例,我们将认为主进程是秩为“0”的进程。所有其他进程都是从进程。在这个程序中,主进程将数组划分为子数组并将它们发送给每个从进程。然后,每个从进程将计算其子数组的总和。如果数组的大小不是从进程数量的偶数倍,那么主进程将通过计算数组最后值的总和来完成剩余的工作(见下图)。

从数组到进程的数据分配

对于每个进程,程序的源代码是相同的,因此可执行代码也是相同的。因此,代码必须包含从进程和主进程部分。将执行的部分取决于进程的秩。

在这个第一个版本中,工作被分成两个部分

 /* Sum of an array - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   //Initialization of MPI
   MPI_Init (&argc, &argv);
   //myrank will contain the rank of the process
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   //The part of the program which will be executed is decided
   if (myrank == 0)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum = 0, tmpsum;
   unsigned long long i;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //The array is divided by two and each part is sent to the slaves
   MPI_Send (array, N / 2, MPI_FLOAT, 1, MSG_DATA, MPI_COMM_WORLD);
   MPI_Send (array + N / 2, N / 2, MPI_FLOAT, 2, MSG_DATA, MPI_COMM_WORLD);
   //The master receive the result from the slaves
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 1, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 2, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   MPI_Status status;
   //The slave receives the array from the master
   MPI_Recv (array, N / 2, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < N / 2; i++)
     sum += array[i];
   //The result is send to the master
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

这个程序将逐步解释。

首先,为了使用 MPI 函数,我们必须包含文件mpi.h,其中包含所需函数的原型。数组的大小由N定义。MSG_DATAMSG_RESULT将由MPI_SendMPI_RECV函数使用。声明了两个函数的原型。这些函数分别包含主进程和从进程的代码。

主函数选择它应该执行主部分(如果进程秩为 0)还是从部分(对于任何其他进程秩)。该函数以MPI_INIT例程开始,该例程必须在任何其他 MPI 函数调用之前调用。此例程执行各种初始化函数。接下来,通过调用MPI_COMM_RANK函数获取进程的秩,并根据结果执行主部分或从部分。在计算结束时,将调用最终例程MPI_FINALIZE。它执行与结束 MPI 使用相关的各种管理任务。

主进程中包含了在程序的顺序版本中未找到的两个声明:tmpsum变量,它将存储两个从进程中的每个结果,以及status变量(其类型由 MPI 定义),它是MPI_RECV函数所需的。

现在,计算部分被对MPI_SENDMPI_RECV的两次调用取代。MPI_SEND函数的参数是发送的第一个元素的地址、发送的元素数量、它们的类型(最常见的 MPI 类型是MPI_INTMPI_CHARMPI_FLOATMPI_DOUBLE等)、接收者的秩、消息的标签以及通信器。在最简单的用法中,通信器将是MPI_COMM_WORLD,其中包括参与程序执行的所有进程。MPI_RECV函数的参数是接收缓冲区的地址、接收的最大元素数量、它们的类型、发送者的秩、消息的标签、通信器以及状态。对于一条常见的消息,类型、标签和通信器必须相同(见下图)。

MPI_SendMPI_RECV函数针对同一消息的参数

数组的前半部分被发送到第一个从进程,数组的后半部分被发送到第二个从进程。在每种情况下,子数组的大小为N/2。接下来,从从进程接收结果,存储并添加到最终总和中。

然而,在它能够接收从从进程计算出的总和之前,每个从进程必须首先对它从主进程接收的子数组的元素求和。然后,从进程将结果发送回主进程。

当多条消息以任意顺序发送到同一个进程时,标签参数允许该进程区分这些消息并接收它们。

要使用 lam 实现执行此程序,必须启动 lam 守护进程

$ lamboot

接下来,使用 mpicc 和 mpirun 进行编译和执行

$ mpicc ./source.c
$ mpirun -np 3 ./a.out

选项 -np 指定要启动的进程数量。在这个例子中,它必须是三个(一个主进程,两个从进程)。程序mpirun创建一个 MPI 环境,将a.out复制到所有三个处理器,并使用适当的处理器编号运行每个处理器。

第二个版本 - 对进程数量的自适应性

[编辑 | 编辑源代码]

这个基本程序有两个问题

  • 该程序需要两个从进程才能正确运行,并且无法自适应于可用进程的数量
  • 主进程以预定义的顺序(先第一个从进程,然后是第二个从进程)接收数据。然而,第二个从进程可能先于第一个从进程完成。

这导致了第二个版本

 /* Sum of an array - Second parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum, tmpsum;
   unsigned long long step, i;
   int size;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   if (size != 1)
     step = N / (size - 1);
   //The array is divided by the number of slaves
   for (i = 0; i < size - 1; i++)
     MPI_Send (array + i * step, step, MPI_FLOAT, i + 1, MSG_DATA, MPI_COMM_WORLD);
   //The master completes the work if necessary
   for (i = (size - 1) * step, mysum = 0; i < N; i++)
     mysum += array[i];
   //The master receives the results in any order
   for (i = 1; i < size; mysum += tmpsum, i++)
     MPI_Recv (&tmpsum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   int count;
   MPI_Status status;
   MPI_Recv (array, N, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   //The slave finds the size of the array
   MPI_Get_count (&status, MPI_FLOAT, &count);
   for (i = 0, sum = 0; i < count; i++)
     sum += array[i];
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

对于第一个问题,主进程需要进程数量来确定子数组的大小(它将存储在 step 变量中)。MPI_COMM_SIZE函数在 size 变量中给出此指示,因此N除以从进程的数量,即size−1。接下来,数组被发送到从进程,而在从进程进行计算时,主进程将在必要时完成工作。然后,它将接收结果并将它们添加到最终总和中。为了解决第二个问题,主进程将从任何来源接收结果。这导致在MPI_Recv函数中使用 MPI_ANY_SOURCE 来代替进程的秩。

从进程不需要知道进程的数量,但它需要知道它接收到的子数组的大小。为此,我们将使用 status 参数。实际上,status 是一个结构,其中包含三个数据:消息的来源(如果使用MPI_ANY_SOURCE,则有用)、消息的标签(如果使用MPI_ANY_TAG,则有用)以及错误代码。此外,我们可以使用MPI_Get_count函数和 status 参数访问接收到的数据的实际长度。count 变量是子数组的长度。

此版本的程序可以使用任何数量的进程,甚至可以使用 1 个进程。在这种特殊情况下,主进程将完成所有计算。

不幸的是,这个程序不适合 MPI,因为,存在大量数据传输和少量计算。虽然 MPI 专为非共享内存操作而设计,但与数据传输相比,计算量必须更大,以便在消息中损失最少的时间。我们可以通过只发送第一个数字和最后一个数字来求和(并添加这两个数字之间的所有整数)来改进这个程序,但在这种情况下,只能计算几何和。

通过辛普森法进行积分

[编辑 | 编辑源代码]

这个程序的目的是使用辛普森法计算给定函数的积分。这个例子更适合 MPI,因为它需要的计算量更多,数据传输量更少,比以前。数学方程是

在 C 编程中,顺序版本是

 /* Integration - Simpson method - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define n 10000 //Must be even
 
 double f (double x);
 
 int main (void) {
   double integration,step;
   int i;
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   step = (b - a) / n;
   for (i = 1, integration = 0; i < n / 2; i++)
     integration += 2*f(a+(2*i-1)*step) + f(a+2*i*step);
   integration *= 2;
   integration += f(a) + f(b) + 4*f(b - step);
   integration *= step / 3;
   printf ("%.15lf\n", integration);
   return 0;
 }
 
 double f (double x) {
   return exp(-1*pow(x,2));
 }

第一个版本 - 用户定义的类型

[编辑 | 编辑源代码]

要完成的工作是与前一个示例中的总和类似(一个数组,每个框的系数不同)。工作将按以下图所示的方式划分。

将总和数据分配到各个进程。

总和被分成多个部分,由从节点计算,主节点计算其他值并在必要时完成之前的工作。

总和的域将被发送到每个从节点,它们将评估函数的多个返回值并将其求和。将发送的信息包括总和的起始位置(begin)、每个值之间的差值(step)以及必须评估函数的值的数量(两个双精度数和一个整数)。然而,MPI_Send例程发送的数据必须具有相同的类型。因此,为了最小化发送的消息数量并优化程序,将创建一个并使用MPI类型来发送这些数据。

通常,要发送的数据需要分组并最小化,以减少在通信中浪费的时间(发送最少的消息,并带有最少的数据)。因此,必须将两个整数存储在一个数组中并以单个消息发送,而不是以两个不同的消息发送。

由于工作分配的速度比工作本身更快,当使用很少的进程时,主节点将等待很长时间才能获得从节点的结果。在这个程序中,当进程数量少于定义的限制(LIMIT_PROC)时,主节点将参与计算(即,工作将被进程数量而不是从节点数量来划分)。

 /* Integration - Simpson method - First parallel version */
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000 //Must be even
 #define LIMIT_PROC 4 //Number of process for which the master stop to work and only divide the work
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 struct domain_of_sum {
   double begin;
   double step;
   int number; //Number of iteration
 };
 
 double f (double x);
 MPI_Datatype Init_Type_Domain (void); //Declaration of a MPI type which will contain the domain
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[3] = { MPI_DOUBLE, MPI_DOUBLE, MPI_INT };
   int blocklen[3] = { 1, 1, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[3] = { 0, ext, 2 * ext };
   MPI_Type_struct (3, blocklen, disp, type, &Domain);
   return Domain;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size;
   struct domain_of_sum mydomain; //The C structure which will contain the domain
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type
   MPI_Type_commit (&Domain);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain.step = (b - a) / n; //step of the integration
   //The work is divided
   if (size > 1) {
     if (size > LIMIT_PROC)
       mydomain.number = (n / 2 - 1) / (size - 1);
     else
       mydomain.number = (n / 2 - 1) / size;
   }
   mydomain.begin = a + mydomain.step;
   //Each domain are sent
   for (i = 0; i < size - 1; i++) {
     mydomain.begin = a + (2 * i * mydomain.number + 1) * mydomain.step;
     MPI_Send (&mydomain, 1, Domain, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * mydomain.number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain.step) + f (a + 2 * i * mydomain.step);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain.step);
   integration *= mydomain.step / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i;
   struct domain_of_sum mydomain;
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type 
   MPI_Type_commit (&Domain);
   MPI_Recv (&mydomain, 1, Domain, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < mydomain.number; i++)
     sum += 2*f(mydomain.begin+(2*i)*mydomain.step) + f(mydomain.begin+(2*i+1)*mydomain.step);
   MPI_Send (&sum, 1, MPI_LONG_LONG_INT, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

用于创建MPI类型的所有例程都被重新分组到函数Init_Type_Domain中,以简化代码。MPI_TYPE_STRUCT函数用于创建新类型并将其存储在Domain中。此函数需要5个参数:新类型将包含的块数量(在本例中是两个双精度数和一个整数);这些块在数组中的每个元素的数量(一个双精度数、另一个双精度数和一个整数);每个块在消息中的偏移量的数组;这些类型的数组;以及包含结构的变量的地址。每个块的偏移量是通过使用MPI_TYPE_EXTENT函数获得的,该函数给出MPI_DOUBLE的长度。它等同于C语言中的sizeof()函数。MPI类型的长度存储在一个MPI_Aint变量中。

另一种可能性是声明一个包含两个双精度数和一个整数的数组的结构。

 struct domain_of_sum {
   double array[2]; //Contains begin and step
   int number;
 };
 
 //The new MPI type
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[2] = { MPI_DOUBLE, MPI_INT };
   int blocklen[2] = { 2, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[2] = { 0, 2 * ext };
   MPI_Type_struct (2, blocklen, disp, type, &Domain);
   return Domain;
 }

第二版

[edit | edit source]

两种不同的方法用于确定从节点必须实现的操作数量。在第一个例子中,从节点使用MPI_Recv函数的状态参数。在第二个例子中,这个数量直接与数据一起发送。另一种方法是通过与主节点固定该数据相同的方式来获取它,即使用进程的数量。为了最小化数据传输量,我们将使用最后一种方法(此版本只是一个优化)。

 /* Integration - Simpson method - Second parallel version*/
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 int Get_the_number (int size);  // Function which gives the number of iterations
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = (n / 2 - 1) / (size - 1);
     else
       number = (n / 2 - 1) / size;
   }
   return number;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain[1] = (b - a) / n;
   //The work is divided
   mydomain[0] = a + mydomain[1];
   for (i = 0; i < size - 1; i++) {
     mydomain[0] = a + (2 * i * number + 1) * mydomain[1];
     MPI_Send (mydomain, 2, MPI_DOUBLE, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain[1]) + f (a + 2 * i * mydomain[1]);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain[1]);
   integration *= mydomain[1] / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   MPI_Recv (mydomain, 2, MPI_DOUBLE, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < number; i++)
     sum+=2*f(mydomain[0]+(2*i)*mydomain[1])+f(mydomain[0]+(2*i+1)*mydomain[1]);
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

向量和矩阵的乘积

[edit | edit source]

这个程序的目的是计算向量和矩阵的乘积。在C语言编程中,顺序版本为

 /* Product of a vector by a matrix - Sequential version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 
 void writecolumn (float *A);
 
 int main (int argc, char **argv) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Product of the matrix and the vector
   for (i = 0; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   writecolumn(C);
   free (A);
   return 0;
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

第一版 - 广播

[edit | edit source]

为了并行化这个程序,矩阵将被分成行组,每个部分将被发送到不同的从节点,而向量将被发送到每个从节点(参见下图)。

将矩阵数据分配到各个进程

将相同的数据发送到每个从节点可以通过使用MPI_Bcast函数广播这些数据来完成,该函数必须由所有进程调用。

 /* Product of a vector by a matrix - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 int Get_the_number (int size);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = N / (size - 1);
     else
       number = N / size;
   }
   return number;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N], *buff;
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   //Broadcast of the vector
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   for (i = 1; i < size; i++)
     MPI_Send (A + (i - 1) * step * N, N * step, MPI_FLOAT, i, MSG_DATA, MPI_COMM_WORLD);
   //Finishing the work
   for (i = (size - 1) * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   buff = (float *) malloc (step * sizeof (float));
   //Receive and reorder
   for (i = 1; i < size; i++) {
     MPI_Recv (buff, step, MPI_FLOAT, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     for (j = 0; j < step; j++)
       C[j + (status.MPI_SOURCE - 1) * step] = buff[j];
   }
   writecolumn(C);
   free (A);
   free (buff);
 }
 
 void slave (void) {
   float Vect[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Recv (partmat, N * step, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   MPI_Send (result, step, MPI_FLOAT, 0, MSG_RESULT, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }

MPI_Bcast函数需要的参数比MPI_SendMPI_Recv函数少。MPI_Bcast函数需要包含或将包含广播数据的缓冲区的地址,缓冲区中的条目数量,这些条目类型,广播根节点的等级(发送数据的进程)和通信器。没有标签或状态参数,因为它们用于区分消息并提供有关接收数据的的信息,在本例中,所有进程都参与广播消息(因此,不需要区分消息),并且存储在状态中的信息是已知的(源是广播根节点,长度是条目数量,没有标签)。

当主节点收到数据时,它必须重新排序数据,因为接收顺序是未指定的(MPI_ANY_SOURCE)。为此,状态结构包含消息的源,该源用于重新排序和组成向量。

在这个例子中,矩阵存储在一个一维数组中,而不是传统的二维数组。可以使用MPI发送多维数组,但必须小心分配。即,数组必须连续存储,因为发送函数的第一个参数是起始缓冲区的地址,第二个参数指定将发送的后续值的数量。然后,静态和动态分配为

 /* Static allocation */
 int array[rows][columns];

 /* Dynamic allocation */
 int **array;
 array = (int**) malloc (rows * sizeof(int*));
 if (array == NULL) exit (-1);
 
 array[0] = (int*) malloc (rows * columns * sizeof(int));
 if (array[0] == NULL) exit (-1);
 
 for (i=1; i<rows; i++)
   array[i] = array[0] + i * columns;
 
 /* Send either kind of array */
 MPI_Send (array[0],rows*columns,MPI_INT,0,0,MPI_COMM_WORLD);

第二版 - 分散和收集

[edit | edit source]

MPI规范提供了集体通信函数,MPI_Bcast函数是其中之一。这个程序的下一个版本将使用“分散”和“收集”函数,它们分别将不同的数据发送给每个人,并从每个人接收不同的数据(参见下图)。

分散和收集函数的原理

因此,工作将被略微不同地划分,即,主节点也将参与计算。

 /* Product of a vector by a matrix - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 5000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of the work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Sending the data
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Each processes will receive a different part of the matrix
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //The master make its part (the same code is used for the slaves)
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += *(partmat + i * N + j) * B[j];
   //It finish the work
   for (i = size * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   //Receiving the data in the good order
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   writecolumn(C);
   free (partmat);
   free (result);
   free (A);
 }
 
 void slave (void) {
   float Vect[N], *A, *C; //C and A are not used by slaves but must be declared
   unsigned int i, j, step;
   int size;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Receiving the data
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Computation
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   //Sending the result
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

对函数MPI_SCATTER和MPI_GATHER的调用对于主节点和从节点来说是相同的(强调的参数被从节点忽略)。参数为

MPI_Scatter MPI_Gather
发送缓冲区的地址 发送缓冲区的地址
每个进程发送的元素数量 发送缓冲区中的元素数量
发送缓冲区元素的数据类型 发送缓冲区元素的数据类型
接收缓冲区的地址 接收缓冲区的地址
接收缓冲区中发送的元素数量 任何单个接收的元素数量
接收缓冲区元素的数据类型 接收缓冲区元素的数据类型
发送进程的等级 接收进程的等级
通信器 通信器

注意,在第一种情况下,主节点将数据发送给自己,而在第二种情况下,它从自己接收数据。在这个例子中,矩阵被进程数量划分,主节点将第一部分发送给自己,第二部分发送到第一个从节点,等等。之后,主节点从自己接收结果向量的第一部分,从第一个从节点接收第二部分,等等。

元胞自动机

[edit | edit source]

在这个例子中,我们将编写一个元胞自动机。元胞自动机是矩阵的演化,我们对它执行定义的函数,直到收敛或出现循环现象。该函数根据其四个邻居的值,为矩阵的每个点返回一个值。矩阵的边界将具有与其相反侧边界相同的值。结果矩阵将是一个环面。在C语言编程中,顺序版本为

 /* Cellular automata - Sequential version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #define N 62
 #define MAX_ITERATION N //Number of iteration
 
 void init_mat (int *A, int length);
 void writemat (int *A);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc(N*N*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int i,j,k=0;
   init_mat (A,N);
   while(k++<MAX_ITERATION) {
   //Calculation of the new matrix in a temporary matrix
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     //Copy of the new matrix
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     //Update of the border
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
   }
   writemat(A);
   return 0;
 }
 
 void init_mat (int *A, int length) {
   int i,j;
   for (i=0;i<length;i++)
     for (j=0;j<length;j++)
       *(A+i*length+j)=i+j>length/2&&i+j<length*3/2;
   //Each border of the matrix will have the same value than the border of his opposite side
   for (i=1;i<length-1;i++) {
     *(A+i)=*(A+(length-2)*length+i);
     *(A+(length-1)*length+i)=*(A+length+i);
     *(A+i*length)=*(A+i*length+length-2);
     *(A+i*length+length-1)=*(A+i*length+1);
   }
 }
 
 void writemat (int *A) {
   int i,j;
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       printf ("%d ", *(A + i*N +j));
     printf("\n");
   }
 }
 
 int f (int x1,int x2,int x3,int x4) {
   return x1+x2+x3+x4>2;
 }

第一版 - 死锁

[edit | edit source]

矩阵将被分成行组,就像在图[fig:vect]中一样。为了计算矩阵的新部分,应用的函数需要属于矩阵其他部分的点的值,然后每个进程将计算新矩阵并将第一行和最后一行与其两个邻居交换(参见下图)。

矩阵的划分和进程之间的通信

当进程共享它们的数据时,可能会发生死锁。死锁是程序的阻塞状态,原因是某个进程正在等待另一个进程执行特定操作,但后者进程正在等待前者进程执行特定操作。这种现象导致永久等待状态(参见下图)。

死锁示例

在这个代码中可能会发生死锁,因为MPI_SEND和MPI_RECV函数会阻塞程序,直到它们完成,即直到接收者收到发送的消息或发送者发送要接收的消息。如果所有进程执行相同的代码(主节点除外),则进程一将数据发送到进程二,进程二将数据发送到进程三,等等。进程一将等待进程二接收它的数据,但这永远不会发生,因为进程二也正在等待进程三接收它的数据。由于通信是循环的,因此程序被阻塞。作为第一个解决方案,我们可以更改每个偶数进程上发送和接收的顺序。

 /* Cellular automata - First parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes (which must be > 1)
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   //Initialization by the master
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The next and the previous processes are determined
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The order of exchanging the data is decided
     if (myrank%2) {
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
     } else {
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     }
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The master recompose the matrix and print it
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

在本例中,主节点和从节点的代码几乎完全相同,因此无需将代码分成两个函数(主节点特有的代码只需在等级条件之前)。

当主节点等待进程 1 的数据时,进程 1 将数据发送到主节点。然后它等待来自主节点的数据,同时主节点将数据发送给它。代码对所有进程的行为将相同,因此是安全的。

第二版 - 非阻塞发送和接收

[编辑 | 编辑源代码]

第二个解决方案是使用非阻塞发送和接收:这些函数被调用并立即将控制权交给进程。然后,进程可以进行一些计算,当需要访问消息数据时,它可以通过调用一些适当的函数来完成它。使用非阻塞通信函数的程序可以更快。在这个第二个版本中,所有函数将在所有发送和所有接收开始后立即完成。

在前面的例子中,矩阵的维度应该是一个进程数的倍数,因为剩余的行没有发送到任何进程。在这个版本中,主节点将这些行发送到最后一个进程,并分别接收结果行。

 /* Cellular automata - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Request request[4];
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //Code allowing any dimension of the matrix
   if (myrank==0&&(N-2)%size) MPI_Send (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_DATA,MPI_COMM_WORLD);
   else if (myrank==size-1&&(N-2)%size) {
     MPI_Recv (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_DATA,MPI_COMM_WORLD,&status);
     step+=N-2-step*size;
   }
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The data begin to be shared
     MPI_Isend (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request);
     MPI_Irecv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+1);
     MPI_Isend (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+2);
     MPI_Irecv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request+3);
     //The transfer of data are completed
     for (i=0;i<4;i++)
       MPI_Wait(request+i,&status);
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   if (myrank==size-1&&(N-2)%size) {
     step=(N-2)/size;
     MPI_Send (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_RESULT,MPI_COMM_WORLD);
   } else if (myrank==0&&(N-2)%size)
     MPI_Recv (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_RESULT,MPI_COMM_WORLD,&status);
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

MPI_Isend 和 MPI_Irecv(I 代表立即)函数需要一个额外的参数:一个 MPI_Request 变量的地址,该变量将标识稍后完成的请求。状态参数将传递给完成函数。MPI_WAIT 用于完成对先前函数的调用(由第一个参数标识),并将等待直到函数有效完成。

第三版 - Sendrecv

[编辑 | 编辑源代码]

下一个版本使用发送-接收函数:它等效于并行调用发送和接收函数。只有 while 循环不同。

 while(k++<MAX_ITERATION) {
   //The transfers are simultaneous
   MPI_Sendrecv(A+N,N,MPI_INT,left,MSG_DATA,A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
   MPI_Sendrecv(A+(step)*N,N,MPI_INT,right,MSG_DATA,A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   for (i=1;i<step-1;i++) {
     *(A+i*N)=*(A+i*N+N-2);
     *(A+i*N+N-1)=*(A+i*N+1);
   }
   for (i=0;i<step;i++)
     for (j=0;j<N-2;j++)
       *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
   for (i=1;i<step+1;i++)
     for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
 }

MPI_SENDRECV 函数需要参数,这些参数本质上与 MPI_SEND 和 MPI_RECV 函数需要的参数相同。

第四版 - 环形拓扑

[编辑 | 编辑源代码]

最后一个版本从根本上不同,其中使用的拓扑结构是环形拓扑结构。在进程 0 上计算一个新的矩阵,然后将其发送到进程 1。经过一次迭代,该进程将其发送到进程 2,进程 2 计算新的矩阵并将其再次发送到下一个进程。最后一个进程将矩阵返回给主节点,循环将继续,直到迭代次数达到定义的常数(见下图)。

环形拓扑结构中的交互

使用这种拓扑结构,可以同时计算多个不同的数据。虽然在本例中对数据的操作相同,但进程可以执行不同类型的操作。只有一个矩阵将在环形拓扑结构中发送。然而,可以(这也是这种结构的优势)在之后立即发送第二个矩阵。

 /* Cellular automata - Parallel version using a ring topology */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62
 #define MAX_ITERATION N
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc((N*N+1)*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   right=(myrank+1)%size; left=(myrank-1+size)%size;
   if (!myrank) init_mat(A,N);
   else MPI_Recv (A,N*N+1,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   while (*(A+N*N)<MAX_ITERATION) {
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     //The number of iteration is incremented
     (*(A+N*N))++;
     //The matrix is sent and another is receive
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
   }
   //When the number of iteration reach the maximum, the program stop
   if ((*(A+N*N))++==MAX_ITERATION) {
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
     writemat(A,N);
   } else MPI_Send (A,N*N+1,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     MPI_Finalize ();
   return 0;
 }

迭代次数与矩阵一起存储,存储在一个额外的元素中。它将用于确定何时停止循环。当一个进程达到最大迭代次数时,矩阵将被传递到所有其他进程。这实际上是必要的,以便正确地停止它们,因为否则,它们将继续循环并且永远不会停止。

MPI_SENDRECV_REPLACE 函数类似于 MPI_SENDRECV 函数,不过它使用相同的缓冲区来发送和接收数据。

该程序将使用有限元方法计算函数在多个点处的导数的数值。方程为:

但是,从数值上来说,除以 0 很难,并且 h 不能随意选择(见下图)。

有限元方法的精度随 h 的变化而变化的示意图

h 的最佳值是未知的,因此,算法将减少 h 的起始值,直到精度开始下降(用不同的 h 计算的两个导数之间的差异必须下降)。在 C 编程中,顺序版本是

 /* Derivate - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define H 1 //first h
 #define DEC 1.4 //decrease
 #define N 500
 
 double f (double x);
 
 int main (int argc, char **argv) {
   double x,deriv,diffnew,diffold,h;
   double result;
   int i;
   for (i=0,x=i;i<N;x=++i) {
     h=H;
     deriv=(f(x+h)-f(x-h))/(2*h);
     diffnew=100;
     diffold=2*diffnew;
     while (fabs(diffnew)<fabs(diffold)) {
       h/=DEC;
       result=deriv;
       diffold=diffnew;
       deriv=(f(x+h)-f(x-h))/(2*h);
       diffnew=deriv-result;
     }
   printf ("%lf: %.15lf\n",x,result);
   }
   return 0;
 }
 
 double f (double x) {
   return x*x*x+x*x;
 }

第一版 - 农场拓扑

[编辑 | 编辑源代码]

由于循环次数是可变的,因此无法预测执行时间。该程序将一个导数发送到每个进程进行计算,当一个进程完成时(不一定是最先完成的),它会立即将数据重新发送到该进程。这些数据和结果被收集到两个数组中(deriv[] 和 result[])。

 /* Derivate - Parallel version using farming topology */
 
 #include <stdio.h>
 #include <mpi.h>
 #include <math.h>
 
 #define H 1 //first h
 #define DEC 1.4 //decrease (1.4)
 #define N 500
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 void init_deriv(double *deriv, double x);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void init_deriv(double *deriv, double x) {
   deriv[0]=x;
   deriv[4]=H;
   deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
   deriv[2]=100;
   deriv[3]=2*deriv[2];
 }
 
 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //Sending data to all processes
   for (i=1;i<size;i++) {
     init_deriv(deriv,1);
     MPI_Send(deriv,5,MPI_DOUBLE,i,MSG_DATA,MPI_COMM_WORLD);
   }
   //When a result is received, another task is sent
   for (;i<N;++i) {
     init_deriv(deriv,1);
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   deriv[4]=10*H;
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }
 
 void slave(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   MPI_Status status;
   MPI_Recv(deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   while (deriv[4]<5*H) {
     while (fabs(deriv[2])<fabs(deriv[3])) {
       deriv[4]/=DEC;
       result[1]=deriv[1];
       deriv[3]=deriv[2];
       deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
       deriv[2]=deriv[1]-result[1];
     }
   result[0]=deriv[0];
   MPI_Send (result,2,MPI_DOUBLE,0,MSG_RESULT,MPI_COMM_WORLD);
   MPI_Recv (deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   }
 }

第二版 - 发送和接收完成

[编辑 | 编辑源代码]

第二个拓扑结构类似于环形拓扑结构,并引入了一个新的例程:MPI_REQUEST_FREE。主节点将始终将数据发送到进程 1,并将结果从任何其他进程接收(见下图)。

所用拓扑结构的示意图

由于从节点的代码与元胞自动机(使用环形拓扑结构的版本)的代码类似,因此只描述主节点函数。为了避免死锁,只使用非阻塞发送(这里使用非阻塞接收是为了提高性能)。

 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Request request[N],req;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //The master fill the ring by sending enough data
   for (i=1;i<size;i++) {
     init_deriv(deriv,i);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
   }
   //When a result is received, another data is sent in the ring
   for (;i<N;++i) {
     MPI_Irecv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&req);
     init_deriv(deriv,i);
     MPI_Wait (&req,&status);
     MPI_Request_free (request+(int)result[0]);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Request_free (request+(int)result[0]);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }

当主节点完成发送足够的数据(对于所有进程)时,它执行一个新的循环。它接收一个结果并同时初始化一个新的数据,接下来,它完成发送导致它刚刚接收到的结果的数据,并在环形拓扑结构中重新发送一个新的数据。实际上,如果主节点接收了一个数据的結果,这意味着该数据的发送已完成,无需再完成发送。MPI_REQUEST_FREE 函数允许在不检查请求何时逻辑完成的情况下释放缓冲的请求。

通信器和组

[编辑 | 编辑源代码]

集体通信(如 MPI_BCAST、MPI_SCATTER 和 MPI_GATHER)将数据发送到属于同一组的进程。MPI 提供用于管理组和通信器的函数。这允许在指定的进程上使用集体通信,从而将网络分成可以执行不同任务的组。

例如,可以在矩阵上进行不同的操作(迹、转置、求逆、对角化)。对于每个操作,将定义一个组,每个进程将使用集体通信与其组中的其他进程通信,而不会干扰其他组。

通信器是一个对象,它指定了一个通信域。它与一个组相关联。组是从现有组创建的,初始组是包含所有进程的通信器 MPI_COMM_WORLD 的组。

第一版 - 从组创建通信器

[编辑 | 编辑源代码]

创建新通信器的第一种方法是从第一个组创建组,然后创建这些组的通信器。

 /* Example of use of group */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   int rank[50],size,i,myrank;
   char data[10]="";
   MPI_Group group,newgroup;
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   //group of MPI_COMM_WORLD which contains all the processes
   MPI_Comm_group (MPI_COMM_WORLD,&group);
   MPI_Group_size (group,&size);
   //Creation of the communicator containing even-ranked processes
   for (i=0;i<size/2;i++)
     rank[i]=2*i; if (size%2) rank[size/2]=size-1;
   MPI_Group_incl (group,(size+1)/2,rank,&newgroup);
   MPI_Comm_create (MPI_COMM_WORLD,newgroup,&newcomm);
   //Only the processes belonging to the group of newcomm will participate to the broadcast on this communicator
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Group_free (&group);
   MPI_Finalize ();
   return 0;
 }

在这个程序中,数据只在偶数等级的进程上广播。这是通过创建一个包含所有偶数等级进程的通信器来实现的。首先,使用 `MPI_COMM_GROUP` 函数获取 `MPI_COMM_WORLD` 的初始组。新创建的组包含了初始组中秩为偶数的 个进程。`MPI_GROUP_INCL` 函数需要一个数组,包含第一个组中将属于新组的进程的秩。然后,`MPI_COMM_CREATE` 函数创建新组的通信器。如果进程的秩为奇数,它将参与广播。最后,使用 `MPI_GROUP_FREE` 函数释放该组。

`MPI_GROUP_SIZE` 和 `MPI_COMM_RANK` 函数类似于 `MPI_COMM_SIZE` 和 `MPI_GROUP_RANK` 函数。后两者只是简写。

`MPI_Comm_size (comm, &size);` `MPI_Comm_rank (comm, &rank);`
`MPI_Comm_group(comm, &group);` `MPI_Comm_group(comm, &group);`
`MPI_Group_size(group, &size);` `MPI_Group_rank(group, &rank);`
`MPI_Group_free(&group);` `MPI_Group_free(&group);`

然而,如果调用 `MPI_GROUP_RANK` 的进程不属于函数参数中指定的组,则 `rank` 返回的值将为 `MPI_UNDEFINED`。而调用 `MPI_COMM_RANK` 将会导致程序停止。

第二版 - 从通信器创建通信器

[edit | edit source]

可以不使用组,直接通过分割现有的通信器来创建新的通信器。

 int main (int argc, char *argv[]) {
   int color,i,myrank;
   char data[10]="";
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   //Creation of the communicator containing even-ranked processes
   if (!(myrank%2)) color=0;
   else color=1;
   MPI_Comm_split (MPI_COMM_WORLD,color,1,&newcomm);
   //Only the processes having even rank will participate to the broadcast on this communicator
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Finalize ();
   return 0;
 }

所有进程将根据其颜色被分类到新的通信器中。例如,如果两个进程的颜色为 1,一个进程的颜色为 2,那么将创建两个通信器,每个组一个。`MPI_COMM_SPLIT` 函数的第三个参数用于在新通信器中排序进程(这将改变新进程的秩)。在这里,所有进程都使用相同的排序值。

其他

[edit | edit source]

环境管理

[edit | edit source]

最后,环境管理函数可以用来获取进程运行的节点名称或测量程序中某个部分所花费的时间等。

 /* Example of use of environmental function */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   double starttime,endtime;
   int namelen;
   char processor_name[MPI_MAX_PROCESSOR_NAME];
 
   MPI_Init(&argc,&argv);
   MPI_Get_processor_name(processor_name,&namelen);
   printf ("Process %s\n", processor_name);
 
   starttime = MPI_Wtime();
   //Here some computation or communication
   endtime = MPI_Wtime();
   printf ("Time: %lf\n",endtime-starttime);
 
   MPI_Finalize ();
   return 0;
 }

其他可能性

[edit | edit source]

还存在许多未提及的函数:有不同类型的点对点通信函数;MPI 允许用户使用多种函数定义数据类型;还存在其他集体通信和归约函数;以及更多用于使用通信器和组的函数。

MPI 定义了许多有用的常量,包括 `MPI_ANY_SOURCE`、`MPI_INT`、`MPI_UNDEFINED` 等。

MPI 还可以以更高级的方式用于管理进程拓扑。具体而言,MPI 实现可以利用物理网络的特性(对于异构网络),例如通过优先使用最快的连接来传输消息。

基本函数的原型

[edit | edit source]

有关这些 MPI 函数的更多信息,请查阅手册页。

int MPI_Init(int *argc, char ***argv);
int MPI_Finalize(void);
int MPI_Comm_rank(MPI_Comm comm, int *rank);
int MPI_Comm_size(MPI_Comm comm, int *size);
int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);
int MPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent);
int MPI_Type_struct(int count, int *array_of_blocklengths, MPI_Aint *array_of_displacements, MPI_Datatype *array_of_types, MPI_Datatype *newtype);
int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvcount, int root, MPI_Comm comm);
int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_ISend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype datatype, int dest, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dest, int sendtag, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Request_free(MPI_Request *request);
int MPI_Group_rank(MPI_Group group, int *rank);
int MPI_Group_size(MPI_Group group, int *size);
int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
int MPI_Group_free(MPI_Group *group);
int MPI_Group_incl(MPI_Group *group, int n, int *ranks, MPI_Group *newgroup);
int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newgroup);
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm);
int MPI_Wtime(void);
int MPI_Get_processor_name(char *name, int *resultlen);

与 C 语言中的一般用法相同,"数组的地址"实际上是指数组第一个元素的地址。

先决条件

[edit | edit source]

进一步阅读

[edit | edit source]
华夏公益教科书