/* include standard library so we can print stuff out */
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
/* include pvm3 library so we can use pvm */
#include "pvm3.h"

int isParent(char * filename,int NumberOfTasks);
int isChild(void);
int getLength(char * filename);

/*
 * Program entrance.
 *
 * The same executable serves as both the parent and the children: when
 * pvm_parent() reports no parent (a negative value) this process acts as
 * the parent, otherwise it acts as a child.
 *
 * Parent usage:  stddev_pvm datafile [number_of_tasks]
 *
 * Exits with 0 on normal completion and -1 when the parent is started with
 * the wrong number of arguments.
 *
 * NOTE(review): the PVM documentation recommends calling pvm_exit() before
 * a task terminates; this program does not do so -- confirm whether it
 * should be added.
 */
int main(int argc,char *argv[]){
  if (pvm_parent() < 0){
    if (argc == 2){
      /* no task count given: -1 lets isParent pick the count itself */
      isParent(argv[1],-1);
    }
    else if (argc == 3){
      isParent(argv[1],atoi(argv[2]));
    }
    else {
      fprintf(stderr,"usage: %s datafile [number_of_tasks]\n",
              (argc > 0) ? argv[0] : "stddev_pvm");
      exit(-1);
    }
  }
  else {
    isChild();
  }
  exit(0);
}

/*
 * This is the parent function.
 *
 * Counts the integers in `filename`, spawns "stddev_pvm" child tasks and
 * deals the values out to them in contiguous groups.  It then
 *   1. adds up the partial sums returned by the children to get the mean,
 *   2. sends the mean back to every child,
 *   3. adds up the returned sums of squared differences, divides by the
 *      number of values to get the variance, and prints its square root.
 *
 * filename      - text file of whitespace-separated integers.
 * NumberOfTasks - number of children to spawn; a value <= 0 means "use the
 *                 count reported by pvm_config()".
 *
 * The number of children never exceeds the number of values nor the
 * capacity of the task-id table (64).
 *
 * Returns 0 on success.  Calls exit(-1) when the file is empty or cannot
 * be opened, when no task can be requested, when pvm_spawn() does not start
 * every requested task, or when a value cannot be read back from the file.
 */
int isParent(char * filename, int NumberOfTasks){
  enum { MAX_TASKS = 64,    /* capacity of TaskIDs[] */
         MSG_TAG   = 0 };   /* tag put on every message sent from here */
  FILE *infile = NULL;
  int NumberOfValues = 0;
  int NumberOfMachines = 0;
  int NumberOfArchs = 0;                 /* filled by pvm_config(), unused */
  struct pvmhostinfo *HostInfo = NULL;   /* filled by pvm_config(), unused */
  int TaskIDs[MAX_TASKS];
  int TasksWanted = 0;
  int NumberOfChildren = 0;
  int ValuesPerChild = 0;
  int CurrentMachine = 0;
  int CurrentValue = 0;
  int GroupStart = 0;
  int GroupEnd = 0;
  int LengthOfValuesGroup = 0;
  int value = 0;
  float LocalSum = 0.0;
  float GlobalSum = 0.0;
  float GlobalMean = 0.0;
  float Varience = 0.0;
  float StdDev = 0.0;

  //get the number of entries in the data file
  NumberOfValues = getLength(filename);
  printf("%s has %d numbers in it.\n",filename,NumberOfValues);
  if (NumberOfValues <= 0){
    // nothing to average; also keeps the divisions below away from zero
    fprintf(stderr,"%s holds no values, nothing to do\n",filename);
    exit(-1);
  }

  //find out the size of the virtual machine
  pvm_config(&NumberOfMachines,&NumberOfArchs,&HostInfo);
  if (NumberOfTasks > 0)
    NumberOfMachines = NumberOfTasks;
  printf("The cluster consists of %d machine(s)\n",NumberOfMachines);

  //never ask for more children than there are values or TaskIDs slots
  TasksWanted = (NumberOfValues < NumberOfMachines) ? NumberOfValues : NumberOfMachines;
  if (TasksWanted > MAX_TASKS){
    fprintf(stderr,"Limiting %d requested task(s) to %d\n",TasksWanted,(int)MAX_TASKS);
    TasksWanted = MAX_TASKS;
  }
  if (TasksWanted < 1){
    fprintf(stderr,"No tasks to spawn\n");
    exit(-1);
  }

  //open the data file before spawning so a failure leaves no children behind
  infile = fopen(filename,"r");
  if (infile == NULL){
    perror(filename);
    exit(-1);
  }

  NumberOfChildren = pvm_spawn("stddev_pvm",(char**)0, 0, "",TasksWanted ,TaskIDs );
  if (NumberOfChildren != TasksWanted){
    fprintf(stderr,"pvm_spawn started %d of %d task(s)\n",NumberOfChildren,TasksWanted);
    fclose(infile);
    exit(-1);
  }
  printf("Spawned %d task(s)\n",NumberOfChildren);

  //values per child, rounded up
  ValuesPerChild = NumberOfValues / NumberOfChildren;
  if ((NumberOfValues % NumberOfChildren ) != 0)
     ValuesPerChild++;
  printf("Will send %d values to each child\n",ValuesPerChild);

  CurrentValue = 0;
  for (CurrentMachine = 0; CurrentMachine < NumberOfChildren; CurrentMachine++){
    GroupStart = CurrentMachine*ValuesPerChild;
    GroupEnd = GroupStart + ValuesPerChild;
    if (GroupEnd <= NumberOfValues){
      LengthOfValuesGroup = ValuesPerChild;
    }
    else {
      LengthOfValuesGroup = NumberOfValues - GroupStart;
    }
    // Because of the rounding up, trailing children can be left without
    // any values.  They still get an (empty) group so that they run the
    // whole protocol and finish instead of blocking forever.
    if (LengthOfValuesGroup < 0){
      LengthOfValuesGroup = 0;
    }
    pvm_initsend(PvmDataDefault);
    pvm_pkint(&LengthOfValuesGroup,1,1);
    printf("Packing %d values for child %d\n",LengthOfValuesGroup,CurrentMachine);
    for( ; CurrentValue < GroupEnd ; CurrentValue++){
      if (CurrentValue >= NumberOfValues){
        printf("Breaking at CurrentValue:%d ",CurrentValue);
        break;
      }
      if (fscanf(infile,"%d",&value) != 1){
        // the file no longer matches what getLength() counted
        fprintf(stderr,"\nCould not read value %d from %s\n",CurrentValue,filename);
        fclose(infile);
        exit(-1);
      }
      printf("%d[%d] ",CurrentValue,value);
      pvm_pkint(&value,1,1);
    }
    pvm_send(TaskIDs[CurrentMachine],MSG_TAG);
    printf("sent\n");
  }
  fclose(infile);

  printf("Waiting for children to return localsums\n");
  for (CurrentMachine = 0; CurrentMachine < NumberOfChildren; CurrentMachine++){
    pvm_recv(-1,-1);
    printf(".");
    pvm_upkfloat(&LocalSum,1,1);
    GlobalSum = GlobalSum + LocalSum;
  }
  GlobalMean = GlobalSum / NumberOfValues;
  printf("\n Global mean = %f\n",GlobalMean);

  //hand the mean to every child so it can sum its squared differences
  for (CurrentMachine = 0; CurrentMachine < NumberOfChildren; CurrentMachine++){
    pvm_initsend(PvmDataDefault);
    pvm_pkfloat(&GlobalMean,1,1);
    pvm_send(TaskIDs[CurrentMachine],MSG_TAG);
  }

  GlobalSum = 0;
  printf("Waiting for children to return local dif sums\n");
  for (CurrentMachine = 0; CurrentMachine < NumberOfChildren; CurrentMachine++){
    pvm_recv(-1,-1);
    printf(".");
    pvm_upkfloat(&LocalSum,1,1);
    GlobalSum = GlobalSum + LocalSum;
  }
  Varience = GlobalSum / NumberOfValues;
  printf("\n Varience = %f\n",Varience);
  StdDev = (float)sqrt((double)Varience);
  printf("The standard Deviation is %2.2f \n",(float)StdDev);
  return(0);
}
//**********************************
// this is the child function
//
// Protocol, as seen from the child:
//   1. receive one message holding a count followed by that many ints
//   2. send the sum of those ints to the parent as a float
//   3. receive the global mean (a float) from the parent
//   4. send the sum of squared differences from that mean to the parent
//
// Returns 0 on success, -1 when memory for the values cannot be
// allocated (in which case nothing is sent back to the parent).
//*********************************

int isChild(void){
  enum { MSG_TAG = 0 };   /* tag put on every message sent to the parent */
  int NumberOfValues = 0;
  int CurrentValue = 0;
  int *Values = NULL;
  float LocalSum = 0;
  float GlobalMean = 0;
  float LocalDifSum = 0;
  float Difference = 0;

  pvm_recv(-1,-1);
  pvm_upkint(&NumberOfValues,1,1);
  if (NumberOfValues < 0)
    NumberOfValues = 0;
  // size the buffer from the received count instead of trusting a fixed limit
  if (NumberOfValues > 0){
    Values = malloc((size_t)NumberOfValues * sizeof *Values);
    if (Values == NULL){
      fprintf(stderr,"child: out of memory for %d values\n",NumberOfValues);
      return -1;
    }
  }
  for (CurrentValue = 0 ; CurrentValue < NumberOfValues;CurrentValue++){
    pvm_upkint(&Values[CurrentValue],1,1);
  }
  // sum them up
  for (CurrentValue = 0 ; CurrentValue < NumberOfValues;CurrentValue++){
    LocalSum = LocalSum + Values[CurrentValue];
  }
  pvm_initsend(PvmDataDefault);
  pvm_pkfloat(&LocalSum,1,1);
  pvm_send(pvm_parent(),MSG_TAG);
  //*****************************************
  printf("waiting for global mean\n");
  pvm_recv(-1,-1);
  printf("Got it\n");
  pvm_upkfloat(&GlobalMean,1,1);
  for (CurrentValue = 0 ; CurrentValue < NumberOfValues;CurrentValue++){
    Difference = Values[CurrentValue]-GlobalMean;
    LocalDifSum = LocalDifSum + (Difference*Difference);
  }
  //******************************************
  pvm_initsend(PvmDataDefault);
  pvm_pkfloat(&LocalDifSum,1,1);
  // NOTE(review): the reason for this delay is not evident from the code;
  // the original remark reads "this is an issue when you only have one machine"
  sleep(10);
  pvm_send(pvm_parent(),MSG_TAG);
  printf("child\n");
  free(Values);
  return 0;
}

//*********************************
//this opens a file and returns the number
//of input values
//
// Counts the leading run of whitespace-separated integers that
// fscanf("%d") can convert.  Counting stops at end of file or at the
// first token that is not an integer.
//
// Returns 0 (after reporting the error on stderr) when the file
// cannot be opened.
//***********************************
int getLength(char * filename){
  FILE *infile = NULL;
  int count = 0;
  int scratch = 0;

  if (filename == NULL)
    return 0;
  infile = fopen(filename,"r");
  if (infile == NULL){
    perror(filename);
    return 0;
  }
  // "== 1" rather than "!= EOF": on a non-numeric token fscanf returns 0
  // without consuming anything, which would otherwise loop forever
  while (fscanf(infile,"%d",&scratch) == 1){
    count++;
  }
  fclose(infile);
  return count;
}


//***********************************************************************
//Sample of Output
/*
  pvm@deltabell:/home/pvm/src>./stddev_pvm test.dat 3
  test.dat has 21 numbers in it.
  The cluster consists of 3 machine(s)
  Spawned 3 task(s)
  Will send 7 values to each child
  Packing 7 values for child 0
  0[213] 1[435] 2[23] 3[56] 4[324] 5[567] 6[234] sent
  Packing 7 values for child 1
  7[567] 8[324] 9[3425] 10[365] 11[1324] 12[9] 13[324] sent
  Packing 7 values for child 2
  14[589] 15[3456] 16[2345] 17[960] 18[2345] 19[7980] 20[345] sent
  Waiting for children to return localsums
  ...
  Global mean = 1248.095215
  Waiting for children to return local dif sums
  ...
  Varience = 3341199.250000
  The standard Deviation is 1827.89 */

//***********************************************************************
// test.dat file
/*
213
435
23
56
324
567
234
567
324
3425
365
1324
9
324
589
3456
2345
960
2345
7980
345
 */

