Example 1.3 Integration with MPI Nonblocking Communications
Until a matching receive has signaled that it is ready to receive, a blocking send will continue to wait. In situations where work following the send does not overwrite the send buffer (i.e., array waiting to be sent), it might be more efficient to use nonblocking send so that work following the send statement can start right away while the send process is pending. Similarly, a nonblocking receive could be more efficient than its blocking counter-part if work following MPI_Recv does not depend on the safe arrival of the receive buffer.
In this example, the point-to-point blocking MPI_Send used in the preceding example is replaced with the nonblocking MPI_Isend subroutine to enable work that follows it to proceed while the send process is waiting for its matching receive process to respond.
Example 1.3 Fortran Code
Program Example1_3
c#######################################################################
c# #
c# This is an MPI example on parallel integration to demonstrate the #
c# use of: #
c# #
c# * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize #
c# * MPI_Recv, MPI_Isend, MPI_Wait #
C# * MPI_ANY_SOURCE, MPI_ANY_TAG #
c# #
c# Dr. Kadin Tseng #
c# Scientific Computing and Visualization #
c# Boston University #
c# 1998 #
c# #
c#######################################################################
implicit none
integer n, p, i, j, proc, ierr, master, myid, tag, comm, request
real h, a, b, integral, pi, ai, my_int, integral_sum
include "mpif.h" ! brings in pre-defined MPI constants, ...
integer status(MPI_STATUS_SIZE) ! size defined in mpif.h
data master/0/ ! processor 0 collects integral sums from other processors
comm = MPI_COMM_WORLD
call MPI_Init(ierr)
! starts MPI
call MPI_Comm_rank(comm, myid, ierr)
! get current proc ID
call MPI_Comm_size(comm, p, ierr)
! get number of procs
pi = acos(-1.0) ! = 3.14159...
a = 0.0 ! lower limit of integration
b = pi/2. ! upper limit of integration
n = 500 ! number of increments in each partition
tag = 123 ! tag is additional way to identify a message
h = (b-a)/n/p ! length of increment
ai = a + myid*n*h ! lower limit of integration for partition myid
my_int = integral(ai, h, n)
! integral on processor myid
write(*,*)'myid=',myid,', my_int=',my_int
if(myid .eq. master) then ! the following is serial
integral_sum = my_int
do proc=1,p-1
call MPI_Recv(
& my_int, 1, MPI_REAL,
& MPI_ANY_SOURCE,
! message source
& MPI_ANY_TAG,
! message tag
& comm, status, ierr)
! status identifies source, tag
integral_sum = integral_sum + my_int
enddo
write(*,*)'The Integral =', integral_sum ! sum of my_int
else
call MPI_Isend(
& my_int, 1, MPI_REAL,
! buffer, size, datatype
& master, tag,
! destination and tag
& comm, request, ierr) ! get handle for MPI_Wait to check status
call other_work(myid) ! because of Isend, gets here immediately
call MPI_Wait(request, status, ierr)
! block until Isend is done
endif
call MPI_Finalize(ierr)
! let MPI finish up ...
end
subroutine other_work(myid)
implicit none
integer myid
write(*,"('more work on process ',i3)") myid
return
end
real function integral(ai, h, n)
implicit none
integer n, j
real h, ai, aij
integral = 0.0 ! initialize integral
do j=0,n-1 ! sum integrals
aij = ai +(j+0.5)*h ! abscissa mid-point
integral = integral + cos(aij)*h
enddo
return
end
Example 1.3 (C code)
#include <mpi.h>
#include <math.h>
#include <stdio.h>
/* Prototype */
void other_work(int myid);
float integral(float ai, float h, int n);
int main(int argc, char* argv[])
{
/*######################################################################
# #
# This is an MPI example on parallel integration to demonstrate the #
# use of: #
# #
# * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize #
# * MPI_Recv, MPI_Isend, MPI_Wait #
# * MPI_ANY_SOURCE, MPI_ANY_TAG #
# #
# Dr. Kadin Tseng #
# Scientific Computing and Visualization #
# Boston University #
# 1998 #
# #
#####################################################################*/
int n, p, myid, tag, master, proc, ierr;
float h, integral_sum, a, b, ai, pi, my_int;
MPI_Comm comm;
MPI_Request request;
MPI_Status status;
comm = MPI_COMM_WORLD;
ierr = MPI_Init(&argc,&argv);
/* starts MPI */
MPI_Comm_rank(comm, &myid);
/* get current process id */
MPI_Comm_size(comm, &p);
/* get number of processes */
master = 0;
pi = acos(-1.0); /* = 3.14159... */
a = 0.; * lower limit of integration */
b = pi*1./2.; /* upper limit of integration */
n = 500; /* number of increment within each process */
tag = 123; /* set the tag to identify this particular job */
h = (b-a)/n/p; /* length of increment */
ai = a + myid*n*h; /* lower limit of integration for partition myid */
my_int = integral(ai, h, n);
/* 0<=myid<=p-1 */
printf("Process %d has the partial result of %fn", myid, my_int);
if(myid == master) {
integral_sum = my_int;
for (proc=1;proc<p;proc++) {
MPI_Recv(
&my_int, 1, MPI_FLOAT,
/* triplet of buffer, size, data type */
MPI_ANY_SOURCE,
/* message source */
MPI_ANY_TAG,
/* message tag */
comm, &status);
/* status identifies source, tag */
integral_sum += my_int;
}
printf("The Integral =%fn",integral_sum); /* sum of my_int */
}
else {
MPI_Isend(
/* non-blocking send */
&my_int, 1, MPI_FLOAT, /* triplet of buffer, size, data type */
master,
tag,
comm, &request);
/* send my_int to master */
other_work(myid);
MPI_Wait(&request, &status);
/* block until Isend is done */
}
MPI_Finalize();
/* let MPI finish up ... */
}
void other_work(int myid)
{
printf("more work on process %dn", myid);
}
float integral(float ai, float h, int n)
{
int j;
float aij, integ;
integ = 0.0; /* initialize */
for (j=0;j<j++) { /* sum integrals */
aij = ai + (j+0.5)*h; /* mid-point */
integ += cos(aij)*h;
}
return integ;
}
Discussion
- A nonblocking MPI_Isend call returns immediately to the next statement without waiting for the task to complete. This enables other_work to proceed right away. This usage of nonblocking send (or receive) to avoid processor idling has the effect of “latency hiding,” where latency is the elapse time for an operation, such as MPI_Isend, to complete.
- Another performance enhancement parameter applied to this example is the use of MPI_ANY_SOURCE to specify message source. The wildcard nature of MPI_ANY_SOURCE enables the messages to be summed in the order of their arrival rather than any imposed sequence (such as the loop-index order used in the preceeding examples). It is important to note that summation is a mathematical operation that satisfies the associative and commutative rules
and hence the order in which the integral sums from processors are added is not pertinent to the outcome.
- Since MPI_ANY_SOURCE is used, the source where a message came from is not known explicitly. However, the status buffer returning from MPI_Recv contains useful information about the message. For example,
status(MPI_SOURCE)
returns the source (i.e., processor number) of the message in a fortran code whilestatus.MPI_SOURCE
returns source for a C code. - MPI_ANY_TAG is a constant pre-defined in mpif.h (or mpi.h for C). This represents a tag “wild card.” Generally, a tag is used as a secondary means to identify a message — the primary means is myid. An example that requires a tag in addition to myid is when multiple messages are passed between a pair of processors. Upon receive of these messages, if the receiver needs to distinguish the identities of them in order to place them or act on them accordingly, then tag can be used to differentiate the two messages. If a message’s tag is not know explicitly (because the message was sent via a nonblocking send), the tag can be retrieved via the status(MPI_TAG) for fortran and status.MPI_TAG for C.
Example 1 | Example 1.1 | Example 1.2 | Example 1.3 | Example 1.4 | Example 1.5