Task Farming & the Message Passing Interface
by Paulo Marques


Listing One

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpi.h"

#define BUF_SIZE            80

// Listing One entry point: a minimal MPI "hello" exchange.
// Every process with a non-zero rank sends one greeting string to
// process 0; process 0 receives one message per other process and
// prints each of them as it arrives.
// Returns 0 after MPI has been finalized.
int main(int argc, char* argv[])
{  
  int  id;                  // The rank of this process
  int  n_processes;         // The size of the world
  char buffer[BUF_SIZE];    // A message buffer

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &id);
  MPI_Comm_size(MPI_COMM_WORLD, &n_processes);

  if (id == 0) {
    // Process 0 - Receive messages from all other processes.
    // MPI_ANY_SOURCE means messages are printed in arrival order,
    // not in rank order.
    for (int i=1; i<=n_processes-1; i++) {
      MPI_Recv(buffer, BUF_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      printf("%s", buffer);
    }
  }
  else {
    // All other processes send a message to process 0.
    // snprintf bounds the write to the buffer and always terminates it.
    snprintf(buffer, sizeof buffer, "Hello, I'm process %d\n", id);
    // +1 so the terminating '\0' travels with the text; the count
    // parameter of MPI_Send is an int, hence the explicit conversion.
    MPI_Send(buffer, (int)strlen(buffer)+1, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
  }
  MPI_Finalize();
  return 0;
}


Listing Two

// Solve the NQueens problem for the <size> NxN board.
// Returns the number of solutions counted by place_queen(), starting
// from an empty board at column 0. A non-positive <size> yields 0.

// place_queen() is defined after this function, so it must be declared
// before it is called.
int place_queen(int column, int board[], int size);

int n_queens(int size)
{
  // A variable-length array needs a strictly positive length; for
  // size <= 0 the search loop would not run anyway, so the answer is 0.
  if (size <= 0)
    return 0;

  int board[size];          // board[c] = line of the queen in column c
  return place_queen(0, board, size);
}
// Recursive backtracking step of the NQueens search.
//   column - the column being filled in this call
//   board  - board[c] holds the line of the queen placed in column c;
//            entries 0..column-1 are read, entry <column> is written
//   size   - the board dimension
// Returns the number of complete boards reachable from the current
// partial placement; each complete board is passed to print_solution().
int place_queen(int column, int board[], int size)
{
  int found = 0;

  for (int line = 0; line < size; ++line) {
    board[column] = line;

    // Compare the new queen against every queen already placed:
    // same line, or one of the two diagonals.
    bool attacked = false;
    for (int prev = column - 1; prev >= 0 && !attacked; --prev) {
      int distance = column - prev;
      attacked = (board[column] == board[prev])            ||
                 (board[column] == board[prev] - distance) ||
                 (board[column] == board[prev] + distance);
    }
    if (attacked)
      continue;

    if (column != size - 1) {
      // Board still incomplete: descend into the next column
      found += place_queen(column + 1, board, size);
    } else {
      // Last column filled: count and print this solution
      ++found;
      print_solution(board, size);
    }
  }

  return found;
}


Listing Three
// The master process (returns the number of solutions found).
// It generates the partial boards through master_place_queen() and then
// collects whatever is still outstanding via wait_remaining_results().

// Both helpers are defined after this function, so declare them first.
int master_place_queen(int column, int board[], int size);
int wait_remaining_results(void);

int master(int size)
{
  // A variable-length array must have a positive length. No early return
  // is taken for size <= 0 because wait_remaining_results() must still
  // run to deliver the QUIT messages to the workers.
  int board[size > 0 ? size : 1];

  int n_solutions = master_place_queen(0, board, size);
  n_solutions += wait_remaining_results();

  // The original fell off the end of this non-void function, so the
  // total never reached the caller.
  return n_solutions;
}
int master_place_queen(int column, int board[], size)
{
  int n_solutions = 0;
  for (int i=0; i<size; i++) {
    // Place the queen on the correct line of <column>
    board[column] = i;
    // Check if this board is still a solution
    bool is_sol = true;
    for (int j=column-1; j>=0; j--) {
      if ((board[column] == board[j])               ||
          (board[column] == board[j] - (column-j))  ||
          (board[column] == board[j] + (column-j))) {
        is_sol = false;
        break;
      }
    }
    if (is_sol) {                    // If it is a solution...
      if (column == GRANULARITY-1) {  
        // If we are at the last level (granularity of the job), 
        // this is a job for sending to a worker
        n_solutions+= send_job_worker(board, size);
      }
      else {
        // Not in the last level, try to place queens in the 
        // next one using the current board
        n_solutions+= master_place_queen(column+1, board, size);
      }
    }
  }
  return n_solutions;
}


Listing Four
// Hands one partial board to whichever worker reports in next.
//   board - the partial placement; its first GRANULARITY entries are copied
//   size  - the board dimension (not used by this function)
// Returns the solutions_found value carried by the worker message that
// was received before the new job was sent out.
int send_job_worker(int board[], int size)
{
  // Package the partial board as a DO_WORK job
  job next_job;
  next_job.type = DO_WORK;
  for (int col = 0; col < GRANULARITY; ++col) {
    next_job.board[col] = board[col];
  }

  // Wait for any worker to report its previous result
  worker_msg report;
  MPI_Recv(&report, sizeof(report), MPI_BYTE, MPI_ANY_SOURCE,
           MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

  // Reply to that same worker with the new job
  MPI_Send(&next_job, sizeof(next_job), MPI_BYTE, report.origin, 0,
           MPI_COMM_WORLD);

  return report.solutions_found;
}


Listing Five
// Drains the final result of every worker and tells each one to stop.
// For as long as n_workers is positive, one worker message is received,
// its solutions_found is accumulated, a QUIT job is sent back to the
// message's origin, and n_workers is decremented.
// Returns the accumulated number of solutions.
int wait_remaining_results()
{
  // The same QUIT job is reused as the reply to every worker
  job quit_order;
  quit_order.type = QUIT;

  int total = 0;
  for (; n_workers > 0; --n_workers) {
    // Take the next result from whichever worker delivers one
    worker_msg report;
    MPI_Recv(&report, sizeof(report), MPI_BYTE, MPI_ANY_SOURCE,
             MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    total += report.solutions_found;

    // Dismiss the worker that has just reported
    MPI_Send(&quit_order, sizeof(quit_order), MPI_BYTE, report.origin, 0,
             MPI_COMM_WORLD);
  }
  return total;
}

Listing Six 
// The worker process loop.
//   size - the board dimension, forwarded to worker_place_queen()
// A single message is reused both to request a job and to report the
// number of solutions found for the previous one. The function returns
// once a job of type QUIT has been received.
void worker(int size)
{
  // The first request carries no result yet
  worker_msg request;
  request.origin          = id;
  request.solutions_found = 0;
  MPI_Send(&request, sizeof(request), MPI_BYTE, MASTER, 0, MPI_COMM_WORLD);

  for (;;) {
    // Block until a job or a quit message arrives
    job assignment;
    MPI_Recv(&assignment, sizeof(assignment), MPI_BYTE, MPI_ANY_SOURCE,
             MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (assignment.type == QUIT)
      return;

    // Continue the search from column GRANULARITY on the received board,
    // then report the count while asking for more work
    request.solutions_found =
        worker_place_queen(GRANULARITY, assignment.board, size);
    MPI_Send(&request, sizeof(request), MPI_BYTE, MASTER, 0, MPI_COMM_WORLD);
  }
}





4


