/*---------------------------------------------------
 * file:    topicmpiomp.c
 * purpose: run parallel topic model
 * usage:   topicmpiomp P iter0
 *            P     = number of procs (must equal the number of MPI tasks)
 *            iter0 = first iteration number
 * inputs:  wp.sbin, dw.p<task>.bin, z.p<task>.bin, dp.p<task>.sbin
 *          (T, the number of topics, is taken from wp.sbin;
 *           ITER, the last iteration, is fixed in main)
 * outputs: every `dump` iterations:
 *            wp.iter<n>.bout           (written by task 0 only)
 *            z.p<task>.iter<n>.bout
 *            dp.p<task>.iter<n>.bout
 * version: 1.0
 * author:  newman@uci.edu
 * date:    Fri Apr  6 16:47:24 PDT 2007
 *          /latent/newman/preproc/code3/run1/again
 * NOTE:    binary matrices are all (c-style) zero offset
 * poe topicmpiomp 16 -nodes 2 -tasks_per_node 8 -rmpool 1 -euilib us -euidevice sn_all
 *
 *-------------------------------------------------*/

#include "mpi.h"
#define ROOTONLY if(mytask==0)
#define Rprintf  if(mytask==0)printf
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <math.h>
#include <assert.h>
#include "topiclib.h"

/*------------------------------------------
 * global variables
 *------------------------------------------ */
static int T; // number of topics
static int W; // number of unique words
static int D; // number of docs
static int N; // number of words in corpus
static int P; // number of procs

/* Large enough for "dp.p%d.iter%d.bout" with two full-width ints. */
enum { FNAME_LEN = 64 };

/*------------------------------------------
 * open_or_abort: fopen() that never returns NULL.
 * On failure reports the reason and aborts the whole MPI job.
 *------------------------------------------ */
static FILE *open_or_abort(const char *name, const char *mode)
{
  FILE *fp = fopen(name, mode);
  if (fp == NULL) {
    perror(name);
    MPI_Abort(MPI_COMM_WORLD, 1);
    exit(EXIT_FAILURE);
  }
  return fp;
}

/*------------------------------------------
 * read_ints: read exactly `count` ints from fp into dst.
 * Done outside assert() so the read still happens when
 * compiled with -DNDEBUG; a short read aborts the MPI job.
 *------------------------------------------ */
static void read_ints(FILE *fp, const char *name, int *dst, int count)
{
  size_t got = fread(dst, sizeof(int), (size_t)count, fp);
  if (got != (size_t)count) {
    fprintf(stderr, "ERROR short read from %s\n", name);
    MPI_Abort(MPI_COMM_WORLD, 1);
    exit(EXIT_FAILURE);
  }
}

/*------------------------------------------
 * write_ints: write exactly `count` ints from src to fp.
 * A short write aborts the MPI job.
 *------------------------------------------ */
static void write_ints(FILE *fp, const char *name, const int *src, int count)
{
  size_t put = fwrite(src, sizeof(int), (size_t)count, fp);
  if (put != (size_t)count) {
    fprintf(stderr, "ERROR short write to %s\n", name);
    MPI_Abort(MPI_COMM_WORLD, 1);
    exit(EXIT_FAILURE);
  }
}

/*------------------------------------------
 * read_triplets: read three consecutive int arrays of length nnz
 * (row index, column index, count) from fp and store each count
 * at mat[row][col].  Indices are checked against rows/cols before
 * use because they come straight from the file.
 *------------------------------------------ */
static void read_triplets(FILE *fp, const char *name, int nnz,
                          int rows, int cols, int **mat)
{
  int k;
  int *ri = ivec(nnz);
  int *ci = ivec(nnz);
  int *cnt = ivec(nnz);

  assert(ri); assert(ci); assert(cnt);

  read_ints(fp, name, ri,  nnz);
  read_ints(fp, name, ci,  nnz);
  read_ints(fp, name, cnt, nnz);

  for (k = 0; k < nnz; k++) {
    if (ri[k] < 0 || ri[k] >= rows || ci[k] < 0 || ci[k] >= cols) {
      fprintf(stderr, "ERROR index out of range in %s\n", name);
      MPI_Abort(MPI_COMM_WORLD, 1);
      exit(EXIT_FAILURE);
    }
    mat[ri[k]][ci[k]] = cnt[k];
  }

  free(ri);
  free(ci);
  free(cnt);
}

/*==========================================
 * main
 *
 * Each MPI task loads the shared word-topic counts (wp.sbin) plus
 * its own partition of the corpus (dw/z/dp files for task p), then
 * for iter = iter0..ITER:
 *   1. calls sample_chain0() on the local partition,
 *   2. combines the per-task wp matrices with MPI_Allreduce,
 *   3. recomputes ztot as the column sums of the combined wp,
 *   4. every `dump` iterations writes wp / z / dp to disk.
 *
 * Returns 0 on success; usage errors exit with status -1.
 *========================================== */
int main(int argc, char* argv[])
{
  int cpus=1, Ppercpu, Pm1;
  int i, iter0, iter, ITER=200, dump=100;
  double alpha=0.1, beta=0.01;
  FILE *fin, *fout;
  char fname[FNAME_LEN];
  int *w, *d, *z, **wp, *ztot, **wp0, **sumwp, **dp;
  int p, t, Np, Nz, Dp, NNZ, n1=1, n2=2, TT;
  int ierr, numtasks, mytask;

  ierr = MPI_Init(&argc,&argv);
  if (ierr != MPI_SUCCESS) {
    printf ("Error starting MPI program. Terminating.\n");
    MPI_Abort(MPI_COMM_WORLD, ierr);
  }
  MPI_Comm_size(MPI_COMM_WORLD,&numtasks);
  MPI_Comm_rank(MPI_COMM_WORLD,&mytask);
  printf("topicmpi: hi from task %d of %d\n", mytask, numtasks);
  cpus = numtasks;
  p = mytask;

  // both P and iter0 are required: argv[2] is read below
  if (argc < 3) {
    fprintf(stderr, "usage: %s P iter0\n", argv[0]);
    MPI_Finalize();
    exit(-1);
  }
  P     = atoi(argv[1]); assert(P>0); assert(P%cpus==0);
  iter0 = atoi(argv[2]); assert(iter0>-1);

  Pm1 = P-1;
  Ppercpu = P/cpus;

  // expect Ppercpu to be P or 1, expect p0 to be 0 or mytask
  assert(P==cpus);
  assert(Ppercpu==1);

  /*==================================================*/

  // read wp (header: W, T, NNZ; then NNZ word ids, topic ids, counts)
  fin = open_or_abort("wp.sbin","rb");
  read_ints(fin, "wp.sbin", &W,   1); assert(W>0);
  read_ints(fin, "wp.sbin", &T,   1); assert(T>0);
  read_ints(fin, "wp.sbin", &NNZ, 1); assert(NNZ>0);
  wp = imat(W,T); assert(wp);
  read_triplets(fin, "wp.sbin", NNZ, W, T, wp);
  fclose(fin);
  printf("... read wp.sbin\n");

  ztot = ivec(T); assert(ztot);
  getztot(W,T,wp,ztot);

  /*==================================================*/

  // read dw (header: 2, Np; then Np doc ids, Np word ids)
  snprintf(fname, sizeof fname, "dw.p%d.bin", p);
  fin = open_or_abort(fname,"rb");
  read_ints(fin, fname, &n2, 1); assert(n2==2);
  read_ints(fin, fname, &Np, 1); assert(Np>0);
  w = ivec(Np); assert(w);
  d = ivec(Np); assert(d);
  read_ints(fin, fname, d, Np);
  read_ints(fin, fname, w, Np);
  fclose(fin);
  printf("... read %s\n", fname);

  // read z (header: 1, Np; then Np topic assignments)
  snprintf(fname, sizeof fname, "z.p%d.bin", p);
  fin = open_or_abort(fname,"rb");
  read_ints(fin, fname, &n1, 1); assert(n1==1);
  read_ints(fin, fname, &Nz, 1); assert(Nz>0);
  // w, d and z are all indexed 0..Np-1 below, so the two files must agree
  if (Nz != Np) {
    fprintf(stderr, "ERROR %s has %d entries, expected %d\n", fname, Nz, Np);
    MPI_Abort(MPI_COMM_WORLD, 1);
    exit(EXIT_FAILURE);
  }
  z = ivec(Np); assert(z);
  read_ints(fin, fname, z, Np);
  fclose(fin);
  printf("... read %s\n", fname);

  // read dp (header: Dp, T, NNZ; then NNZ doc ids, topic ids, counts)
  snprintf(fname, sizeof fname, "dp.p%d.sbin", p);
  fin = open_or_abort(fname,"rb");
  read_ints(fin, fname, &Dp,  1); assert(Dp>0);
  read_ints(fin, fname, &TT,  1); assert(TT==T);
  read_ints(fin, fname, &NNZ, 1); assert(NNZ>0);
  dp = imat(Dp,T); assert(dp);
  read_triplets(fin, fname, NNZ, Dp, T, dp);
  fclose(fin);
  printf("... read %s\n", fname);

  printf(" Np = %d\n", Np);
  printf(" Dp = %d\n", Dp);

  // global totals; buffers are C ints, so the datatype is MPI_INT
  MPI_Allreduce(&Dp,&D,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
  MPI_Allreduce(&Np,&N,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);

  ROOTONLY {
    printf("P = %d\n", P);
    printf("Ppercpu = %d\n", Ppercpu);
    printf("D = %d\n", D);
    printf("W = %d\n", W);
    printf("N = %d\n", N);
    printf("T = %d\n", T);
    printf("iter0 = %d\n", iter0);
    printf("ITER = %d\n", ITER);
    printf("alpha = %f\n", alpha);
    printf("beta = %f\n", beta);
  }

  // set wp0=wp for first time
  // (the memcpy on row 0 copies W*T ints, i.e. it relies on imat()
  //  storing the whole matrix contiguously starting at [0])
  wp0   = imat(W,T); assert(wp0);
  sumwp = imat(W,T); assert(sumwp);
  memcpy(wp0[0],wp[0],(size_t)W*T*sizeof(int));

  /*===============================================================================*/
  for (iter = iter0; iter <= ITER; iter++) {

    Rprintf("\niter %d\n", iter);

    sample_chain0(Np,W,T,alpha,beta,w,d,z,wp,dp,ztot);
    //chksum(W,T,wp,ztot);

    ierr = MPI_Allreduce(wp[0],sumwp[0],W*T,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
    if (ierr != MPI_SUCCESS) {
      printf ("ERROR MPI_Allreduce\n");
      MPI_Abort(MPI_COMM_WORLD, ierr);
    }

    // sumwp adds up P local copies, each of which started from wp0;
    // subtracting (P-1)*wp0 leaves wp0 counted once plus every task's change
    for (i = 0; i < W; i++)
      for (t = 0; t < T; t++)
        wp[i][t] = sumwp[i][t] - Pm1*wp0[i][t];
    memcpy(wp0[0],wp[0],(size_t)W*T*sizeof(int));

    // rebuild per-topic totals from the merged wp
    memset(ztot, 0, (size_t)T*sizeof(int));
    for (i = 0; i < W; i++)
      for (t = 0; t < T; t++)
        ztot[t] += wp[i][t];

    // dump results
    if (iter>iter0 && iter%dump == 0) {

      // wp is identical on every task after the merge: task 0 writes it
      snprintf(fname, sizeof fname, "wp.iter%d.bout", iter);
      ROOTONLY write_sparsebin(W,T,wp,fname);
      Rprintf("... wrote %s\n", fname);

      snprintf(fname, sizeof fname, "z.p%d.iter%d.bout", p, iter);
      fout = open_or_abort(fname,"wb");
      write_ints(fout, fname, &n1, 1);
      write_ints(fout, fname, &Np, 1);
      write_ints(fout, fname, z,   Np);
      // fclose flushes buffered data, so a failure here means lost output
      if (fclose(fout) != 0) {
        perror(fname);
        MPI_Abort(MPI_COMM_WORLD, 1);
        exit(EXIT_FAILURE);
      }
      printf("... wrote %s\n", fname);

      snprintf(fname, sizeof fname, "dp.p%d.iter%d.bout", p, iter);
      write_sparsebin(Dp,T,dp,fname);
      printf("... wrote %s\n", fname);
    }

  } // over iter
  /*===============================================================================*/

  MPI_Finalize();

  return 0;
}