/*--------------------------------------------------- * file: topicmpiomp.c * purpose: run parallel topic model * inputs: T (number of topics), ITER (number of iterations), P (number of procs), docword.txt * outputs: wp.txt, dp.txt * version: 1.0 * author: newman@uci.edu * date: Fri Apr 6 16:47:24 PDT 2007 * /latent/newman/preproc/code3/run1/again * NOTE: binary matrices are all (c-style) zero offset * poe topicmpiomp 16 -nodes 2 -tasks_per_node 8 -rmpool 1 -euilib us -euidevice sn_all * gcc -o topicomp -O3 -Wall topicomp.c mpcc_r -c -q64 -qtune=pwr4 -qarch=pwr4 -qcpluscmt -O5 -qsmp=omp topicomp.c mpcc_r -c -q64 -qtune=pwr4 -qarch=pwr4 -qcpluscmt -O5 -qsmp=omp topiclib.c mpcc_r -o topicomp -q64 -qtune=pwr4 -qarch=pwr4 -qcpluscmt -O5 -qsmp=omp topicomp.o topiclib.o export OMP_NUM_THREADS=1 export OMP_NUM_THREADS=2 export OMP_NUM_THREADS=8 DONE::: djn: aug 24: delete negcount and if in inner loop? djn: aug 24: in sectors could memcpy(wpwi,wp[w[i]],T*sizeof(int)) to avoid false sharing djn: aug 24: compile with static schedule? *-------------------------------------------------*/ #include "mpi.h" #define ROOTONLY if(mytask==0) #define Rprintf if(mytask==0)printf #include #include #include #include #include #include "topiclib.h" /*------------------------------------------ * global variables *------------------------------------------ */ static int T; // number of topics static int W; // number of unique words static int D; // number of docs static int N; // number of words in corpus static int P; // number of procs //#define _T_ 250 //#define _T_ 500 //#define _T_ 1000 #define _T_ 2000 //#define _ALPHA_ 0.04 #define _ALPHA_ 0.001 #define _BETA_ 0.004 #define _WBETA_ 564.172 /*========================================== * main *========================================== */ int main(int argc, char* argv[]) { int Ppertask; int i, iter0, iter, ITER=10000, dump=40; double alpha=_ALPHA_, beta=_BETA_; FILE *fin, *fout; char fname[32]; int *w, *d, *z, **wp, *ztot, **sumwp, **dp; int p, t, Np, Dp, NNZ, n1=1, n2=2, chk, TT; int *dd, *ww, *tt, *cc; int ierr, numtasks, mytask; int numthreads=-1, mythread=-1, maxthreads=-1; ierr = MPI_Init(&argc,&argv); if (ierr != MPI_SUCCESS) { printf ("Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, ierr); } MPI_Comm_size(MPI_COMM_WORLD,&numtasks); MPI_Comm_rank(MPI_COMM_WORLD,&mytask); printf("topicmpi: hi from task %d of %d\n", mytask, numtasks); p = mytask; if (argc == 1) { fprintf(stderr, "usage: %s iter0\n", argv[0]); exit(-1); } //P = atoi(argv[1]); assert(P>0); assert(P%numtasks==0); iter0 = atoi(argv[1]); assert(iter0>-1); // hardcode P P = numtasks; assert(P==numtasks); Ppertask = P/numtasks; assert(Ppertask==1); numthreads = omp_get_num_threads(); maxthreads = omp_get_max_threads(); mythread = omp_get_thread_num(); printf("hi from thread %d of %d\n", mythread, numthreads); printf("max headroom is %d\n", maxthreads); /*==================================================*/ // read wp fin = fopen("wp.sbin","r"); assert(fin); assert(fread(&W, sizeof(int),1,fin)); assert(W>0); assert(fread(&T, sizeof(int),1,fin)); assert(T>0); assert(T==_T_); assert(fread(&NNZ, sizeof(int),1,fin)); assert(NNZ>0); ww = ivec(NNZ); assert(ww); tt = ivec(NNZ); assert(tt); cc = ivec(NNZ); assert(cc); chk = fread(ww,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(tt,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(cc,sizeof(int),NNZ,fin); assert(chk==NNZ); fclose(fin); printf("... read wp.sbin\n"); wp = imat(W,T); assert(wp); sumwp = imat(W,T); assert(wp); for (i = 0; i < NNZ; i++) wp[ww[i]][tt[i]] = cc[i]; free(ww); free(tt); free(cc); ztot = ivec(T); assert(ztot); getztot(W,T,wp,ztot); /*==================================================*/ // read dw sprintf(fname,"dw.p%d.bin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&n2, sizeof(int),1,fin); assert(chk==1); assert(n2==2); chk = fread(&Np, sizeof(int),1,fin); assert(chk==1); assert(Np>0); w = ivec(Np); d = ivec(Np); chk = fread(d,sizeof(int),Np,fin); assert(chk==Np); chk = fread(w,sizeof(int),Np,fin); assert(chk==Np); fclose(fin); printf("... read %s\n", fname); // read z sprintf(fname,"z.p%d.bin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&n1, sizeof(int),1,fin); assert(chk==1); assert(n1==1); chk = fread(&Np, sizeof(int),1,fin); assert(chk==1); assert(Np>0); z = ivec(Np); chk = fread(z,sizeof(int),Np,fin); assert(chk==Np); fclose(fin); printf("... read %s\n", fname); // read dp sprintf(fname,"dp.p%d.sbin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&Dp, sizeof(int),1,fin); assert(chk==1); assert(Dp>0); chk = fread(&TT, sizeof(int),1,fin); assert(chk==1); assert(TT=T); chk = fread(&NNZ, sizeof(int),1,fin); assert(chk==1); assert(NNZ>0); dd = ivec(NNZ); assert(dd); tt = ivec(NNZ); assert(tt); cc = ivec(NNZ); assert(cc); chk = fread(dd,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(tt,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(cc,sizeof(int),NNZ,fin); assert(chk==NNZ); fclose(fin); printf("... read %s\n", fname); dp = imat(Dp,T); assert(dp); for (i = 0; i < NNZ; i++) dp[dd[i]][tt[i]] = cc[i]; free(dd); free(tt); free(cc); printf(" Np = %d\n", Np); printf(" Dp = %d\n", Dp); MPI_Allreduce(&Dp,&D,1,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); MPI_Allreduce(&Np,&N,1,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); ROOTONLY { printf("P = %d\n", P); printf("Ppertask = %d\n", Ppertask); printf("D = %d\n", D); printf("W = %d\n", W); printf("N = %d\n", N); printf("T = %d\n", T); printf("iter0 = %d\n", iter0); printf("ITER = %d\n", ITER); printf("alpha = %f\n", alpha); printf("beta = %f\n", beta); } /*===============================================================================*/ for (iter = iter0; iter <= ITER; iter++) { Rprintf("\niter %d\n", iter); //if NO omp, then just use following line //sample_chain0(Np,W,T,alpha,beta,w,d,z,wp,dp,ztot); //chksum(W,T,wp,ztot); #pragma omp parallel shared(d,w,z,dp,wp,ztot) { #pragma omp sections { /*============== CUT HERE ============ CUT HERE =============================*/ #pragma omp section /*---------------------------------------------------------*/ /* sector 0 */ /*---------------------------------------------------------*/ { double probs0[_T_],totprob0,maxprob0,currprob0; int ztot0[_T_],i0,t0; printf("sector 0: thread=%d\n", omp_get_thread_num()); memcpy(ztot0,ztot,_T_*sizeof(int)); for (i0 = 0*Np/8; i0 < 1*Np/8; i0++) { t0 = z[i0]; ztot0[t0]--; wp[w[i0]][t0]--; dp[d[i0]][t0]--; for (t0 = 0, totprob0 = 0.0; t0 < _T_; t0++) { probs0[t0] = (dp[d[i0]][t0] + _ALPHA_) * (wp[w[i0]][t0] + _BETA_) / (ztot0[t0] + _WBETA_); totprob0 += probs0[t0]; } maxprob0 = drand48()*totprob0; currprob0 = probs0[0]; t0 = 0; while (maxprob0>currprob0) { t0++; currprob0 += probs0[t0]; } z[i0] = t0; wp[w[i0]][t0]++; dp[d[i0]][t0]++; ztot0[t0]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 1 */ /*---------------------------------------------------------*/ { double probs1[_T_],totprob1,maxprob1,currprob1; int ztot1[_T_],i1,t1; printf("sector 1: thread=%d\n", omp_get_thread_num()); memcpy(ztot1,ztot,_T_*sizeof(int)); for (i1 = 1*Np/8; i1 < 2*Np/8; i1++) { t1 = z[i1]; ztot1[t1]--; wp[w[i1]][t1]--; dp[d[i1]][t1]--; for (t1 = 0, totprob1 = 0.0; t1 < _T_; t1++) { probs1[t1] = (dp[d[i1]][t1] + _ALPHA_) * (wp[w[i1]][t1] + _BETA_) / (ztot1[t1] + _WBETA_); totprob1 += probs1[t1]; } maxprob1 = drand48()*totprob1; currprob1 = probs1[0]; t1 = 0; while (maxprob1>currprob1) { t1++; currprob1 += probs1[t1]; } z[i1] = t1; wp[w[i1]][t1]++; dp[d[i1]][t1]++; ztot1[t1]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 2 */ /*---------------------------------------------------------*/ { double probs2[_T_],totprob2,maxprob2,currprob2; int ztot2[_T_],i2,t2; printf("sector 2: thread=%d\n", omp_get_thread_num()); memcpy(ztot2,ztot,_T_*sizeof(int)); for (i2 = 2*Np/8; i2 < 3*Np/8; i2++) { t2 = z[i2]; ztot2[t2]--; wp[w[i2]][t2]--; dp[d[i2]][t2]--; for (t2 = 0, totprob2 = 0.0; t2 < _T_; t2++) { probs2[t2] = (dp[d[i2]][t2] + _ALPHA_) * (wp[w[i2]][t2] + _BETA_) / (ztot2[t2] + _WBETA_); totprob2 += probs2[t2]; } maxprob2 = drand48()*totprob2; currprob2 = probs2[0]; t2 = 0; while (maxprob2>currprob2) { t2++; currprob2 += probs2[t2]; } z[i2] = t2; wp[w[i2]][t2]++; dp[d[i2]][t2]++; ztot2[t2]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 3 */ /*---------------------------------------------------------*/ { double probs3[_T_],totprob3,maxprob3,currprob3; int ztot3[_T_],i3,t3; printf("sector 3: thread=%d\n", omp_get_thread_num()); memcpy(ztot3,ztot,_T_*sizeof(int)); for (i3 = 3*Np/8; i3 < 4*Np/8; i3++) { t3 = z[i3]; ztot3[t3]--; wp[w[i3]][t3]--; dp[d[i3]][t3]--; for (t3 = 0, totprob3 = 0.0; t3 < _T_; t3++) { probs3[t3] = (dp[d[i3]][t3] + _ALPHA_) * (wp[w[i3]][t3] + _BETA_) / (ztot3[t3] + _WBETA_); totprob3 += probs3[t3]; } maxprob3 = drand48()*totprob3; currprob3 = probs3[0]; t3 = 0; while (maxprob3>currprob3) { t3++; currprob3 += probs3[t3]; } z[i3] = t3; wp[w[i3]][t3]++; dp[d[i3]][t3]++; ztot3[t3]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 4 */ /*---------------------------------------------------------*/ { double probs4[_T_],totprob4,maxprob4,currprob4; int ztot4[_T_],i4,t4; printf("sector 4: thread=%d\n", omp_get_thread_num()); memcpy(ztot4,ztot,_T_*sizeof(int)); for (i4 = 4*Np/8; i4 < 5*Np/8; i4++) { t4 = z[i4]; ztot4[t4]--; wp[w[i4]][t4]--; dp[d[i4]][t4]--; for (t4 = 0, totprob4 = 0.0; t4 < _T_; t4++) { probs4[t4] = (dp[d[i4]][t4] + _ALPHA_) * (wp[w[i4]][t4] + _BETA_) / (ztot4[t4] + _WBETA_); totprob4 += probs4[t4]; } maxprob4 = drand48()*totprob4; currprob4 = probs4[0]; t4 = 0; while (maxprob4>currprob4) { t4++; currprob4 += probs4[t4]; } z[i4] = t4; wp[w[i4]][t4]++; dp[d[i4]][t4]++; ztot4[t4]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 5 */ /*---------------------------------------------------------*/ { double probs5[_T_],totprob5,maxprob5,currprob5; int ztot5[_T_],i5,t5; printf("sector 5: thread=%d\n", omp_get_thread_num()); memcpy(ztot5,ztot,_T_*sizeof(int)); for (i5 = 5*Np/8; i5 < 6*Np/8; i5++) { t5 = z[i5]; ztot5[t5]--; wp[w[i5]][t5]--; dp[d[i5]][t5]--; for (t5 = 0, totprob5 = 0.0; t5 < _T_; t5++) { probs5[t5] = (dp[d[i5]][t5] + _ALPHA_) * (wp[w[i5]][t5] + _BETA_) / (ztot5[t5] + _WBETA_); totprob5 += probs5[t5]; } maxprob5 = drand48()*totprob5; currprob5 = probs5[0]; t5 = 0; while (maxprob5>currprob5) { t5++; currprob5 += probs5[t5]; } z[i5] = t5; wp[w[i5]][t5]++; dp[d[i5]][t5]++; ztot5[t5]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 6 */ /*---------------------------------------------------------*/ { double probs6[_T_],totprob6,maxprob6,currprob6; int ztot6[_T_],i6,t6; printf("sector 6: thread=%d\n", omp_get_thread_num()); memcpy(ztot6,ztot,_T_*sizeof(int)); for (i6 = 6*Np/8; i6 < 7*Np/8; i6++) { t6 = z[i6]; ztot6[t6]--; wp[w[i6]][t6]--; dp[d[i6]][t6]--; for (t6 = 0, totprob6 = 0.0; t6 < _T_; t6++) { probs6[t6] = (dp[d[i6]][t6] + _ALPHA_) * (wp[w[i6]][t6] + _BETA_) / (ztot6[t6] + _WBETA_); totprob6 += probs6[t6]; } maxprob6 = drand48()*totprob6; currprob6 = probs6[0]; t6 = 0; while (maxprob6>currprob6) { t6++; currprob6 += probs6[t6]; } z[i6] = t6; wp[w[i6]][t6]++; dp[d[i6]][t6]++; ztot6[t6]++; } } #pragma omp section /*---------------------------------------------------------*/ /* sector 7 */ /*---------------------------------------------------------*/ { double probs7[_T_],totprob7,maxprob7,currprob7; int ztot7[_T_],i7,t7; printf("sector 7: thread=%d\n", omp_get_thread_num()); memcpy(ztot7,ztot,_T_*sizeof(int)); for (i7 = 7*Np/8; i7 < 8*Np/8; i7++) { t7 = z[i7]; ztot7[t7]--; wp[w[i7]][t7]--; dp[d[i7]][t7]--; for (t7 = 0, totprob7 = 0.0; t7 < _T_; t7++) { probs7[t7] = (dp[d[i7]][t7] + _ALPHA_) * (wp[w[i7]][t7] + _BETA_) / (ztot7[t7] + _WBETA_); totprob7 += probs7[t7]; } maxprob7 = drand48()*totprob7; currprob7 = probs7[0]; t7 = 0; while (maxprob7>currprob7) { t7++; currprob7 += probs7[t7]; } z[i7] = t7; wp[w[i7]][t7]++; dp[d[i7]][t7]++; ztot7[t7]++; } } /*============== CUT HERE ============ CUT HERE =============================*/ } // over sections } // over parallel memset(sumwp[0],0, W*T*sizeof(int)); memset( dp[0],0,Dp*T*sizeof(int)); for (i = 0; i < Np; i++) { t = z[i]; sumwp[w[i]][t]++; dp[d[i]][t]++; } ierr = MPI_Allreduce(sumwp[0],wp[0],W*T,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); if (ierr != MPI_SUCCESS) { printf ("ERROR MPI_Allreduce\n"); MPI_Abort(MPI_COMM_WORLD, ierr); } memset(ztot,0,T*sizeof(int)); for (i = 0; i < W; i++) for (t = 0; t < T; t++) ztot[t] += wp[i][t]; // dump results if (iter>iter0 && iter%dump == 0) { sprintf(fname,"wp.iter%d.bout",iter); ROOTONLY write_sparsebin(W,T,wp,fname); Rprintf("... wrote %s\n", fname); sprintf(fname,"z.p%d.iter%d.bout",p,iter); fout = fopen(fname,"w"); assert(fout); chk = fwrite(&n1,sizeof(int),1,fout); assert(chk==1); chk = fwrite(&Np,sizeof(int),1,fout); assert(chk==1); chk = fwrite(z,sizeof(int),Np,fout); assert(chk==Np); fclose(fout); printf("... wrote %s\n", fname); sprintf(fname,"dp.p%d.iter%d.bout",p,iter); write_sparsebin(Dp,T,dp,fname); printf("... wrote %s\n", fname); } } // over iter /*===============================================================================*/ MPI_Finalize(); return 0; }