/*--------------------------------------------------- * file: adldaMPI.c * purpose: run simulated parallel topic model in MPI * inputs: T (number of topics), iter (number of iterations), P (number of procs), seed, docword file * outputs: wp text file * version: 1.1 * author: asuncion@uci.edu * date: 4/30/08 *-------------------------------------------------*/ #include #include #include #include #include #include void sample_chain0 (int ntot, int W, int T, double alpha, double beta, int *w, int *d, int *z, int **wp, int **dp, int *ztot); void printStats (int ntot, int W, int T, double alpha, double beta, int *w, int *d, int *z, int **wp, int **dp, int *ztot); void chksum (int n, int T, int **x, int *sumx); int **imat(int nr, int nc); int *ivec(int n); double *dvec(int n); int countntot(char *fname); void randomassignment(int ntot, int T, int *w, int *d, int *z, int **wp, int **dp, int *ztot); void read_dw(char *fname, int *d, int *w, int *D, int *W); void write_wp(char *fname, int **wp, int W, int T); /*------------------------------------------ * global variables *------------------------------------------ */ static int T; // number of topics static int W; // number of unique words static int D; // number of docs static int ntot; // number of words in corpus /*========================================== * main *========================================== */ int main(int argc, char* argv[]) { int rank,size,length,Pm1; char name[BUFSIZ]; int i=0, j=0, k=0, h=0, proc, iter=0, seed, dump; int *w, **wSplit, *d, **dSplit, *z, **zSplit, **wp, **sumwp, **wp0, **dwp, **dp, ***dpSplit, *ztot, *ztot0; int Dp, nstart, nend, dstart, dend; int *localntot; // keeps track of the local proc's number of words double alpha=0.1, beta=0.01; int P; // # of processors char *fname; char outputFile[32]; if (argc == 1) { fprintf(stderr, "usage: %s T iter P seed docword\n", argv[0]); exit(-1); } T = atoi(argv[1]); iter = atoi(argv[2]); P = atoi(argv[3]); seed = atoi(argv[4]); fname = (char*)argv[5]; assert(T>0); assert(iter>0); assert(alpha>0); assert(beta>0); assert(seed>0); ntot = countntot(fname); w = ivec(ntot); d = ivec(ntot); z = ivec(ntot); read_dw(fname, d, w, &D, &W); wp = imat(W,T); sumwp = imat(W,T); dp = imat(D,T); srand48(seed); printf("file = "); printf(fname); printf("\n"); printf("mem = %.1f (GByte)\n", 1e-9*(6*ntot + T*(2*D+3*W) + 2*T)*sizeof(int)); printf("seed = %d\n", seed); printf("P = %d\n", P); printf("ntot = %d\n", ntot); printf("W = %d\n", W); printf("D = %d\n", D); printf("T = %d\n", T); printf("iter = %d\n", iter); printf("alpha = %f\n", alpha); printf("beta = %f\n", beta); /* MPI INITIALIZATION */ MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Get_processor_name(name, &length); printf("%s: hello world from process %d of %d\n", name, rank, size); ztot = ivec(T); randomassignment(ntot,T,w,d,z,wp,dp,ztot); chksum(W,T,wp,ztot); chksum(D,T,dp,ztot); // Split over P procs Dp = D/P; nstart = 0; wSplit = (int**)calloc(P,sizeof(int*)); dSplit = (int**)calloc(P,sizeof(int*)); zSplit = (int**)calloc(P,sizeof(int*)); dpSplit = (int***)calloc(P,sizeof(int**)); localntot = (int*)calloc(P,sizeof(int)); for (proc = 0; proc < P; proc++) { dstart = proc*Dp; dend = (proc+1)*Dp - 1; // Compute nend nend = 0; for (i = 0; i <= dend; i++) { for (j = 0; j < T; j++) { nend += dp[i][j]; } } // Subtract one since C's arrays are zero-based nend = nend -1; // We are creating proc's local w, which will contain // the interval w(nstart:nend) wSplit[proc] = ivec(nend - nstart + 1); for (i = nstart; i <= nend; i++) { wSplit[proc][i - nstart] = w[i]; } // Now we make proc's local d dSplit[proc] = ivec(nend - nstart + 1); for (i = nstart; i <= nend; i++) { dSplit[proc][i - nstart] = d[i] - dstart; } // Now we make proc's local z zSplit[proc] = ivec(nend - nstart + 1); for (i = nstart; i <= nend; i++) { zSplit[proc][i - nstart] = z[i]; } // Now we make proc's local dp matrix dpSplit[proc] = imat(dend - dstart + 1, T); for (i = dstart; i <= dend; i++) { for (j = 0; j < T; j++) { dpSplit[proc][i-dstart][j] = dp[i][j]; } } // set local ntot size of proc localntot[proc] = nend - nstart + 1; nstart = nend + 1; } wp0 = imat(W,T); dwp = imat(W,T); ztot0 = ivec(T); if (wp0 == NULL || dwp == NULL || ztot0 == NULL) { printf("out of memory"); abort(); } Pm1 = P - 1; // ***** Now we start to iterate for (i = 1; i <= iter; i++) { if (rank == 0) printf("iter %d\n",i); // update wp0, dwp, and ztot0 all at once for (j = 0; j < T; j++) { ztot0[j] = 0; for (k = 0; k < W; k++) { // Copy wp to wp0 wp0[k][j] = wp[k][j]; // set dwp to be completely 0 dwp[k][j] = 0; ztot0[j] += wp0[k][j]; } ztot[j] = ztot0[j]; } chksum(W,T,wp,ztot0); memset(sumwp[0], 0, W*T*sizeof(int)); j = rank; sample_chain0(localntot[j],W,T,alpha,beta,wSplit[j],dSplit[j],zSplit[j],wp,dpSplit[j],ztot); /* MPI CALL: */ MPI_Allreduce(wp[0],sumwp[0],W*T,MPI_INTEGER,MPI_SUM,MPI_COMM_WORLD); for (j = 0; j < W; j++) for (k = 0; k < T; k++) wp[j][k] = sumwp[j][k] - Pm1*wp0[j][k]; // Update wp } // End of iterations // We need to write out wp //sprintf(outputFile, "test.txt"); //write_wp(outputFile, wp, W, T); /* MPI FINALIZE: */ MPI_Finalize(); return 0; } // This function just prints out w,d,z,wp,and dp for debugging purposes void printStats(int localntot, int W, int T, double alpha, double beta, int *w, int *d, int *z, int **wp, int **dp, int *ztot) { int i,j; printf("\nw: "); for (i =0; i < localntot; i++) { printf("%d ",w[i]); } printf("\nd: "); for (i =0; i < localntot; i++) { printf("%d ",d[i]); } printf("\nz: "); for (i =0; i < localntot; i++) { printf("%d ",z[i]); } printf("\nwp:\n"); for (i = 0; i < W;i++) { for (j=0;jcurrprob) { t++; currprob += probs[t]; } z[i] = t; wp[w[i]][t]++; dp[d[i]][t]++; ztot[t]++; } free(probs); } void chksum (int n, int T, int **x, int *sumx) // { int i, t, sum; for (t = 0; t < T; t++) { sum = 0; for (i = 0; i < n; i++) sum += x[i][t]; assert(sum==sumx[t]); } } int **imat(int nr, int nc) // { int ntot = nr*nc; int *tmp = (int*) calloc(ntot,sizeof(int)); int **x = (int**)calloc(nr,sizeof(int*)); int r; assert(tmp); assert(x); for (r = 0; r < nr; r++) x[r] = tmp + nc*r; return x; } int *ivec(int n) // { int *x = (int*)calloc(n,sizeof(int)); assert(x); return x; } double *dvec(int n) // { double *x = (double*)calloc(n,sizeof(double)); assert(x); return x; } int countntot(char *fname) // { int i, count, ntot = 0; char buf[BUFSIZ]; FILE *fp = fopen(fname ,"r"); assert(fp); for (i = 0; i < 3; i++) fgets(buf, BUFSIZ, fp); // skip 3 header lines while (fscanf(fp, "%*d%*d%d", &count) != EOF) ntot += count; fclose(fp); assert(ntot>0); return ntot; } void randomassignment(int ntot, int T, int *w, int *d, int *z, int **wp, int **dp, int *ztot) // { int i, t; for (i = 0; i < ntot; i++) { t = (int)(T*drand48()); z[i] = t; wp[w[i]][t]++; dp[d[i]][t]++; ztot[t]++; } } void read_dw(char *fname, int *d, int *w, int *D, int *W) // { int i,wt,dt,ct,count,nnz; FILE *fp = fopen(fname ,"r"); assert(fp); count = 0; fscanf(fp,"%d", D); assert(*D>0); fscanf(fp,"%d", W); assert(*W>0); fscanf(fp,"%d", &nnz); assert(nnz>0); while (fscanf(fp, "%d%d%d", &dt, &wt, &ct) != EOF) { for (i = count; i < count+ct; i++) { w[i] = wt-1; d[i] = dt-1; } count += ct; } fclose(fp); } void write_wp(char *fname, int **wp, int W, int T) { int i, j, total; FILE *fp = fopen(fname,"w"); assert(fp); total = 0; for (i = 0; i < W; i++) { for (j = 0; j < T; j++) { if (wp[i][j] > 0) { total += 1; // # of distinct pairs } } } fprintf(fp,"%d\n",W); fprintf(fp,"%d\n",T); fprintf(fp,"%d\n",total); for (i = 0; i < W; i++) { for (j = 0; j < T; j++) { if (wp[i][j] > 0) { fprintf(fp,"%d %d %d\n",i+1,j+1,wp[i][j]); } } } fclose(fp); }