#include "mergesortlib.h"
#include <stdlib.h>
#include "pvm3.h"

int getFileLength(char * filename);
int isParent(char * filename,int NumberOfLevels);
int isChild(void);



/* program enterence */
void main(int argc,char *argv[]){
  if (pvm_parent() < 0){
    if (argc  == 2){
      isParent(argv[1],4);
      exit(0);
    }
    else if (argc  == 3){
      isParent(argv[1],atoi(argv[2]));
      exit(0);
    }
    else {
      printf("Usage: pvm_mergesort <datafile> [Number of levels (defaults to 4)]\n");
      exit(-1);
    }
  }
  else 
    isChild();
  exit(0);
}
/*******************************/
/* This is the parent function */
/*******************************/
int isParent(char * filename, int NumberOfLevels){
  FILE * infile;
  int NumberOfValues = 0;
  float * firstHalf;
  float * secondHalf;
  float * wholeList;
  int * TaskIDs;
  int * TaskID;
  int CurrentLevel = 0;
  int NumberInFirstHalf = 0;
  int NumberInSecondHalf = 0;
  int midpoint = 0;
  int i = 0;
  
  //pvm_joingroup("pvm");
  //get number of values in file
  NumberOfValues = getFileLength(filename);

  //set some values
  midpoint = NumberOfValues/2;
  NumberInFirstHalf = midpoint;
  NumberInSecondHalf = NumberOfValues - midpoint;

  // allocate an array to hold all the task ID's 
  TaskIDs = (int *)malloc(sizeof(int)*(NumberOfLevels));
  // this gives pvm_spawn something to store a tid in
  TaskID = (int *)malloc(sizeof(int)*(1));
  
  // allocate some arrays to hold the lists 
  firstHalf = (float *)malloc(sizeof(float)*(NumberOfValues+1));
  secondHalf = (float *)malloc(sizeof(float)*(NumberOfValues/2+1));
  wholeList = (float *)malloc(sizeof(float)*NumberOfValues+1);

  //read data into arrays
  infile = fopen(filename,"r");
  for (i = 0; i < midpoint; i++)
    fscanf(infile,"%f\n",&firstHalf[i]);
  for (i = midpoint; i < NumberOfValues; i++)
    fscanf(infile,"%f\n",&secondHalf[i-midpoint]);
  
  printf("NumberOfValues: %d NumberOfLevels: %d\n",NumberOfValues,NumberOfLevels);

  for (CurrentLevel = 0; CurrentLevel < NumberOfLevels;CurrentLevel++){
    //spawn a process for this level
    if ( pvm_spawn("pvm_mergesort",(char**)0, 0, "",1 ,TaskID )< 1){
      printf("Couldn't get enough Children @ level %d\n",CurrentLevel);
      exit(-1);
    }
    //set array of task ID's 
    TaskIDs[CurrentLevel] = TaskID[0];
    printf("CurrentLevel: %d NumberInFirstHalf: %d NumberInSecondHalf: %d\n",
	   CurrentLevel,NumberInFirstHalf,NumberInSecondHalf);
    //init message buffer
    pvm_initsend(PvmDataDefault);
    //pack local TID & current Level (sneakyness abounds!!!)
    pvm_pkint(&CurrentLevel,1,1);
    //pack number of levels
    pvm_pkint(&NumberOfLevels,1,1);
    //pack length of secondHalf
    pvm_pkint(&NumberInSecondHalf,1,1);
    //pack secondHalf
    pvm_pkfloat(secondHalf,NumberInSecondHalf,1);
    //send message
    pvm_send(TaskID[0],1);
    
    //split the list if we're going to send more
    if (CurrentLevel+1 < NumberOfLevels){
      midpoint = NumberInFirstHalf /2;
      NumberInSecondHalf = NumberInFirstHalf - midpoint;
      NumberInFirstHalf = midpoint;
      for (i = midpoint; i < NumberInSecondHalf+midpoint; i++)
	secondHalf[i-midpoint] = firstHalf[i];
    }
  }
  //Sort all of the incoming chuncks
  for (CurrentLevel = 0; CurrentLevel < NumberOfLevels; CurrentLevel++){
    quicksort(firstHalf,0,NumberInFirstHalf-1);
    //pvm_barrier("pvm", 1);
    pvm_recv(-1,-1);
    printf("RECEIVED a CHUNCK ");
    pvm_upkint(&NumberInSecondHalf,1,1);
    pvm_upkfloat(secondHalf,NumberInSecondHalf,1);
    quicksort(secondHalf,0,NumberInSecondHalf-1);
    merge(firstHalf,NumberInFirstHalf,secondHalf,NumberInSecondHalf,wholeList);
    printf("Number Received = %d Merge length = %d\n",NumberInSecondHalf,NumberInFirstHalf+NumberInSecondHalf);
    //move values from whole list to First half
    NumberInFirstHalf = NumberInFirstHalf + NumberInSecondHalf;
    for (i = 0; i < NumberInFirstHalf;i++){
      firstHalf[i] = wholeList[i];
    }
  }
  
  if (NumberOfValues % 2)
    printf("Odd Median = %f\n",firstHalf[NumberOfValues/2]);
  else 
    printf("Even Median = %f\n",(firstHalf[NumberOfValues/2] +firstHalf[(NumberOfValues/2)-1])/2 );
  
  return 1;
}




/******************************/
/* This is the child function */
/******************************/
int isChild(void){
  int NumberOfLevels = 0;
  int NumberOfValues = 0;
  int ThisCurrentLevel = 0;
  float * firstHalf;
  float * secondHalf;
  float * wholeList;
  int * TaskIDs;
  int * TaskID;
  int CurrentLevel = 0;
  int NumberInFirstHalf = 0;
  int NumberInSecondHalf = 0;
  int midpoint = 0;
  int i = 0;
  int result = 0;
  
  pvm_recv(-1,-1);
  pvm_upkint(&CurrentLevel,1,1);
  CurrentLevel = CurrentLevel+1;
  ThisCurrentLevel = CurrentLevel;
  
  //unpack number of levels
  pvm_upkint(&NumberOfLevels,1,1);
  
  //unpack length of secondHalf
  pvm_upkint(&NumberOfValues,1,1);
  
  // allocate an array to hold all the task ID's 
  TaskIDs = (int *)malloc(sizeof(int)*(NumberOfLevels));
  // this gives pvm_spawn something to store a tid in
  TaskID = (int *)malloc(sizeof(int)*(1));
  
  // allocate some arrays to hold the lists 
  firstHalf = (float *)malloc(sizeof(float)*(NumberOfValues+1));
  secondHalf = (float *)malloc(sizeof(float)*(NumberOfValues/2+1));
  wholeList = (float *)malloc(sizeof(float)*NumberOfValues+1);
  
  //unpack secondHalf
  pvm_upkfloat(firstHalf,NumberOfValues,1);

  //set some values
  midpoint = NumberOfValues/2;
  NumberInFirstHalf = midpoint;
  NumberInSecondHalf = NumberOfValues - midpoint;

  //build initial second half
  for (i = 0; i < NumberInSecondHalf;i++){
    secondHalf[i] = firstHalf[i+midpoint];
  }
 

  for (CurrentLevel; CurrentLevel < NumberOfLevels;CurrentLevel++){
    if ( pvm_spawn("pvm_mergesort",(char**)0, 0, "",1 ,TaskID )< 1){
      exit(-1);
    }
    //set array of task ID's 
    TaskIDs[CurrentLevel] = TaskID[0];
    //init message buffer
    pvm_initsend(PvmDataDefault);
    //pack local TID & current Level (sneakyness abounds!!!)
    pvm_pkint(&CurrentLevel,1,1);
    //pack number of levels
    pvm_pkint(&NumberOfLevels,1,1);
    //pack length of secondHalf
    pvm_pkint(&NumberInSecondHalf,1,1);
    //pack secondHalf
    pvm_pkfloat(secondHalf,NumberInSecondHalf,1);
    //send message
    pvm_send(TaskID[0],1);
    
    //split the list if we're going to send more
    if (CurrentLevel+1 < NumberOfLevels){
      midpoint = NumberInFirstHalf /2;
      NumberInSecondHalf = NumberInFirstHalf - midpoint;
      NumberInFirstHalf = midpoint;
      for (i = midpoint; i < NumberInSecondHalf+midpoint; i++)
	secondHalf[i-midpoint] = firstHalf[i];
    }
  }
  //if this isn't then last level wait for all remaing chuncks to come in
  if (ThisCurrentLevel < NumberOfLevels)
    for (CurrentLevel = ThisCurrentLevel; CurrentLevel < NumberOfLevels; CurrentLevel++){
      quicksort(firstHalf,0,NumberInFirstHalf-1);
      pvm_recv(-1,-1);
      pvm_upkint(&NumberInSecondHalf,1,1);
      pvm_upkfloat(secondHalf,NumberInSecondHalf,1);
      quicksort(secondHalf,0,NumberInSecondHalf-1);
      merge(firstHalf,NumberInFirstHalf,secondHalf,NumberInSecondHalf,wholeList);
      //move values from whole list to First half
      NumberInFirstHalf = NumberInFirstHalf + NumberInSecondHalf;
      for (i = 0; i < NumberInFirstHalf;i++){
	firstHalf[i] = wholeList[i];
      }
    }
  else {
    //if this is the last level sort the remain yourself
    quicksort(firstHalf,0,NumberInFirstHalf-1);
    quicksort(secondHalf,0,NumberInSecondHalf-1);
    merge(firstHalf,NumberInFirstHalf,secondHalf,NumberInSecondHalf,wholeList);
    NumberInFirstHalf = NumberInFirstHalf + NumberInSecondHalf;
    for (i = 0; i < NumberInFirstHalf;i++){
      firstHalf[i] = wholeList[i];
    }
  }
  //send the sorted list back
  pvm_initsend(PvmDataDefault);
  pvm_pkint(&NumberInFirstHalf,1,1);
  pvm_pkfloat(firstHalf,NumberInFirstHalf,1);
  //to keep messages from running into each other
  sleep(2*ThisCurrentLevel);
  pvm_send(pvm_parent(),1);
  
  return 1;
}

//*********************************
//this opens a file and returns the number 
//of input values
//
//***********************************
int getFileLength(char * filename){
  FILE *infile = NULL;
  int count = 0;
  int result = 0;
  int bogus = 0;
  infile = fopen(filename,"r");
  while (1){
    result = fscanf(infile,"%f",&bogus);
    if (result == EOF)
      break;
    count++;
  }
  fclose(infile);
  return count;
}


/* Sample output:                                                 */
/* pvm@deltabell:/home/pvm/src>pvm_mergesort file1.dat 5          */
/* NumberOfValues: 1000 NumberOfLevels: 5                         */
/* CurrentLevel: 0 NumberInFirstHalf: 500 NumberInSecondHalf: 500 */
/* CurrentLevel: 1 NumberInFirstHalf: 250 NumberInSecondHalf: 250 */
/* CurrentLevel: 2 NumberInFirstHalf: 125 NumberInSecondHalf: 125 */
/* CurrentLevel: 3 NumberInFirstHalf: 62 NumberInSecondHalf: 63   */
/* CurrentLevel: 4 NumberInFirstHalf: 31 NumberInSecondHalf: 31   */
/* RECEIVED a CHUNCK Number Received = 31 Merge length = 62       */
/* RECEIVED a CHUNCK Number Received = 63 Merge length = 125      */
/* RECEIVED a CHUNCK Number Received = 125 Merge length = 250     */
/* RECEIVED a CHUNCK Number Received = 250 Merge length = 500     */
/* RECEIVED a CHUNCK Number Received = 500 Merge length = 1000    */
/* Even Median = 52.587124                                        */

