
Parallel Computing with MPI - Part IX: MPI Warmup

[PLEASE NOTE THAT, DUE TO TIME RESTRICTIONS, I WILL NOT BE ABLE TO PUBLISH THE FULL MPI WARMUP TUTORIAL IMMEDIATELY. THEREFORE, I WILL PUBLISH THE PARTS ONE AT A TIME, SO KEEP CHECKING THIS POST. THANK YOU FOR READING THIS BLOG.]

In this article, I will go over several examples of how to use some of the MPI functions discussed in the previous post. I will start with the essential parts of every MPI program and then move on to more specific examples of how to deal with synchronous and asynchronous communication.

I will illustrate this using Visual Studio and MPICH2. Please bear in mind that the codes are universal: if you are a UNIX user, you will have no problem compiling the codes presented hereafter. I chose Visual Studio because I am currently satisfied with Microsoft's approach to programming, open source code, and the .NET technology. That's my biased opinion, but whether you like it or not, Visual Studio is the most versatile and easy-to-use IDE and compiler on the planet, and best of all, you can get it for free!

First, let's get Visual Studio ready. If you have not done so already, please refer to the "MPICH2 and Visual Studio" article.

Now start Visual Studio and create a new empty C++ project (Win32 console application). Call it whatever you want, say MPIWarmup. Then, in the Solution Explorer, right click on "Source Files" and create a new C++ source file. Call it "main.cpp". Now we are ready to get started with MPI programming.

First Principles: Your First Parallel Program!


In science, a strong theory is always derived from first principles: the conservation laws of mass, momentum, energy, and so on. In MPI, there are also a few first principles you need to follow to make a code run in parallel.
First of all, before you do anything, you have to include the MPI header file, i.e.

#include <mpi.h>


Every MPI program has to start and end with two very specific functions, respectively. These are:

int MPI_Init(int *argc, char ***argv)
int MPI_Finalize()


All the programming will take place between these two functions. Here's how it looks in an actual program:
#include <mpi.h>
#include <stdio.h>
int main(int argc, char* argv[]) {
  // initialize the MPI world
  MPI_Init(&argc,&argv);
  // each processor will print "Bonjour Y'all!"
  printf("Bonjour Y'all! \n");
  // shutdown the MPI world before exiting
  MPI_Finalize();
  return 0;
}
To run this program, you need to start a command prompt and browse to the directory where your code was compiled. Usually, this is the Debug folder of your project directory. Here's a video that summarizes the above.
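If you are using MPICH2, a typical invocation from the command prompt looks something like the following (the executable name and the number of processes are, of course, whatever applies to your project):

mpiexec -n 2 MPIWarmup.exe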


Here, I ran two instances of the code, which is why you see the message "Bonjour Y'all!" printed twice. This is the simplest MPI program ever, and of course it does not send any messages back and forth.

Now let us improve our program a little bit. How about adding the processor "rank" or ID to the printed message? To do this, each processor has to call the MPI rank function. Here's how it is usually done:
#include <mpi.h>
#include <stdio.h>
int processorID;
int numberOfProcessors;

int main (int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  // get the size of the cluster and store in numberOfProcessors
  MPI_Comm_size(MPI_COMM_WORLD,&numberOfProcessors);
  // get the rank or the number of the current process and store it in processorID
  MPI_Comm_rank(MPI_COMM_WORLD,&processorID);
  printf("I am processor %d - Bonjour Y'all! \n", processorID);
  MPI_Finalize();
  return 0;
}

This program will output the processor number along with the Bonjour Y'all message. When debugging parallel codes, including the processor number in your print statements is invaluable because it helps you pinpoint where the code is breaking. You will find it very useful to place the above piece of code (without the printf statement, of course) at the beginning of all your parallel programs.

Notice that we have used something called MPI_COMM_WORLD in two of the function calls. This is simply the default communicator that is created when you initialize the MPI world. Communicators are like a web that connects different computers together. Sometimes, you might want to define a new communicator that links only a subset of your parallel computer. I have never needed that, though.
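For completeness, here is a minimal sketch of what creating such a sub-communicator could look like using MPI_Comm_split. I have never needed this myself, so take it purely as an illustration; the even/odd grouping below is arbitrary:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
  int processorID, subRank;
  MPI_Comm subComm;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &processorID);
  // group even-ranked and odd-ranked processors into two separate communicators
  int color = processorID % 2;
  MPI_Comm_split(MPI_COMM_WORLD, color, processorID, &subComm);
  // each processor now also has a rank inside its own sub-communicator
  MPI_Comm_rank(subComm, &subRank);
  printf("World rank %d has rank %d in its sub-communicator \n", processorID, subRank);
  MPI_Comm_free(&subComm);
  MPI_Finalize();
  return 0;
}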

Also, notice the use of the ampersand (&) in front of the numberOfProcessors and processorID variables. MPI functions take the memory address of your variables, because the data has to be copied from one memory location to another. I will not get into the details of this, but remember: sending a raw pointer value to another processor is meaningless, since memory addresses are only valid on the machine that owns them.

Let us assume now that you want the MASTER node to print something different from the compute nodes. This is easily done now that we have the processorID at hand. Of course, the root node has processorID "0" by definition, so you could include an if statement in your code that checks whether the current processorID is zero or not. However, there's a more elegant way of doing it using a few "define" directives, as allowed in the C language. While we're at it, let us also define a couple of global variables:

#include <mpi.h>
#include <stdio.h>
#define MASTER (processorID == 0)
#define NODE (processorID != 0)
#define WORLD MPI_COMM_WORLD

int processorID;
int numberOfProcessors;

int main (int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  MPI_Comm_size(WORLD,&numberOfProcessors);
  MPI_Comm_rank(WORLD,&processorID);
  if (MASTER)
    printf("Master node says: Bonjour Y'all! \n");
  else
    printf("Compute node says: Chillax! \n");
  MPI_Finalize();
  return 0;
}
I think the above code is clear enough about what it does.

Peer to Peer Communication

It is time now to start experimenting with send and receive operations. As mentioned in the previous post, there are two kinds of peer-to-peer communication: blocking and immediate. We'll start with the blocking send and receive calls. The anatomy of the send function is
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)

where:
- buf: memory address of the data to be sent
- count: number of elements in send buffer
- datatype: datatype of each send buffer element
- dest: rank of the destination processor
- tag: message tag - useful to identify what is coming from where
- comm: communicator - usually MPI_COMM_WORLD

The receive function call has a similar anatomy as follows:
int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

where:
- buf: memory address of the receive buffer
- count: maximum number of elements in the receive buffer
- datatype: datatype of each receive buffer element
- source: rank of the source processor
- tag: message tag - this should match the tag of the send function
- comm: communicator - usually MPI_COMM_WORLD
- status: status object - reports the status of the receive operation

Here's an example: assume that the master node has a variable that it wants to send to processor 2 (i.e. rank 1). This could be done by using two if statements as follows:
#include <mpi.h>
#include <stdio.h>
#define MASTER (processorID == 0)
#define NODE (processorID != 0)
#define WORLD MPI_COMM_WORLD

int processorID;
int numberOfProcessors;

int main (int argc, char* argv[]) {
  int someVariable = 0;
  MPI_Init(&argc, &argv);
  MPI_Comm_size(WORLD,&numberOfProcessors);
  MPI_Comm_rank(WORLD,&processorID);
  // set the value of someVariable on the Master node only
  if (MASTER) someVariable = 10;
  // print the value of someVariable on all processors - this should be zero for all
  //nodes except the master
  printf("Before Send - Processor %i - someVariable = %i  \n",processorID,someVari  able);
  if (MASTER) {
    // send someVariable to processor 1, with the tag being
    //the rank of the source processor, i.e. 0
    MPI_Send(&someVariable,1,MPI_INT,1,0,WORLD);
  }
  if (processorID == 1) {
    // allocate a status variable
    MPI_Status theStatus;
    // initiate the receive operation from processor 0.
    MPI_Recv(&someVariable,1,MPI_INT,0,0,WORLD,&theStatus);
  }
  printf("After Send - Processor %i - someVariable = %i \n",processorID,someVariab  le);
  MPI_Finalize();
  return 0;
}


Voila! To send an array, all you need to do is use the array variable name instead of someVariable. Of course, you have to remove the ampersand if your array was declared as a pointer, since the pointer itself already holds the address of the data. If your array is declared as a fixed array, you can keep the ampersand (or simply pass the array name, which evaluates to the same address). I never use fixed arrays unless it is really necessary for internal code management: a code that does not interact with the user, i.e. a code that is not dynamic by nature, defeats the whole purpose of programming! So here's an example with an array declared as a pointer and allocated dynamically.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define MASTER (processorID == 0)
#define NODE (processorID != 0)
#define WORLD MPI_COMM_WORLD

int processorID;
int numberOfProcessors;

int main (int argc, char* argv[]) {
  int arraySize = 0; /* the arraySize variable will be entered at the command line */
  MPI_Init(&argc, &argv);
  MPI_Comm_size(WORLD,&numberOfProcessors);
  MPI_Comm_rank(WORLD,&processorID);
  /* only one node (usually the master node) should read from the command line */
  if (MASTER) {
    printf("Please enter the array size: \n");
    fflush(stdout);
    scanf("%i",&arraySize);
  }
  /* after the arraySize is read on the master node, the other nodes need to
     have that value locally. at this point, arraySize = 0 on all nodes
     except the master node, where it is equal to the value entered at the cmd.
     a simple broadcast call will send the arraySize from the root to all
     other nodes */
  MPI_Bcast(&arraySize,1,MPI_INT,0,WORLD);
  /* declare a new array using malloc */
  float* someArray = (float*) malloc(arraySize*sizeof(float));
  /* initialize the newly declared array*/
  for (int i = 0; i < arraySize; i++) someArray[i] = 0.0;

  /* print the newly declared array */
  if(processorID == 1) {
    for (int i = 0; i < arraySize; i++)
    printf("someArray[%i] = %f \n", i, someArray[i]);
    printf("------------------------------------------\n");
  }

  /* change the values on the master node */
  if (MASTER)
    for (int i = 0; i < arraySize; i++) someArray[i] = 1.0*(i+1);

  if (MASTER) {
    /* send someArray to processor 1, with the tag being the
    rank of the source processor, i.e. 0 */
    MPI_Send(someArray,arraySize,MPI_FLOAT,1,0,WORLD);
  }
  if (processorID == 1) {
    // allocate a status variable
    MPI_Status theStatus;
    // initiate the receive operation from processor 0.
    MPI_Recv(someArray,arraySize,MPI_FLOAT,0,0,WORLD,&theStatus);
  }

  /* print the values to make sure the array was correctly received */
  if (processorID == 1)
    for (int i = 0; i < arraySize; i++)
      printf("someArray[%i] = %f \n", i, someArray[i]);
  /* free the allocated array before shutting down MPI */
  free(someArray);
  MPI_Finalize();
  return 0;
} 



Note that I have used a broadcast call after the root node reads the array size from the command line. I will explain collective communication very shortly.
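For reference, the anatomy of the broadcast function used above is:

int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm)

where root is the rank of the processor that holds the original data (0 in our case). Every processor in the communicator must call the function for the broadcast to complete.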

So far, we've sent data from the root node to the first node. Of course, this can be done between any two processors, and one processor may also send different arrays to different processors.

One subtle scenario occurs when all processors are sending data to and receiving data from each other. With blocking communication, this may lead to a deadlock if not properly handled. The safest bet in this case is to use asynchronous or immediate communication. I use blocking communication when a single processor is sending different data sets to all the other processors. For example, in CFD, a typical scenario is when the problem is read into the master node and the partitioning is subsequently applied on the master node. The master then has to send the partition information and data structures to each processor; this is naturally done with blocking send and receive calls. Once the processors start doing the computation, data has to be communicated at the partition interfaces. When the partition topology is simple (unidirectional, for example), blocking communication may be used. However, when the topology is complicated (see Fig. 1), the easiest and safest way to exchange the interface data is with immediate sends and receives.
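To give you a taste of what such an interface exchange looks like, here is a minimal sketch using immediate sends and receives (MPI_Isend, MPI_Irecv and MPI_Waitall, which will be covered in detail later in the warmup). Each processor simply exchanges a single value with its left and right neighbors in a ring, as a stand-in for real partition interface data:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
  int rank, size;
  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  /* each processor exchanges one value with its left and right neighbors (ring topology) */
  int left  = (rank - 1 + size) % size;
  int right = (rank + 1) % size;
  double sendValue = 1.0 * rank;
  double fromLeft = 0.0, fromRight = 0.0;

  MPI_Request requests[4];
  /* post the receives and the sends immediately; none of these calls block */
  MPI_Irecv(&fromLeft,  1, MPI_DOUBLE, left,  0, MPI_COMM_WORLD, &requests[0]);
  MPI_Irecv(&fromRight, 1, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &requests[1]);
  MPI_Isend(&sendValue, 1, MPI_DOUBLE, left,  0, MPI_COMM_WORLD, &requests[2]);
  MPI_Isend(&sendValue, 1, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &requests[3]);
  /* wait until all four transfers have completed before using the buffers */
  MPI_Waitall(4, requests, MPI_STATUSES_IGNORE);

  printf("Processor %d received %f from the left and %f from the right \n", rank, fromLeft, fromRight);
  MPI_Finalize();
  return 0;
}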



Friday, July 4, 2008

Parallel Computing with MPI - Part VIII: Essential Ingredients of Message Passing

In this article, I will show you, in the simplest of terms, how easy it is to write parallel code using the message passing interface. There will be no coding at this stage, only the basic concepts.

The most fundamental concept in parallel computing is that processor A has to send a message that contains some data (e.g. an array of values) to processor B, and processor B has to receive that message. That's all!

This can be done by using what is called peer-to-peer or p2p communication. I'm pretty sure this sounds familiar! Another way of sending data is called collective communication. In this case, all processors participate in the operation; for example, one processor sends data to all the others.

As you will notice, almost 85% of the parallel code that you will write will consist of p2p and collective communication calls. So, it is logical to discuss these two modes of communication first.

The remaining 15% (these numbers are rough estimates based on the parallel codes I have been involved in designing) goes to datatype creation, topology, and miscellaneous functions. These are slightly more advanced concepts that will not be discussed in this article. However, MPI datatype creation will be part of the parallel heat equation. So don't worry, you'll get a chance to see how that works!

ROOT & NODE
In a distributed memory parallel computer, the ROOT processor refers to the processor from which the job (or code) is executed. It acts as the job manager for all other processors and is usually the most powerful machine in your system. All pre- and post-processing is usually done on the ROOT.

Conversely, a NODE is any other processor that is not the ROOT.

In MPI, each processor has a "rank" or simply a number assigned to it. By definition, the ROOT processor has rank 0 while the other processors are numbered sequentially.

Peer to Peer (P2P) Communication
In P2P communication, processor A initiates a Send operation to processor B, while processor B initiates a Receive operation from processor A. These have to be matched to avoid what is called a deadlock. We'll discuss that shortly and learn how to overcome it.

There are two ways of initiating send and receive operations:
  1. Blocking
  2. Immediate or non-blocking
Blocking Communication

In a blocking send (receive), the processor "waits" for the message to be sent (received); i.e. the processor stops at that line of code until the operation is complete. Imagine the processor executing the code line by line: when it arrives at a blocking send (receive), it simply waits there until the message has been sent (received). The nuance here is that, in general, the message cannot be delivered unless the other side is ready to receive it, so the sending processor may wait until the receiving processor reaches the matching receive call and starts receiving the message.


If not properly implemented, this can cause a deadlock, a situation in which a processor cannot send a message because it cannot be received by another processor. Here's a famous scenario.

Consider a parallel code running on four processors, numbered 1, 2, 3 and 4. Also, let each processor send a message to the next one, i.e. 1 -> 2, 2 -> 3, 3 -> 4, and 4 -> 1. For the sake of illustration, let us start by analyzing what processor 1 is doing. When the code reaches the send function, processor 1 starts preparing the data to be sent to processor 2, but processor 2 cannot receive this message because it is busy sending a message to processor 3. Processor 3, in turn, cannot receive that message because it is sending a message to processor 4, and so on. All processors end up waiting (forever) on their send calls because none of them has initiated a receive yet; they are all waiting for their receiving processor to start receiving.

Immediate Communication

Immediate communication is the opposite of blocking communication. In this case, the processor does not wait for the message to be received; it hands the message over to the network and continues to the next line of code. This mode of communication avoids the kind of deadlock described above.


But it can cause wrong results or crashes if a send or receive buffer is reused before the transfer has actually completed. This is overcome by issuing a "wait" function that tells each processor to wait until all of its outstanding data has been sent and received. This will be shown in the parallel implementation of the heat equation.

Collective Communication
In collective communication, all processors are involved in the communication process. There are several instances where you need collective communication. For example, if a processor wants to tell all other processors that it finished computing something, this can be done by using collective communication (instead of writing a loop of send and receive operations).

The most important functionalities of collective communication as defined by MPI are:
  1. Broadcast
  2. Gather
  3. Reduce
Broadcast

A broadcast operation consists of sending data from one processor to the others.


Gather

A gather operation consists of gathering values from a group of processors and doing something with them. For example, the ROOT processor might want to gather the pieces of the solution from each processor and put them into one final array.

Reduce

A reduce operation consists of collecting values from all processors and applying an arithmetic (sum, product, etc.) or comparison (maximum, minimum) operation to those values. The resulting reduced value is then stored on one processor.
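Even though this post is concepts-only, here is a tiny preview of what a reduce looks like in code, just so the idea sinks in. In this minimal sketch, every processor contributes its rank and the ROOT ends up with the sum:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
  int rank, sum;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  /* every processor contributes its rank; the sum is stored on processor 0 (the ROOT) */
  MPI_Reduce(&rank, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  if (rank == 0) printf("The sum of all ranks is %d \n", sum);
  MPI_Finalize();
  return 0;
}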


In the next post, we'll start a (long) MPI warmup to get you familiar with the functions presented in this article as well as some programming intricacies of MPI.

Wednesday, July 2, 2008

Parallel Computing with MPI - Part VII: Measuring Parallel Performance

The most obvious benefit of using parallel computing is the reduction in execution time of your code. For example, if your code takes two hours to run on a single processor, it would (theoretically) take one hour to run on two processors, 30 minutes to run on four processors, and 2/N hours to run on N processors. However, there are two delicate issues to address here.

First, you must distinguish between CPU time and wall clock time.

CPU vs Clock Time

CPU time is the time required by the central processing unit to process instructions. It does not include input/output waiting times (for example, when the code is waiting for the user to input some data and the user is taking a break).

On the other hand, wall clock time is the time taken to run a code as measured by a regular clock. This will include all kinds of input/output operations and any processor idle time. Also, if you are running 10 applications alongside your code, the wall clock time for your code will be obviously larger than if it was run only by itself.

CPU time is usually used to benchmark your code, but in reality it is the wall clock time that really counts: if you start your code in the morning and it finishes by early evening, then that is when it really finished executing, not earlier as would most likely be reported by the CPU time.

I personally use both methods to time my parallel codes. Ideally, if your code is running on a dedicated parallel machine, the wall clock time will be reasonably close to the CPU time.
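As a practical note, MPI provides a wall clock timer, MPI_Wtime, while the standard C library's clock() measures CPU time. Here is a minimal sketch of timing a block of code both ways; the dummy loop is just a placeholder for real work:

#include <mpi.h>
#include <stdio.h>
#include <time.h>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);

  double wallStart = MPI_Wtime();   /* wall clock time, in seconds */
  clock_t cpuStart = clock();       /* CPU time, in clock ticks */

  /* placeholder for the actual computation being timed */
  double dummy = 0.0;
  for (long i = 0; i < 100000000L; i++) dummy += 1.0e-8;

  double wallSeconds = MPI_Wtime() - wallStart;
  double cpuSeconds  = (double)(clock() - cpuStart) / CLOCKS_PER_SEC;

  printf("Wall clock time: %f s, CPU time: %f s (dummy = %f) \n", wallSeconds, cpuSeconds, dummy);

  MPI_Finalize();
  return 0;
}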

Speedup

The other issue I would like to address is the actual reduction in running time, usually measured by the speedup. The speedup on N processors is defined as

$S(N) = T(1) / T(N)$  (Eq. 1)

where $T(1)$ is the execution time on a single processor and $T(N)$ is the execution time on N processors. Theoretically, your speedup should be equal to N, i.e. if your code runs in T seconds on one processor, then it takes T/N seconds to run on N processors. An ideal speedup curve is therefore a straight line at a 45-degree angle (Fig. 1).

The speedup curve may also give information on the scalability of your code. A code is scalable, in the parallel sense, when the speedup does not drift away from the ideal curve.

In practice, the speedup is not always ideal or even linear. The deterioration in scalability has several reasons that are beyond the scope of this article. A rule of thumb is to try to keep the load on each processor balanced. The more processors you want to use, the larger your problem data structures should be. You will learn this by practice.

Sometimes, you will see super-linear speedup, i.e. S(N) > N. This usually happens when the parallel code turns out to be more efficient than its sequential counterpart, for example because each processor's smaller share of the data fits in cache.

Efficiency

The efficiency of a parallel code is a measure of how much of the time the processors are actually being used to execute the program. This may be written as

$E(N) = T(1) / (N \, T(N))$  (Eq. 2)

When inserting Eq. 1 into Eq. 2, one arrives at

$E(N) = S(N) / N$
For instance, if the efficiency is 80%, then the processors are being used 80% of the time to do the actual computation and 20% of the time being idle.
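To put numbers on this: if a code takes T(1) = 100 seconds on one processor and T(4) = 31.25 seconds on four processors, then Eq. 1 gives S(4) = 100/31.25 = 3.2, and Eq. 2 (or equivalently S(4)/4) gives an efficiency of 0.8, i.e. 80%.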

In the next post, I will discuss the essential ingredients needed in the message passing paradigm. You will see that only a few functions are required to write a parallel code using message passing.