|
DataMuseum.dkPresents historical artifacts from the history of: DKUUG/EUUG Conference tapes |
This is an automatic "excavation" of a thematic subset of
See our Wiki for more about DKUUG/EUUG Conference tapes Excavated with: AutoArchaeologist - Free & Open Source Software. |
top - metrics - downloadIndex: T m
Length: 8440 (0x20f8) Types: TextFile Names: »makework.c«
└─⟦db229ac7e⟧ Bits:30007240 EUUGD20: SSBA 1.2 / AFW Benchmarks └─⟦this⟧ »EUUGD20/AFUU-ssba1.21/ssba1.21E/musbus/makework.c« └─⟦this⟧ »EUUGD20/AFUU-ssba1.21/ssba1.21F/musbus/makework.c«
/* * makework -- emulate a series of terminal users * * makework nusers * * job streams are specified on standard input with lines of the form * home_dir cmd_path_name [ options ] [ <stdin_file ] [ >stdout_file ] * * Input is send to all nuser instances of the commands in the * job streams at a rate not in excess of "rate" characters per second * per command * * environment variables $rate and $tty control typing rate (characters * per second) and destination of echoed output. * * $Header: makework.c,v 5.2 87/12/09 15:06:17 kenj Exp $ */ #include "makework.h" #define DEF_RATE 5 #define GRANULE 5 #define CHUNK 60 float thres; float est_rate = DEF_RATE; int firstuser; /* ordinal identification of first user for this * process */ int exit_status = 0; int sigpipe; /* pipe write error flag */ int nstream; static stream *cp; main(argc, argv) int argc; char *argv[]; { int i; int l; int fcopy = 0; /* fd for copy output */ int master = 1; /* the REAL master, == 0 for clones */ int cmseq = 0; /* clone/master seq number */ int done; /* count of children finished */ int output; /* aggregate output char count for all children */ int c; int nch; /* # characters to write */ int written; /* # characters actully written */ char logname[20]; /* name of the log file(s) */ int nusers; int onalarm(); int pipeerr(); int wrapup(); int grunt(); int pvec[2]; /* for pipes */ char *p; char *prog; /* my name */ char *getenv(); FILE *uf; /* user ordinal number file */ double atof(); freopen("Tmp/masterlog.00", "w", stderr); fprintf(stderr, "*** New Run *** "); prog = argv[0]; if ((p = getenv("rate")) != (char *)0) { est_rate= (double) atof(p); if (est_rate <= 0) { fprintf(stderr, "%s: bad rate, reset to %.2f chars/sec\n", prog, DEF_RATE); est_rate = DEF_RATE; } } if ((p = getenv("tty")) != (char *)0) { fcopy = open(p, 1); if (fcopy < 0) fcopy = creat(p, 0600); if (fcopy < 0) { fprintf(stderr, "%s: cannot open copy file '%s'\n", prog, p); fflush(stderr); exit(2); } lseek(fcopy, 0L, 2); /* append at end of file */ } if (argc < 2) { fprintf(stderr, "%s: missing nusers\n", prog); fflush(stderr); exit(4); } nusers = atoi(argv[1]); if (nusers < 1) { fprintf(stderr, "%s: impossible nusers (%d<-%s)\n", prog, nusers, argv[1]); fflush(stderr); exit(4); } fprintf(stderr, "%d Users\n", nusers); argc--; argv++; /* clone copies of myself to run up to MAXSTREAM users each */ firstuser = 0; fprintf(stderr, "master pid %d\n", getpid()); fflush(stderr); while (nusers > MAXSTREAM) { cmseq++; fflush(stderr); sprintf(logname, "Tmp/masterlog.%02d", cmseq); freopen(logname, "w", stderr); nstream = MAXSTREAM; /* build job streams for the clone */ getwork(nstream); fflush(stderr); if ((l = fork()) == -1) { /* fork failed */ freopen("Tmp/masterlog.00", "a", stderr); fatal("** clone fork failed **\n"); goto bepatient; } else if (l > 0) { /* I am the master with nstream fewer users to run */ freopen("Tmp/masterlog.00", "a", stderr); fprintf(stderr, "clone %d pid %d\n", cmseq, l); nusers -= nstream; firstuser += nstream; continue; } else { /* I am a clone, run nstream users */ master = 0; nusers = nstream; break; } } if (master) { cmseq = 0; nstream = nusers; /* build job streams for the master */ getwork(nstream); } close(0); for (i = 0; i < nstream; i++ ) { if (master) fprintf(stderr, "user %d master stream %d ", firstuser+i, i); else fprintf(stderr, "user %d clone %d stream %d ", firstuser+i, cmseq, i); if (pipe(pvec) == -1) { /* this is fatal */ fatal("** pipe failed **\n"); goto bepatient; } fflush(stderr); if ((work[i].pid = fork()) == 0) { int fd; /* the command */ if (pvec[0] != 0) { close(0); dup(pvec[0]); } sprintf(logname, "Tmp/userlog.%02d", firstuser+i); freopen(logname, "w", stderr); for (fd = 3; fd < nstream+4; fd++) close(fd); if (work[i].tty[0] != '\0') { close(pvec[1]); /* redirect std output */ if (freopen(work[i].tty, "w", stdout) == NULL) { fprintf(stderr, "makework: cannot open %s for std output\n", work[i].tty); fflush(stderr); goto bepatient; } } if (chdir(work[i].home) == -1) { fprintf(stderr, "makework: chdir to \"%s\" failed!\n", work[i].home); fflush(stderr); goto bepatient; } if ((uf = fopen("USER", "w")) == NULL) { fprintf(stderr, "makework: creat \"USER\" failed for user %d!\n", firstuser+i); fflush(stderr); goto bepatient; } fprintf(uf, "%d\n", firstuser+i); fclose(uf); execv(work[i].cmd, work[i].av); /* don't expect to get here! */ fatal("** exec failed **\n"); goto bepatient; } else if (work[i].pid == -1) { fatal("** fork failed **\n"); goto bepatient; } else { close(pvec[0]); work[i].fd = pvec[1]; work[i].line = work[i].bp = work[i].buf; fprintf(stderr, "pid %d pipe fd %d", work[i].pid, work[i].fd); if (work[i].tty[0] != '\0') fprintf(stderr, " > %s", work[i].tty); fputc('\n', stderr); } } fflush(stderr); srand(time(0)); thres = 0; done = output = 0; for (i = 0; i < nstream; i++) { if (work[i].blen == 0) done++; else thres += est_rate * GRANULE; } est_rate = thres; signal(SIGALRM, onalarm); signal(SIGPIPE, pipeerr); alarm(GRANULE); while (done < nstream) { for (i = 0; i < nstream; i++) { cp = &work[i]; if (cp->xmit >= cp->blen) continue; l = rand() % CHUNK + 1; /* 1-CHUNK chars */ if (l == 0) continue; if (cp->xmit + l > cp->blen) l = cp->blen - cp->xmit; p = cp->bp; cp->bp += l; cp->xmit += l; while (p < cp->bp) { if (*p == '\n' || (p == &cp->bp[-1] && cp->xmit >= cp->blen)) { /* write it out */ nch = p - cp->line + 1; if ((written = write(cp->fd, cp->line, nch)) != nch) { /* argh! */ cp->line[nch] = '\0'; fprintf(stderr, "user %d cmd %s ", firstuser+i, cp->line); fprintf(stderr, "write(,,%d) returns %d\n", nch, written); if (sigpipe) fatal("** SIGPIPE error **\n"); else fatal("** write error **\n"); goto bepatient; } if (fcopy) write(fcopy, cp->line, p - cp->line + 1); cp->line = &p[1]; } p++; } if (cp->xmit >= cp->blen) { done++; close(cp->fd); } output += l; } while (output > thres) { pause(); } } bepatient: alarm(0); /**** * If everything is going OK, we should simply be able to keep * looping unitil 'wait' fails, however some descendent process may * be in a state from which it can never exit, and so a timeout * is used. ****/ signal(SIGALRM, grunt); alarm(10000); while ((c = wait(&l)) != -1) { for (i = 0; i < nstream; i++) { if (c == work[i].pid) { fprintf(stderr, "user %d pid %d done", firstuser+i, c); if (l != 0) { if (l & 0x7f) fprintf(stderr, " status %d", l & 0x7f); if (l & 0xff00) fprintf(stderr, " exit code %d", (l>>8) & 0xff); exit_status = 4; } fputc('\n', stderr); c = work[i].pid = -1; break; } } if (c != -1) { fprintf(stderr, "clone done, pid %d ", c); if (l != 0) { if (l & 0x7f) fprintf(stderr, " status %d", l & 0x7f); if (l & 0xff00) fprintf(stderr, " exit code %d", (l>>8) & 0xff); exit_status = 4; } fputc('\n', stderr); } } alarm(0); wrapup("Finished waiting ..."); } onalarm() { thres += est_rate; signal(SIGALRM, onalarm); alarm(GRANULE); } grunt() { /* timeout after label "bepatient" in main */ exit_status = 4; wrapup("Timed out waiting for users to finish ..."); } pipeerr() { sigpipe++; } wrapup(reason) char *reason; { int i; int killed = 0; fflush(stderr); for (i = 0; i < nstream; i++) { if (work[i].pid > 0 && kill(work[i].pid, SIGKILL) != -1) { if (!killed) { killed++; fprintf(stderr, "%s\n", reason); fflush(stderr); } fprintf(stderr, "user %d pid %d killed off\n", firstuser+i, work[i].pid); fflush(stderr); } } exit(exit_status); }