/*--------------------------------------------------- * file: topicmpiomp.c * purpose: run parallel topic model * inputs: T (number of topics), ITER (number of iterations), P (number of procs), docword.txt * outputs: wp.txt, dp.txt * version: 1.0 * author: newman@uci.edu * date: Fri Apr 6 16:47:24 PDT 2007 * /latent/newman/preproc/code3/run1/again * NOTE: binary matrices are all (c-style) zero offset * poe topicmpiomp 16 -nodes 2 -tasks_per_node 8 -rmpool 1 -euilib us -euidevice sn_all * *-------------------------------------------------*/ #include "mpi.h" #define ROOTONLY if(mytask==0) #define Rprintf if(mytask==0)printf #include #include #include #include #include #include "topiclib.h" void updatesort(int n, int *x, int *indx, int *revindx, int ip, int im); /*------------------------------------------ * global variables *------------------------------------------ */ static int T; // number of topics static int W; // number of unique words static int D; // number of docs static int N; // number of words in corpus static int P; // number of procs /*========================================== * main *========================================== */ int main(int argc, char* argv[]) { int cpus=1, Ppercpu, Pm1; int i, iter0, iter, ITER=200, dump=100; double alpha=0.01, beta=0.01; FILE *fin, *fout; char fname[32]; int *w, *d, *z, **wp, *ztot, **wp0, **sumwp, **dp; int p, t, Np, Dp, NNZ, n1=1, n2=2, chk, TT; int *dd, *ww, *kk, *cc; int ierr, numtasks, mytask; int tt, tt_, ttt=-1, ttt_chk, t_, Tchk; int *indx, *d_last_z, *w_last_z, *ztot_last, ztotmin=-1, d_old, w_old, *indx_z, *revindx_z, **indx_dp, **revindx_dp; double Wbeta, *probs, *Ad, *Bw, Z, Zp=-1, Zp_old, currprob, *a, *b, aa=-1, bb=-1, U, sumprobs, sumprobs_old, fac=-1,totprob,maxprob; int *ldacounts, *fastcounts; int word, doc; int *doc_array, *word_array; ierr = MPI_Init(&argc,&argv); if (ierr != MPI_SUCCESS) { printf ("Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, ierr); } MPI_Comm_size(MPI_COMM_WORLD,&numtasks); MPI_Comm_rank(MPI_COMM_WORLD,&mytask); printf("topicmpi: hi from task %d of %d\n", mytask, numtasks); cpus = numtasks; p = mytask; if (argc == 1) { fprintf(stderr, "usage: %s P iter0\n", argv[0]); exit(-1); } P = atoi(argv[1]); assert(P>0); assert(P%cpus==0); iter0 = atoi(argv[2]); assert(iter0>-1); Pm1 = P-1; Ppercpu = P/cpus; // expect Ppercpu to be P or 1, expect p0 to be 0 or mytask assert(P==cpus); assert(Ppercpu==1); /*==================================================*/ // read wp fin = fopen("wp.sbin","r"); assert(fin); assert(fread(&W, sizeof(int),1,fin)); assert(W>0); assert(fread(&T, sizeof(int),1,fin)); assert(T>0); assert(fread(&NNZ, sizeof(int),1,fin)); assert(NNZ>0); ww = ivec(NNZ); assert(ww); kk = ivec(NNZ); assert(kk); cc = ivec(NNZ); assert(cc); chk = fread(ww,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(kk,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(cc,sizeof(int),NNZ,fin); assert(chk==NNZ); fclose(fin); printf("... read wp.sbin\n"); wp = imat(W,T); assert(wp); for (i = 0; i < NNZ; i++) wp[ww[i]][kk[i]] = cc[i]; free(ww); free(kk); free(cc); ztot = ivec(T); assert(ztot); probs= dvec(T); assert(probs); getztot(W,T,wp,ztot); /*==================================================*/ // read dw sprintf(fname,"dw.p%d.bin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&n2, sizeof(int),1,fin); assert(chk==1); assert(n2==2); chk = fread(&Np, sizeof(int),1,fin); assert(chk==1); assert(Np>0); w = ivec(Np); d = ivec(Np); chk = fread(d,sizeof(int),Np,fin); assert(chk==Np); chk = fread(w,sizeof(int),Np,fin); assert(chk==Np); fclose(fin); printf("... read %s\n", fname); // read z sprintf(fname,"z.p%d.bin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&n1, sizeof(int),1,fin); assert(chk==1); assert(n1==1); chk = fread(&Np, sizeof(int),1,fin); assert(chk==1); assert(Np>0); z = ivec(Np); chk = fread(z,sizeof(int),Np,fin); assert(chk==Np); fclose(fin); printf("... read %s\n", fname); // read dp sprintf(fname,"dp.p%d.sbin",p); fin = fopen(fname,"r"); assert(fin); chk = fread(&Dp, sizeof(int),1,fin); assert(chk==1); assert(Dp>0); chk = fread(&TT, sizeof(int),1,fin); assert(chk==1); assert(TT=T); chk = fread(&NNZ, sizeof(int),1,fin); assert(chk==1); assert(NNZ>0); dd = ivec(NNZ); assert(dd); kk = ivec(NNZ); assert(kk); cc = ivec(NNZ); assert(cc); chk = fread(dd,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(kk,sizeof(int),NNZ,fin); assert(chk==NNZ); chk = fread(cc,sizeof(int),NNZ,fin); assert(chk==NNZ); fclose(fin); printf("... read %s\n", fname); dp = imat(Dp,T); assert(dp); for (i = 0; i < NNZ; i++) dp[dd[i]][kk[i]] = cc[i]; free(dd); free(kk); free(cc); printf(" Np = %d\n", Np); printf(" Dp = %d\n", Dp); Wbeta = W*beta; Ad = dvec(T); Bw = dvec(T); a = dvec(Dp); b = dvec(W); indx_z = ivec(T); revindx_z = ivec(T); indx_dp = imat(Dp,T); revindx_dp = imat(Dp,T); d_last_z = ivec(Dp); w_last_z = ivec(W); ztot_last = ivec(W); MPI_Allreduce(&Dp,&D,1,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); MPI_Allreduce(&Np,&N,1,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); ROOTONLY { printf("P = %d\n", P); printf("Ppercpu = %d\n", Ppercpu); printf("D = %d\n", D); printf("W = %d\n", W); printf("N = %d\n", N); printf("T = %d\n", T); printf("iter0 = %d\n", iter0); printf("ITER = %d\n", ITER); printf("alpha = %f\n", alpha); printf("beta = %f\n", beta); } // set wp0=wp for first time wp0 = imat(W,T); sumwp = imat(W,T); memcpy(wp0[0],wp[0],W*T*sizeof(int)); /*===============================================================================*/ for (iter = 0; iter <= ITER; iter++) { Rprintf("\niter %d\n", iter); //sample_chain0(Np,W,T,alpha,beta,w,d,z,wp,dp,ztot); /************************************************************************/ /************************************************************************/ /****** ITER 0 ITER 0 ITER 0 ITER 0 ITER 0 ************/ /************************************************************************/ /************************************************************************/ if (iter==0) { for (i = 0; i < Np; i++) { //t=z[i]; ztot[t]--; wp[w[i]][t]--; dp[d[i]][t]--; word = w[i]; doc = d[i]; doc_array = dp[doc]; word_array = wp[word]; tt = z[i]; word_array[tt]--; doc_array[tt]--; ztot[tt]--; if (i==0) { isort(T, ztot, -1, indx_z); isort(T, indx_z, 1, revindx_z); } else { if (tt != ttt) updatesort(T,ztot,indx_z,revindx_z,ttt,tt); } ztotmin = ztot[indx_z[T-1]]; fac = 1. / (ztotmin + Wbeta); // setup indx_dp if (i==Np-1 || d[i+1]!=d[i]) { // just do last doc isort(T, doc_array, -1, indx_dp[doc]); isort(T, indx_dp[doc], 1, revindx_dp[doc]); } // setup norms aa and bb for (t = 0; t < T; t++) { Ad[t] = ( doc_array[t] + alpha); Bw[t] = (word_array[t] + beta) / (ztotmin + Wbeta); } a[doc] = ddot(T,Ad,Ad); b[word] = ddot(T,Bw,Bw); aa = a[doc]; bb = b[word] / (fac*fac); for (t = 0, totprob = 0.0; t < T; t++) { probs[t] = (dp[d[i]][t]+alpha)*(wp[w[i]][t]+beta)/(ztot[t]+Wbeta); totprob += probs[t]; } maxprob = drand48()*totprob; currprob = probs[0]; t = 0; while (maxprob>currprob) { t++; currprob += probs[t]; } d_last_z[doc] = ttt; w_last_z[word] = ttt; ztot_last[word] = ztotmin; z[i] = ttt; word_array[ttt]++; doc_array[ttt]++; ztot[ttt]++; //z[i]=t; wp[w[i]][t]++; dp[d[i]][t]++; ztot[t]++; } } /************************************************************************/ /************************************************************************/ /****** ITER > 0 ITER > 0 ITER > 0 ITER > 0 ***************/ /************************************************************************/ /************************************************************************/ if (iter>0) { for (i = 0; i < Np; i++) { //t=z[i]; ztot[t]--; wp[w[i]][t]--; dp[d[i]][t]--; word = w[i]; doc = d[i]; doc_array = dp[doc]; word_array = wp[word]; tt = z[i]; word_array[tt]--; doc_array[tt]--; ztot[tt]--; d_old = d_last_z[doc]; w_old = w_last_z[word]; if (ttt != tt) { updatesort(T,ztot,indx_z,revindx_z,ttt,tt); ztotmin = ztot[indx_z[T-1]]; fac = 1. / (ztotmin + Wbeta); } if (d_old != tt) { a[doc] -= SQR(alpha + doc_array[d_old]-1); a[doc] -= SQR(alpha + doc_array[tt ]+1); a[doc] += SQR(alpha + doc_array[d_old] ); a[doc] += SQR(alpha + doc_array[tt ] ); updatesort(T,doc_array,indx_dp[doc],revindx_dp[doc],d_old,tt); } aa = a[doc]; b[word] *= SQR(ztot_last[word] + Wbeta); if (w_old != tt) { b[word] -= SQR(beta + word_array[w_old]-1); b[word] -= SQR(beta + word_array[tt ]+1); b[word] += SQR(beta + word_array[w_old] ); b[word] += SQR(beta + word_array[tt ] ); } bb = b[word]; b[word] *= (fac*fac); U = drand48(); // draw indx = indx_dp[doc]; // point indx to current sort order for (t_ = 0; t_ < T; t_++) { // for each possible topic (sorted order) t=indx[t_]; // t_ refers to sort #, t "actual" topic # if (t_!=0) probs[t_]=probs[t_-1]; else probs[t_]=0; // init cumulative probability mass f'n probs[t_] += (doc_array[t] + alpha)*(word_array[t] + beta)/(ztot[t] + Wbeta); // increment aa -= SQR(alpha + doc_array[t]); // update current norm-based bounds bb -= SQR(beta + word_array[t]); if (aa<0) aa=0; if (bb<0) bb=0; // avoid sqrt(-0) = nan bug Zp_old=Zp; Zp = probs[t_] + sqrt(aa*bb) * fac; // save "old" and "new" Z-bounds if (probs[t_] < U*Zp) continue; // if we haven't found it yet, loop back else { // otherwise if ( (t_==0) || (U*Zp > probs[t_-1]) ) { // if it's this topic, save & quit ttt = indx[t_]; break; } else { // if it's a previous topic U = (U*Zp_old-probs[t_-1])*Zp/(Zp_old-Zp); // compute the shifted & rescaled threshold for (tt_=0; tt_= U) { ttt = indx[tt_]; break; // (break out of tt_ for loop) } } break; // (done finding which prev topic) } } } } d_last_z[doc] = ttt; w_last_z[word] = ttt; ztot_last[word] = ztotmin; z[i] = ttt; word_array[ttt]++; doc_array[ttt]++; ztot[ttt]++; //z[i]=t; wp[w[i]][t]++; dp[d[i]][t]++; ztot[t]++; } ierr = MPI_Allreduce(wp[0],sumwp[0],W*T,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); if (ierr != MPI_SUCCESS) { printf ("ERROR MPI_Allreduce\n"); MPI_Abort(MPI_COMM_WORLD, ierr); } for (i = 0; i < W; i++) for (t = 0; t < T; t++) wp[i][t] = sumwp[i][t] - Pm1*wp0[i][t]; memcpy(wp0[0],wp[0],W*T*sizeof(int)); memset(ztot, 0, T*sizeof(int)); for (i = 0; i < W; i++) for (t = 0; t < T; t++) ztot[t] += wp[i][t]; // dump results if (iter>iter0 && iter%dump == 0) { sprintf(fname,"wp.iter%d.bout",iter); ROOTONLY write_sparsebin(W,T,wp,fname); Rprintf("... wrote %s\n", fname); sprintf(fname,"z.p%d.iter%d.bout",p,iter); fout = fopen(fname,"w"); assert(fout); chk = fwrite(&n1,sizeof(int),1,fout); assert(chk==1); chk = fwrite(&Np,sizeof(int),1,fout); assert(chk==1); chk = fwrite(z,sizeof(int),Np,fout); assert(chk==Np); fclose(fout); printf("... wrote %s\n", fname); sprintf(fname,"dp.p%d.iter%d.bout",p,iter); write_sparsebin(Dp,T,dp,fname); printf("... wrote %s\n", fname); } ROOTONLY { sprintf(fname,"time.iter%d.txt",iter); fout = fopen(fname,"w"); assert(fout); fprintf(fout,"time.iter%d.txt\n",iter); fclose(fout); } } // over iter /*===============================================================================*/ MPI_Finalize(); return 0; } void updatesort(int n, int *x, int *indx, int *revindx, int ip, int im) // { int tmp; // x: array // indx: index array // revindx: reverse index array // ip: element to be incremented p=plus // im: element to be decremented m=minus // following has already been performed // x[ip]++; // x[im]--; // INCREMENT // did ++ get bigger than prev? ip = revindx[ip]; while (ip>0 && x[indx[ip]] > x[indx[ip-1]]) { // swap indx tmp = indx[ip]; indx[ip] = indx[ip-1]; indx[ip-1] = tmp; // swap revindx tmp = revindx[indx[ip]]; revindx[indx[ip]] = revindx[indx[ip-1]]; revindx[indx[ip-1]] = tmp; ip--; } // DECREMENT // did -- get smaller than next? im = revindx[im]; while (im