/*
  PSORT   (P(arallel)SORT)
  Jim Garlick
  
  The purpose of this program is to sort a list of numbers in parallel and then
  find the median in the list. The program is divided into two sections: parent
  and child. The parent is in charge of reading in the numbers, starting the first
  layer of children, and outputting the resulting median. The children receive from
  the parent and spawn some children of their own and then receive those responses.
  The program can be thought of as being recursive: the parent creates a child copy
  of itself, which in turn creates its own children.
*/

#include <stdio.h>
#include <math.h>
#include "pvm3.h"
#include "mergesort.h"

#define BINNAME "psort"
#define TO_MASTER 1
#define TO_SLAVE 2

void process(float *array, int n, int level);
void sendToChild (int level, int length, float *array);
void copyArray(float *to, float *from, int n);

int main (int argc, char* argv[]) {

  char filename[256], tmp[100];
  FILE *input, *output;
  int i;

  /* Pointer that will contain the entire data set array */
  float *mainData, median;
  int n, level;
  
  /* get the parent's id (if there is one) */
  int parent_id = pvm_parent();
  
  /* If there is no parent */
  if (parent_id == PvmNoParent) {
    
    /* Get info from command line */
    if (argc < 3) {
      printf("usage: %s <max level> <input filename> [<output filename>]\n", BINNAME);
      exit(1);
    }
    level = atoi(argv[1]);
    strcpy(filename, argv[2]);
    
    /* Open the file for reading */
    if ((input = fopen(filename, "r")) == NULL) {
      fprintf(stderr, "Error opening %s\n", filename);
      exit(1);
    }
    
    /* Count the number of numbers in the file */
    for(n=0; fgets(tmp, sizeof(tmp), input); n++);
    rewind(input);
    printf("Sorting %d numbers.\n", n);
    
    /* Allocate memory for the array */
    mainData = (float*)malloc(n * sizeof(float));
    
    /* Read in the numbers */
    for(i=0; fgets(tmp, sizeof(tmp), input); i++)
      sscanf(tmp, "%f", &mainData[i]);
    
    fclose(input);
    
    /* Sort n elements of mainData splitting it level times */
    process(mainData, n, level);

    /* Save the sorted data to a file if it's given */
    if (argc == 4) {
      output = fopen(argv[3], "w");

      printf("Saving sorted data to %s\n", argv[3]);
      for (i=0; i<n; i++)
	fprintf(output, "%f\n", mainData[i]);

      fclose(output);
    }

    /* Print out the median */
    printf("%d\n", n%2);
    if (n%2)    /* if n is odd */
      median = mainData[(n/2)];
    else
      median = (mainData[(n/2)-1] + mainData[(n/2)]) / 2;
    printf("Median: %f\n", median);
  } 

  /* If the process does have a parent */
  if (parent_id != PvmNoParent) {

    /* Receive from the parent */
    printf("Receiving from Parent...\n");
    pvm_recv(parent_id, TO_SLAVE);
    
    /* Unpack the data */
    printf("Unpacking...\n");
    pvm_upkint(&level, 1, 1);
    pvm_upkint(&n, 1, 1);
    mainData = (float*) malloc(n*sizeof(float));
    pvm_upkfloat(mainData, n, 1);

    /* Sort n elements of mainData splitting it level times */
    printf("Processing\n");
    process(mainData, n, level);
    
    /* Repack the sorted data */
    printf("Packing...\n");
    pvm_initsend(PvmDataDefault);
    pvm_pkint(&n, 1, 1);
    pvm_pkfloat(mainData, n, 1);

    /* Send it back to the parent */
    printf("Sending...\n");
    pvm_send(parent_id, TO_MASTER);
  }

  /* Quit PVM and the program */
  pvm_exit();
  exit(0);
}

void process(float *whole, int n, int level) {
/*
  process() is where the work happens in both the parent and the children. The 
  function starts by decrementing the level  spawning that many children each 
  with half of the original (whole) array.
*/
  float *temp;
  int i, numSpawned, newN;
  
  /* Allocate memory for a temporary array */
  temp = (float*) malloc(n*sizeof(float));

  for (i=(level-1), numSpawned=0; (i>=0 && n>5); i--, numSpawned++) {
    /* Split the array */
    newN = (n/2);

    printf("Level: %d :: ", i);
    printf("Keeping %d, Sending %d\n", newN, (n-newN));

    /* Send to the child, the level, the length of the new array and the second 
       half of the array */
    sendToChild(i, (n-newN), &whole[newN]);
    n = newN;
  }
  
  printf("Lowest level reached with %d spawned.\n",numSpawned);

  /* Then we process what's left over */
  printf("Sorting %d remaining elements.\n", n);
  quicksort(whole, 0, n);

  /* Now we start receiving from the children... */
  for (i=0; i<numSpawned; i++) {
    /* Receive from any child */
    pvm_recv(-1, TO_MASTER);

    /* 
       The following statements require some explanation.
       I didn't want to use three arrays (first half, second half, destination).
       Instead, I reused the {whole} array by putting the second half of the 
       array back into it.
    */

    /* Unpack the number of numbers in the array */
    pvm_upkint(&newN, 1, 1);
    /* Put the numbers in the {whole} array starting at the end. */
    pvm_upkfloat(&whole[n], newN, 1);
    printf("Recieved %d numbers. ", newN);

    /* Merge the two arrays (using the same array, but partitioning them) */
    merge(whole, n, &whole[n], (newN), temp);
    printf("Merged array is %d long.\n", (newN+n));

    /* Replace the whole array with the temp array */
    n += newN;
    copyArray(whole, temp, n);
  }
}

void sendToChild (int level, int length, float *array) {
/* 
   sendToChild's purpose is to simplify some code from above. I wanted
   to add some error checking to the program because there is a possibility
   that the user could create an unreasonable number of children.
*/
   
  int tid, n, info;
  
  /* Spawn a child (with some error checking */
  if ((pvm_spawn(BINNAME, 0, PvmTaskDefault, "", 1, &tid)) <= 0) {
    fprintf(stderr, "Could not spawn child...exiting.\n");
    printf("Error Code: %d\n", tid);
    exit(1);
  }
  
  /* Send the data to the child */
  pvm_initsend(PvmDataDefault);
  pvm_pkint(&level, 1, 1);
  pvm_pkint(&length, 1, 1);
  pvm_pkfloat(array, length, 1);
  info = pvm_send(tid, TO_SLAVE);
  
  /* More error checking */
  if (info < 0) {
    pvm_perror("");
    exit(1);
  } 
}

void copyArray(float *to, float *from, int n) {
/* 
   copyArray does just that. It copies n elements from the from array
   and puts them into the to array.
*/
  int i;

  for (i=0; i<n; i++)
    to[i] = from[i];
}
