#define DBL(a) (a) /* Code to sort on a parallel machine. Written by Andrew Tridgell September 1992 in collaboration with Richard Brent This program is Copyright Andrew Tridgell 1992. This program may be used for non-commercial purposes. If you wish to use this program for commercial purposes or as part of a commercial product then please contact Andrew Tridgell. There is no warranty with this product. last updated November 1997 contact: Andrew Tridgell Dept. of Computer Science Australian National University GPO BOX 4, Canberra City, A.C.T, 0200, Australia Tel: (06) 254-8209 Internet: Andrew.Tridgell@anu.edu.au The main function in this file is: void parallel_sort(void *data,int N,int el_size,int (*comparison)()); which has similar syntax to the library function qsort(), with the proviso that comparison() cannot assume cell dependant pointers, so to sort strings they would have to be of a fixed length and strcmp() could be the comparison function. The function is called from all the cells simultaneously. TODO: Store in block form without re-arrangement */ #if 0 #define memcpy amemmove #define qsort gnu_qsort #endif #ifndef SIMPLE #define SIMPLE 0 #endif /* shall I use a hyper pre-sort ? */ #ifndef HYPER_SORT #define HYPER_SORT 1 #endif /* shall I use batchers sort ? */ #ifndef BATCHERS #define BATCHERS 0 #endif /* shall I use a bitonic sort ? */ #ifndef BITONIC #define BITONIC 0 #endif /* shall I try to use an unbalanced merge ? */ #ifndef UNBALANCED_MERGE #define UNBALANCED_MERGE 1 #endif /* shall I assume integer data only ? */ #ifndef INTEGER #define INTEGER 0 #endif /* shall I keeps stats on the sort ? */ #ifndef STATS #define STATS 0 #endif /* shall I acknowledge big messages ? */ #ifndef ACKNOWLEDGE_BIG_MSGS #define ACKNOWLEDGE_BIG_MSGS 0 #endif /* shall I acknowledge all messages ? */ #ifndef ACKNOWLEDGE_ALL_MSGS #define ACKNOWLEDGE_ALL_MSGS 0 #endif /* shall I split big messages ? */ #ifndef SPLIT_BIG_MSGS #define SPLIT_BIG_MSGS 0 #endif /* this file defines the following function */ void parallel_sort(void *data,int N,int el_size,int (*comparison)()); #include #include #include "mimd.h" /* a few basic defs */ typedef enum {False=0,True=1} BOOL; void *malloc(); #define QSORT_CAST int (*)() #ifndef MIN #define MIN(a,b) ((a)<(b)?(a):(b)) #endif #ifndef MAX #define MAX(a,b) ((a)>(b)?(a):(b)) #endif #define IS_ODD(i) (((unsigned)i) & 1) /* these macros provide all the element manipulation. Special purpose ones are used for integer sorts */ #if !INTEGER #define COMPARE(p1,p2) PSI.comparison(p1,p2) #define ELEMENT(ptr,i) ((char *)(ptr) + (i)*(EL_SIZE)) #define COPYEL(p1,p2) memcpy(p1,p2,EL_SIZE) #define COPYEL_INC(p1,p2) \ (COPYEL(p1,p2),p1=ELEMENT(p1,1),p2=ELEMENT(p2,1)) #define COPYELS(p1,p2,n) memcpy(p1,p2,(n)*EL_SIZE) #else int iii; #define COMPARE(p1,p2) (*((int *)(p1)) - *((int *)(p2))) #define ELEMENT(ptr,i) ((void *)((int *)(ptr) + (i))) #define COPYEL(p1,p2) (*((int *)p1) = *((int *)p2)) #define COPYEL_INC(p1,p2) \ (COPYEL(p1,p2),p1=ELEMENT(p1,1),p2=ELEMENT(p2,1)) #define COPYELS(p1,p2,n) memcpy(p1,p2,(n)*EL_SIZE) #endif #define PTR(a,b) ELEMENT(a,b) #define el_vector(n) malloc((n)*EL_SIZE) #define EL_SIZE PSI.el_size /* this struct contains info needed for a parallel sort, it must be shared between several functions which is why it is global - this means that parallel sort can not be used recursively! */ static struct { int (*comparison)(); int local_size; int el_size; int *finished; } parallel_sort_info; #define PSI parallel_sort_info #if STATS /* the following bits allow for easy routine timing */ enum timers { T_ALL, T_MERGE_EXCHANGE, T_MERGE, T_QSORT, T_HYPER, T_COMMS, T_FIND_EXACT, T_SWAP_MEM, T_INSIT1, T_INSIT2, T_INSIT3, T_SIMPLE, T_SMALL_MERGE, T_HYPER_SORT, T_CLEANUP, T_UNFINISHED, T_BATCHERS, T_BITONIC, T_NONE}; static char *timer_labels[T_NONE+1] = { "ALL", "MERGE_EXCHANGE", "MERGE", "QSORT", "HYPERBALANCE", "WAITING+COMMS", "FIND_EXACT", "SWAP_MEM", "T_INSIT1", "T_INSIT2", "REARRANGE", "SIMPLE", "SMALL_MERGE", "HYPER_SORT", "CLEANUP", "UNFINISHED", "BATCHERS_SORT", "BITONIC_SORT", NULL }; static struct { BOOL started; double time_started; double cum_time; } timers_struct[T_NONE]; static struct { int messages; int num_guarantee_loops; } stats; #endif #if (STATS && !HOST) #define ts(x) timers_struct[x] #define TS(x) (ts(x).started=True,ts(x).time_started=MIMD_Uptime()) #define TE(x) (ts(x).started==False?printf("Not started!\n"):(ts(x).started=False,ts(x).cum_time+=(MIMD_Uptime()-ts(x).time_started))) #else #define TS(x) #define TE(x) #endif static void fmemcpy(void *to,void *from,int size) { int N; char *c1=to,*c2=from; int dorotate=0,dofloat=0,dodouble=0,doshort=0; int block=0; /* trivial case */ if (c1 == c2) return; { int diff = (int)to - (int)from; if (diff<0) diff = -diff; switch (diff%sizeof(double)) { case 0: dodouble=1; dofloat=1; doshort=1; dorotate=0; block=8; break; case 1: case 3: case 5: case 7: dodouble=0; dofloat=0; doshort=0; dorotate=1; block=1; break; case 2: case 6: dodouble=0; dofloat=0; doshort=1; dorotate=0; block=2; break; case 4: dodouble=0; dofloat=1; doshort=1; dorotate=0; block=4; break; } } /* do the first few bytes */ N = block - ((int)c1 % block); if (N != block) { size -= N; while (N--) *c1++ = *c2++; } #define MEMTYPE double if (dodouble) { MEMTYPE *p1=(MEMTYPE *)c1,*p2=(MEMTYPE *)c2; MEMTYPE d0,d1,d2,d3,d4,d5,d6,d7; N = size/sizeof(MEMTYPE); while (N >= 8) { d0=p2[0];d1=p2[1];d2=p2[2];d3=p2[3]; d4=p2[4];d5=p2[5];d6=p2[6];d7=p2[7]; p1[0]=d0;p1[1]=d1;p1[2]=d2;p1[3]=d3; p1[4]=d4;p1[5]=d5;p1[6]=d6;p1[7]=d7; p1 += 8;p2 += 8;N -= 8; } c1 = (char *)p1; c2 = (char *)p2; size = size % (sizeof(MEMTYPE)*8); } #undef MEMTYPE #define MEMTYPE float if (dofloat) { MEMTYPE *p1=(MEMTYPE *)c1,*p2=(MEMTYPE *)c2; MEMTYPE d0,d1,d2,d3,d4,d5,d6,d7; N = size/sizeof(MEMTYPE); while (N >= 8) { d0=p2[0];d1=p2[1];d2=p2[2];d3=p2[3]; d4=p2[4];d5=p2[5];d6=p2[6];d7=p2[7]; p1[0]=d0;p1[1]=d1;p1[2]=d2;p1[3]=d3; p1[4]=d4;p1[5]=d5;p1[6]=d6;p1[7]=d7; p1 += 8;p2 += 8;N -= 8; } c1 = (char *)p1; c2 = (char *)p2; size = size % (sizeof(MEMTYPE)*8); } #undef MEMTYPE #define MEMTYPE short if (doshort) { MEMTYPE *p1=(MEMTYPE *)c1,*p2=(MEMTYPE *)c2; MEMTYPE d0,d1,d2,d3,d4,d5,d6,d7; N = size/sizeof(MEMTYPE); while (N >= 8) { d0=p2[0];d1=p2[1];d2=p2[2];d3=p2[3]; d4=p2[4];d5=p2[5];d6=p2[6];d7=p2[7]; p1[0]=d0;p1[1]=d1;p1[2]=d2;p1[3]=d3; p1[4]=d4;p1[5]=d5;p1[6]=d6;p1[7]=d7; p1 += 8;p2 += 8;N -= 8; } c1 = (char *)p1; c2 = (char *)p2; size = size % (sizeof(MEMTYPE)*8); } #undef MEMTYPE #define MEMTYPE unsigned if (dorotate) { N = size; /* align the source on a MEMTYPE block */ while (N && (int)c2%sizeof(MEMTYPE)!=0) { *c1++ = *c2++; N--; } /* now work in blocks of 8 MEMTYPEs */ { int rotation = ((int)c1%sizeof(MEMTYPE)); MEMTYPE d0,d1,d2,d3,d4,d5,d6,d7; MEMTYPE head,tail; MEMTYPE *p1=(MEMTYPE *)(c1-rotation),*p2=(MEMTYPE *)c2; int rot8 = rotation*8; int rev8 = (sizeof(MEMTYPE)-rotation)*8; while (N>=8*sizeof(MEMTYPE)) { /* load the 8 MEMTYPEs */ d0=p2[0];d1=p2[1];d2=p2[2];d3=p2[3]; d4=p2[4];d5=p2[5];d6=p2[6];d7=p2[7]; #define ROTATE(from,to) to=((to)>>rot8) | ((from)<>rev8; tail = p1[8]< 0) { #if SPLIT_BIG_MSGS int to_send = MIN(BIG_MSG_SIZE,size); #else int to_send = size; #endif int sent = MIMD_Send(to,ptr,to_send); ptr += to_send; size -= to_send; #if (ACKNOWLEDGE_BIG_MSGS && !ACKNOWLEDGE_ALL_MSGS) if (size>0) #endif #if ACKNOWLEDGE_MSGS { int ack; MIMD_Recv(to,&ack,sizeof(ack)); if (ack != 1) MM_Panic("message acknowledge failed!"); } #endif } TE(T_COMMS); } /******************************************************************* recv a big message ********************************************************************/ static void MM_Recv_Big(int from,char *ptr,int size) { TS(T_COMMS); while (size > 0) { #if SPLIT_BIG_MSGS int to_recv = MIN(BIG_MSG_SIZE,size); #else int to_recv = size; #endif int recvd = MIMD_Recv(from,ptr,to_recv); ptr += recvd; size -= recvd; #if (ACKNOWLEDGE_BIG_MSGS && !ACKNOWLEDGE_ALL_MSGS) if (size>0) #endif #if ACKNOWLEDGE_MSGS { int ack=1; MIMD_Send(from,&ack,sizeof(ack)); } #endif } TE(T_COMMS); } /******************************************************************* exchange data between 2 nodes ********************************************************************/ static void MM_exchange_data(int node,void *mydata,void *hisdata, int mysize,int hissize) { if (node > NODE_NUMBER) { #if !BLOCKING if (mysize>0) MM_Send_Big(node,mydata,mysize); if (hissize>0) MM_Recv_Big(node,hisdata,hissize); #else if (hissize>0) MM_Recv_Big(node,hisdata,hissize); if (mysize>0) MM_Send_Big(node,mydata,mysize); #endif } if (node < NODE_NUMBER) { if (mysize>0) MM_Send_Big(node,mydata,mysize); if (hissize>0) MM_Recv_Big(node,hisdata,hissize); } } #if !HOST /******************************************************************* a safe realloc ********************************************************************/ static void *safe_realloc(void *ptr,int size) { void *realloc(); if (size==0) { free(ptr); return(NULL); } if (ptr) ptr = realloc(ptr,size); else ptr = malloc(size); if (!ptr) MM_Panic("memory error in realloc\n"); return(ptr); } /******************************************************************* a sorting algorithm not designed to actually sort the data but to make it more sorted. It operates particularly well in parallel. It calls the merge exchange algorithm along the edges of a hypercube ********************************************************************/ static void hyper_sort(int base,void *data,int number,void (*makesmaller)()) { int i; if (number==1) return; for (i=0;i<(number/2);i++) makesmaller(data,base+i,base+i+((number+1)/2)); #if 1 hyper_sort(base+number/2,data,(number+1)/2,makesmaller); hyper_sort(base,data,number - (number+1)/2,makesmaller); #else hyper_sort(base+(number+1)/2,data,number/2,makesmaller); hyper_sort(base,data,number - number/2,makesmaller); #endif } /******************************************************************* the first stage of a cleanup from a hyper_sort ********************************************************************/ static void hyper_cleanup1(int base,void *data,int number,void (*makesmaller)()) { int i; TS(T_CLEANUP); for (i=1;i<(number-1);i+=2) makesmaller(data,base+i,base+i+1); TE(T_CLEANUP); } /******************************************************************* use batcher's method to sort some data. makesmaller should be like this: void makesmaller(void *data,int n1,int n2) when called it should ensure that element n1 is smaller then element n2 data will always just be the same as data See Knuth Vol 3 ********************************************************************/ static void batchers_sort(void *data,int N,void (*makesmaller)()) { int p, initq, q, r, d, x; for (p=1; (1<<(p+1))<=N+1; p++); p = 1<0; p/=2) { q = initq; r = 0; d = p; do { for (x=0; x>(i)) & 1) != 0) int d; int i,j; int offset; /* find cube dimension */ for (d=1; (1<<(d+1))<=N+1; d++); for (i=1;i<=d;i++) { for (j=(i-1);j>=0;j--) { offset = 1< PSI.local_size) { *pd = safe_realloc(*pd,new_size*EL_SIZE); MM_Recv_Big(other_node,((char *)(*pd))+PSI.local_size*EL_SIZE, (new_size-PSI.local_size)*EL_SIZE); } if (new_size < PSI.local_size) { MM_Send_Big(other_node,((char *)(*pd))+new_size*EL_SIZE, (PSI.local_size-new_size)*EL_SIZE); *pd = safe_realloc(*pd,new_size*EL_SIZE); } PSI.local_size = new_size; } /******************************************************************* balance the nodes using the hypercube balance ********************************************************************/ static void hypercube_balance(int base,int number,void **pd) { int i; if (number==1) return; for (i=0;i<(number/2);i++) pair_balance(pd,base+i,base+i+((number+1)/2)); hypercube_balance(base+number/2,(number+1)/2,pd); hypercube_balance(base,number - (number+1)/2,pd); } /******************************************************************** complete the balancing ********************************************************************/ static void complete_balance(void **pd) { int n; int total,target, max_size; MIMD_Sum_Int(PSI.local_size,&total); max_size = MIMD_Max_Int(PSI.local_size); if (total % NUM_NODES == 0) target = total / NUM_NODES; else target = max_size; if (IS_TOP_NODE) { int i; target = total - target*(NUM_NODES-1); for (i=BOTTOM_NODE;i0) { n = MIN(n,PSI.local_size); MIMD_Send(i,&n,sizeof(n)); MIMD_Send(i,PTR(*pd,PSI.local_size-n),n*EL_SIZE); PSI.local_size -= n; } if (n<0) { MIMD_Send(i,&n,sizeof(n)); *pd = safe_realloc(*pd,(PSI.local_size-n)*EL_SIZE); MIMD_Recv(i,PTR(*pd,PSI.local_size),-n*EL_SIZE); PSI.local_size -= n; } } } else { n = target - PSI.local_size; MIMD_Send(TOP_NODE,&n,sizeof(n)); if (n>0) { MIMD_Recv(TOP_NODE,&n,sizeof(n)); PSI.local_size += n; *pd = safe_realloc(*pd,PSI.local_size*EL_SIZE); MIMD_Recv(TOP_NODE,PTR(*pd,PSI.local_size-n),n*EL_SIZE); } if (n<0) { MIMD_Recv(TOP_NODE,&n,sizeof(n)); PSI.local_size += n; MIMD_Send(TOP_NODE,PTR(*pd,PSI.local_size),-n*EL_SIZE); *pd = safe_realloc(*pd,PSI.local_size*EL_SIZE); } } } /******************************************************************* swap memory areas between two cells ********************************************************************/ static void swap_memory(int node,void *ptr,int to_receive,int to_send) { TS(T_SWAP_MEM); #ifndef CM5 { int BufSize = 64*(1<<10); if (NODE_NUMBER < node) { char *Rptr = ((char *)ptr) + to_receive; char *Sptr = ((char *)ptr) + to_send; #if BLOCKING void *buf = malloc(BufSize); #endif int recv,send; while (to_receive>0 || to_send>0) { recv = MIN(to_receive,BufSize); send = MIN(to_send,BufSize); #if BLOCKING if (send) memcpy(buf,Sptr-send,send); if (recv) MIMD_Recv(node,Rptr-recv,recv); if (send) MIMD_Send(node,buf,send); #else #if 0 if (send) FSend(node,Sptr-send,send); if (recv) FRecv(node,Rptr-recv,recv); #else if (send) MIMD_Send(node,Sptr-send,send); if (recv) MIMD_Recv(node,Rptr-recv,recv); #endif #endif to_receive -= recv; to_send -= send; Rptr -= recv; Sptr -= send; } #if BLOCKING free(buf); #endif } else { char *Rptr = ((char *)ptr) + to_receive; char *Sptr = ((char *)ptr) + to_send; int recv,send; while (to_receive>0 || to_send>0) { recv = MIN(to_receive,BufSize); send = MIN(to_send,BufSize); #if 0 if (send) FSend(node,Sptr-send,send); if (recv) FRecv(node,Rptr-recv,recv); #else if (send) MIMD_Send(node,Sptr-send,send); if (recv) MIMD_Recv(node,Rptr-recv,recv); #endif to_receive -= recv; to_send -= send; Rptr -= recv; Sptr -= send; } } } #else #if 1 { int BufSize = 30*(1<<10); char *Cptr = (char *)ptr; while (to_receive>0 || to_send>0) { int recv = MIN(to_receive,BufSize); int send = MIN(to_send,BufSize); CMMD_send_and_receive(node,0,Cptr,recv,node,0,Cptr,send); to_receive -= recv; to_send -= send; Cptr += BufSize; } } #else CMMD_send_and_receive(node,1,ptr,to_receive,node,1,ptr,to_send); #endif #endif TE(T_SWAP_MEM); } /******************************************************************* simple merge code - must be fast no boundary checking is done and the number of elements taken from list1 is returned ********************************************************************/ static int SimpleMerge(void *dest,void *src1,void *src2,int n) { #if INTEGER #define DOONE(x) (i1= 8) { DOONE(d0); DOONE(d1); DOONE(d2); DOONE(d3); DOONE(d4); DOONE(d5); DOONE(d6); DOONE(d7); d[0] = d0;d[1] = d1;d[2] = d2;d[3] = d3; d[4] = d4;d[5] = d5;d[6] = d6;d[7] = d7; d+=8;N-=8; } TE(T_SIMPLE); while (N--) DOONE(*d++); return(ss1-(int *)src1); } #else { int N = n; int elsize = EL_SIZE; int ii; int (*cmp)() = PSI.comparison; TS(T_SIMPLE); if (elsize%sizeof(double)==0 && ((int)src1)%sizeof(double)==0 && ((int)src2)%sizeof(double)==0 && ((int)dest)%sizeof(double)==0) { double *ss1=src1,*ss2=src2,*d=dest; elsize /= sizeof(double); switch (elsize) { case 1: while (N--) *d++ = (cmp(ss1,ss2)<0?*ss1++:*ss2++); break; case 2: { while (N--) { if (cmp(ss1,ss2)<0) { d[0] = ss1[0];d[1] = ss1[1]; ss1+=2; } else { d[0] = ss2[0];d[1] = ss2[1]; ss2+=2; } d+=2; } } break; default: while (N--) if (cmp(ss1,ss2)<0) {ii=elsize;while (ii--) *d++ = *ss1++;} else {ii=elsize;while (ii--) *d++ = *ss2++;} break; } TE(T_SIMPLE); return(((char *)ss1 - (char *)src1)/EL_SIZE); } if (elsize==sizeof(float) && ((int)src1)%sizeof(float)==0 && ((int)src2)%sizeof(float)==0 && ((int)dest)%sizeof(float)==0) { float *ss1=src1,*ss2=src2,*d=dest; elsize /= sizeof(float); while (N--) *d++ = (cmp(ss1,ss2)<0?*ss1++:*ss2++); TE(T_SIMPLE); return(((char *)ss1 - (char *)src1)/EL_SIZE); } { char *ss1=src1,*ss2=src2,*d=dest; while (N--) { if (cmp(ss1,ss2)<0) { memcpy(d,ss1,elsize); d+=elsize; ss1+=elsize; } else { memcpy(d,ss2,elsize); d+=elsize; ss2+=elsize; } } TE(T_SIMPLE); return(((char *)ss1 - (char *)src1)/EL_SIZE); } } #endif } /******************************************************************* shift some elements - may overlap! ********************************************************************/ static void ShiftEls(void *dest,void *src,int N) { int i; int block = ((char *)dest - (char *)src)/EL_SIZE; if (dest == src) return; if (block < 0) block = -block; if ((char *)dest < (char *)src) { for (i=0;i0;i-=block) { todo = MIN(block,i+1); COPYELS(PTR(dest,i-todo),PTR(src,i-todo),todo); } } } /******************************************************************* unbalanced merge of two lists - best when L1 or L2 is small ********************************************************************/ static void UnbalancedMerge(void *data,int L1,int L2,int gap) { int *slots; int N = L1 + L2; int smallL = MIN(L1,L2); int bigL = MAX(L1,L2); void *smallP,*bigP; int start=0; int i; TS(T_SMALL_MERGE); slots = malloc(sizeof(int)*smallL); if (L1 < L2) { smallP = data; bigP = PTR(data,L1+gap); } else { bigP = data; smallP = PTR(data,L1+gap); } /* create the slots list - this says where each element in smallList will fit into the big list */ for (i=0;i=0) { int todo = bigL-slots[slot]-num_taken; ShiftEls(PTR(data,N-num_done-todo), PTR(bigP,bigL-num_taken-todo), todo); num_done += todo; num_taken += todo; COPYEL(PTR(data,N-num_done-1), PTR(TempBuf,smallL-(num_done-num_taken)-1)); num_done++; slot--; } ShiftEls(data,bigP,N-num_done); free(TempBuf); } free(slots); TE(T_SMALL_MERGE); } /******************************************************************* merge 2 lists into one with minimum memory overhead ********************************************************************/ static void insitu_merge(void *data,int L1,int L2,int gap) { int N = L1 + L2; int B; int bListLen1,bListLen2,bListLen3; int NumTemp=3; int TailSize; BOOL TailDone; typedef struct { int start,length; } bListStruct; bListStruct *bList1,*bList2,*bList3; void *TailBuf; void *TempBuf; double sqrt(double ); #define COPYP(p1,p2) COPYEL(p1,p2) #define COPYP_INC(p1,p2) COPYEL_INC(p1,p2) #define COPYPN(p1,p2,n) COPYELS(p1,p2,n) #define LESSTHANP(p1,p2) (COMPARE(p1,p2)<0) if (L2==0) return; if (L1==0 && gap>0) { ShiftEls(data,PTR(data,gap),L2); return; } B = MIN(N,(int)sqrt((double)N)*4); /* B = MIN(B,(128*1024/EL_SIZE)/4); */ #if UNBALANCED_MERGE if (L1 <= B || L2 <= B) { UnbalancedMerge(data,L1,L2,gap); return; } #endif TS(T_INSIT1); /* INITIALISATION: allocate memory */ TempBuf = el_vector(NumTemp*B); bList1 = (bListStruct *)malloc((L1/B + 2)*sizeof(bListStruct)); bList2 = (bListStruct *)malloc((L2/B + 2)*sizeof(bListStruct)); bList3 = (bListStruct *)malloc((L1/B + L2/B + 5)*sizeof(bListStruct)); TailSize = L1 % B; TailBuf = el_vector(TailSize); TailDone = (TailSize == 0); /* STAGE 1: Create the 2 source block lists and initialise the 3rd */ { int L,S,pos; bListStruct *list; list=bList1; L=L1 - TailSize; S=0; pos=0; while (L) { list[pos].start = S + pos * B; list[pos].length = MIN(B,L); L -= list[pos].length; pos++; } bListLen1=pos; /* copy the tail from list 1 */ COPYPN(TailBuf,PTR(data,bListLen1*B),TailSize); list=bList2; L=L2; S=L1+gap; pos=0; while (L) { list[pos].start = S + pos * B; list[pos].length = MIN(B,L); L -= list[pos].length; pos++; } bListLen2=pos; bListLen3=0; } TE(T_INSIT1); TS(T_INSIT2); /* STAGE 2: merge the two lists into list3 */ { void *pL1,*pL2,*pL3; int avail1,avail2; int space3; int sPos1,sPos2; int availPos3,dPos3; BOOL empty1,empty2; /* select the first blocks in list 1 and 2 */ if (bListLen1 > 0) { pL1 = PTR(data,bList1[0].start); avail1 = bList1[0].length; sPos1 = 0; empty1 = False; } else { pL1 = TailBuf; avail1 = TailSize; TailDone = True; sPos1 = 0; empty1 = False; } pL2 = PTR(data,bList2[0].start); avail2 = bList2[0].length; sPos2 = 0; empty2 = False; /* initially select the TempBuf */ bList3[0].start = -1; bList3[0].length = NumTemp*B; availPos3 = 1; pL3 = TempBuf; space3 = NumTemp*B; dPos3 = 0; while (!empty1 || !empty2) { int MinToDo = space3; if (!empty1) MinToDo = MIN(MinToDo,avail1); if (!empty2) MinToDo = MIN(MinToDo,avail2); space3 -= MinToDo; if (empty2 || empty1) { if (empty1) { COPYPN(pL3,pL2,MinToDo); pL2 = PTR(pL2,MinToDo); avail2 -= MinToDo; } else { COPYPN(pL3,pL1,MinToDo); pL1 = PTR(pL1,MinToDo); avail1 -= MinToDo; } pL3 = PTR(pL3,MinToDo); } else { int FrompL1 = SimpleMerge(pL3,pL1,pL2,MinToDo); avail1 -= FrompL1; avail2 -= (MinToDo-FrompL1); pL3 = PTR(pL3,MinToDo); pL1 = PTR(pL1,FrompL1); pL2 = PTR(pL2,MinToDo-FrompL1); } /* have I emptied list 1 ? */ if (avail1==0 && !empty1) { /* make the just emptied block available */ if (TailSize == 0 || !TailDone) { /* int k = dPos3+1; */ int k = availPos3; memcpy(&bList3[k+1],&bList3[k],(availPos3-k)*sizeof(bList3[k])); bList3[k].start = bList1[sPos1].start; bList3[k].length = bList1[sPos1].length; availPos3++; sPos1++; } /* are there no blocks left ? */ if (sPos1 == bListLen1) { if (!TailDone) { pL1 = TailBuf; avail1 = TailSize; TailDone = True; } else empty1 = True; } else { /* start using the next block */ pL1 = PTR(data,bList1[sPos1].start); avail1 = bList1[sPos1].length; } } /* have I emptied list 2 ? */ if (avail2==0 && !empty2) { /* int k = dPos3+1; */ int k = availPos3; memcpy(&bList3[k+1],&bList3[k],(availPos3 - k)*sizeof(bList3[k])); /* make the just emptied block available */ bList3[k].start = bList2[sPos2].start - (TailSize + gap); bList3[k].length = bList2[sPos2].length; if (sPos2 == (bListLen2-1)) bList3[k].length += TailSize; if (bList3[k].length > B) { bList3[availPos3+1].length = bList3[k].length - B; bList3[availPos3+1].start = bList3[k].start + B; bList3[k].length = B; availPos3++; } /* don't make odd sized blocks available */ if (bList3[availPos3].length == B) availPos3++; sPos2++; /* are there no blocks left ? */ if (sPos2 == bListLen2) empty2 = True; else { /* start using the next block */ pL2 = PTR(data,bList2[sPos2].start); avail2 = bList2[sPos2].length; } } /* have I filled list 3 ? */ if (space3 == 0) { /* start using the next block */ dPos3++; if (dPos3 == availPos3) printf("\n%d merge error - dPos3==availPos3! %d\n",NODE_NUMBER,dPos3); pL3 = PTR(data,bList3[dPos3].start); space3 = bList3[dPos3].length; } } /* we may not have filled list 3 completely */ bList3[dPos3].length -= space3; /* move the last elements to the last block in list 2 */ COPYPN(PTR(data,N-bList3[dPos3].length), PTR(pL3,-bList3[dPos3].length), bList3[dPos3].length); bList3[dPos3].start = N - bList3[dPos3].length; bListLen3 = dPos3; } TE(T_INSIT2); TS(T_INSIT3); /* STAGE 3: re-arrange to give final result */ { typedef struct { int where_hiding; int what_now; BOOL occupied; } block_struct; block_struct *blocks; int num_blocks = N / B; int i; BOOL all_done = False; blocks = (block_struct *)malloc(num_blocks * sizeof(block_struct)); for (i=0;i= 0) { COPYPN(PTR(data,i*B),PTR(data,from*B),B); blocks[from].occupied = False; } else COPYPN(PTR(data,i*B),PTR(TempBuf,-(from+1)*B),B); blocks[i].occupied = True; blocks[i].what_now = i; blocks[i].where_hiding = i; if (from >= 0) i = from; } } } for (i=0;i 0) min = limits[0]+1; else max = limits[0]; if (COMPARE(PTR(data,(N-1)-(limits[0]+limits[1])/2), PTR(TempEl,1)) > 0) min = MAX((limits[0]+limits[1])/2 + 1, min); else max = MIN((limits[0]+limits[1])/2, max); if (COMPARE(PTR(data,(N-1)-(limits[1]-1)), PTR(TempEl,2)) > 0) min = MAX(limits[1], min); else max = MIN(limits[1]-1, max); limits[0] = min; limits[1] = max; } free(TempEl); MIMD_Send(n2,limits,sizeof(limits)); return limits[0]; } /******************************************************************* merge_exchange between two cells, giving cell n1 all the small elements. Leave the number of elements in each cell the same ********************************************************************/ static void merge_exchange(void *data,int n1,int n2) { int other_node = (NODE_NUMBER==n1?n2:n1); int to_send; /* node numbers might not start at 0 */ n1 += BOTTOM_NODE; n2 += BOTTOM_NODE; /* only do work if n1==NODE_NUMBER || n2=NODE_NUMBER */ if (n1 != NODE_NUMBER && n2 != NODE_NUMBER) return; /* if one of the nodes is finished then don't do anything */ if (PSI.finished[n1] || PSI.finished[n2]) return; TS(T_MERGE_EXCHANGE); TS(T_FIND_EXACT); to_send = find_exact(data, PSI.local_size, n1, n2); TE(T_FIND_EXACT); if (NODE_NUMBER == n1) swap_memory(other_node,PTR(data,PSI.local_size-to_send), to_send*EL_SIZE,to_send*EL_SIZE); else swap_memory(other_node,data, to_send*EL_SIZE,to_send*EL_SIZE); if (NODE_NUMBER == n1) insitu_merge(data,PSI.local_size-to_send,to_send,0); else insitu_merge(data,to_send,PSI.local_size-to_send,0); TE(T_MERGE_EXCHANGE); } /******************************************************************* fill in the PSI.finished[] array to say which nodes are finished return the number of nodes still needing to be sorted ********************************************************************/ static int unfinished(void *data,int N,int el_size) { int ret, i; void *smallest, *largest, *cum_smallest, *cum_largest; TS(T_UNFINISHED); if (!IS_BOTTOM_NODE) { MIMD_Send(BOTTOM_NODE, PTR(data, 0), el_size); MIMD_Send(BOTTOM_NODE, PTR(data, N-1), el_size); MIMD_Broad_Recv(BOTTOM_NODE, PSI.finished, NUM_NODES*sizeof(int)); for (ret=i=0;i 0) { memcpy(PTR(cum_largest,i), PTR(largest,i), el_size); } else { memcpy(PTR(cum_largest,i), PTR(cum_largest,i-1), el_size); } } for (i=NUM_NODES-2;i>=0;i--) { if (COMPARE(PTR(smallest,i), PTR(cum_smallest, i+1)) < 0) { memcpy(PTR(cum_smallest,i), PTR(smallest,i), el_size); } else { memcpy(PTR(cum_smallest,i), PTR(cum_smallest,i+1), el_size); } } PSI.finished[0] = (COMPARE(PTR(largest,0), PTR(cum_smallest, 1)) <= 0); PSI.finished[NUM_NODES-1] = (COMPARE(PTR(smallest,NUM_NODES-1), PTR(cum_largest, NUM_NODES-2)) >= 0); for (i=1;i= 0)); } for (ret=i=0;i0 if el1 > el2 ********************************************************************/ void parallel_sort(void *data,int N,int el_size,int (*comparison)()) { PSI.comparison = comparison; PSI.local_size = N; PSI.el_size = el_size; PSI.finished = (int *)malloc(sizeof(int)*NUM_NODES); if (!PSI.finished) MM_Panic("out of memory"); memset(PSI.finished, 0, sizeof(int)*NUM_NODES); #if STATS stats.messages = 0; stats.num_guarantee_loops = 0; { int i; for (i=0;i 1) qsort(data,PSI.local_size,EL_SIZE,(QSORT_CAST)PSI.comparison); TE(T_QSORT); #if HYPER_SORT TS(T_HYPER_SORT); do { hyper_sort(BOTTOM_NODE,data,NUM_NODES,merge_exchange); hyper_cleanup1(BOTTOM_NODE,data,NUM_NODES,merge_exchange); } while (unfinished(data, PSI.local_size, EL_SIZE)); TE(T_HYPER_SORT); #endif #if BATCHERS TS(T_BATCHERS); batchers_sort(data,NUM_NODES,merge_exchange); TE(T_BATCHERS); #endif #if BITONIC TS(T_BITONIC); bitonic_sort(data,NUM_NODES,merge_exchange); TE(T_BITONIC); #endif free(PSI.finished); TE(T_ALL); } /******************************************************************* similar to parallel_sort but only perform the hypercube pre-sort ********************************************************************/ void hypercube_sort(void *data,int N,int el_size,int (*comparison)()) { TS(T_ALL); PSI.comparison = comparison; PSI.local_size = N; PSI.el_size = el_size; PSI.finished = (int *)malloc(sizeof(int)*NUM_NODES); if (!PSI.finished) MM_Panic("out of memory"); memset(PSI.finished, 0, sizeof(int)*NUM_NODES); #if STATS stats.messages = 0; stats.num_guarantee_loops = 0; { int i; for (i=0;i 1) qsort(data,PSI.local_size,EL_SIZE,(QSORT_CAST)PSI.comparison); TE(T_QSORT); #if HYPER_SORT TS(T_HYPER_SORT); hyper_sort(BOTTOM_NODE,data,NUM_NODES,merge_exchange); /* hyper_cleanup1(BOTTOM_NODE,data,NUM_NODES,merge_exchange); */ TE(T_HYPER_SORT); #endif free(PSI.finished); TE(T_ALL); } #endif #if STATS /******************************************************************* report the timing stats ********************************************************************/ void report_stats(int N) { int i; double times[T_NONE]; #if HOST MIMD_Recv(BOTTOM_NODE,times,sizeof(double)*T_NONE); for (i=0;i