|
DataMuseum.dk presents historical artifacts from the history of: DKUUG/EUUG Conference tapes |
This is an automatic "excavation" of a thematic subset of
See our Wiki for more about DKUUG/EUUG Conference tapes Excavated with: AutoArchaeologist - Free & Open Source Software. |
top - metrics - downloadIndex: T c
Length: 39839 (0x9b9f) Types: TextFile Names: »chans.c«
└─⟦2d1937cfd⟧ Bits:30007241 EUUGD22: P.P 5.0 └─⟦dc59850a2⟧ »EurOpenD22/pp5.0/pp-5.tar.Z« └─⟦e5a54fb17⟧ └─⟦this⟧ »pp-5.0/Src/qmgr/chans.c«
/* chans.c: channel specific stuff for the qmgr */ # ifndef lint static char Rcsid[] = "@(#)$Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $"; # endif /* * $Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $ * * $Log: chans.c,v $ * Revision 5.0 90/09/20 16:21:02 pp * rcsforce : 5.0 public release * */ #include <isode/rosy.h> #include <isode/logger.h> #include "types.h" #include "ryinitiator.h" #include "Qmgr-ops.h" #include "util.h" extern struct Mtalist *findmtalist (); extern MsgStruct *newmsgstruct (); extern MsgStruct *find_msg (); extern Chanlist *findchanlist (); extern Mlist *findmtamsg (); static int cb_once_only = 0; static Connblk cbqueue; static Connblk *CHead = &cbqueue; static char *cb2name (); static char *cb_print (); static int cb_count = 0; struct eventqueue { struct eventqueue *ev_forw; struct eventqueue *ev_back; time_t ev_time; Connblk *ev_conn; enum cb_type ev_type; Chanlist *ev_clp; }; static struct eventqueue evqueue; static struct eventqueue *EHead = &evqueue; static int ev_count = 0; static struct eventqueue *newevblk (); static void freevblk (), init_events (); static void do_event (); static void channel_event (); static void special_event (); static void restart (); #define NULLEV ((struct eventqueue *)0) static int runnable (); static int invoke (); static int timeout_proc (); static int channel_ttl (); static int expiremsg (); /* static void setfds (); */ static void getresponse (); static void start_timer (); extern Mlist *nextmsg (); extern fd_set perf_rfds, perf_wfds; extern int perf_nfds; extern int delaytime; void review_channels (); void investigate_chan (); void investigate_mta (); void delay_channel (); void cleanup_conn (); int nchansrunning = 0; static int timer_running = 0; static int do_processmessage (), processmessage_result (), processmessage_error (), do_quit (); static int do_initchannel (), channelinit_result 
(); static int do_readqueue (), readqueue_result (); static int general_error (); static time_t nextchantime; static int rqueue; static int noperations; double load_avg[2]; static struct client_dispatch dis[] = { #define DIS_PROC 0 { "processmessage", operation_Qmgr_processmessage, do_processmessage, free_Qmgr_processmessage_argument, processmessage_result, processmessage_error, "Process some messages" }, #define DIS_INIT 1 { "channelInitialise", operation_Qmgr_channelInitialise, do_initchannel, free_Qmgr_channelInitialise_argument, channelinit_result, general_error, "Initialise a channel" }, #define DIS_QUIT 2 { "quit", 0, do_quit, NULLIFP, NULLIFP, NULLIFP, "terminate the association" }, #define DIS_READQ 3 { "readqueue", operation_Qmgr_readqueue, do_readqueue, free_Qmgr_readqueue_argument, readqueue_result, general_error, "Load up the queue" }, NULL }; void init_chans () { extern int ch_maxchans; extern void create_chan (); int i; PP_TRACE (("init_chans ()")); for (i = 0; i < ch_maxchans && ch_all[i]; i++) create_chan (ch_all[i]); sort_chans (); init_events (); } /* ARGSUSED */ int chan_lose (fd, acf) int fd; struct AcSAPfinish *acf; { Connblk *cb; PP_TRACE (("chan_lose (%d)", fd)); if ((cb = findcblk (fd)) == NULLCB) return ACS_ACCEPT; /* restore the state */ delay_channel (cb); if (fd != NOTOK) { FD_CLR (fd, &perf_rfds); FD_CLR (fd, &perf_wfds); } cb -> cb_fd = NOTOK; cleanup_conn (cb); return ACS_ACCEPT; } void start_specials () { Connblk *cb; PP_TRACE (("start_specials ()")); if (loader_chan == NULLCHANLIST) advise (LLOG_EXCEPTIONS, NULLCP, "NO loading channel specified"); else { loader_chan -> chan_update = 1; } if (trash_chan == NULLCHANLIST) advise (LLOG_EXCEPTIONS, NULLCP, "No debris channel specified."); else { cache_inc (&trash_chan -> cache, TRASH_START_DELAY); trash_chan -> chan_update = 1; } if (timeout_chan == NULLCHANLIST) advise (LLOG_EXCEPTIONS, NULLCP, "No timeout channel specified"); else { cb = newcblk (cb_timer); cb -> cb_proc = 
timeout_proc; cb -> cb_reload = timeout_time; (void) newevblk (NULLCHANLIST, cb, cb_timer, TIMEOUT_START_DELAY); } } #define LOAD_UPDATE_INTERVAL 15 #define L1_DECAY (0.9) #define L2_DECAY (0.92) static void do_load_avg () { static time_t lastload; if (lastload == 0) lastload = current_time; if (current_time - lastload < LOAD_UPDATE_INTERVAL) { load_avg[0] = L1_DECAY * load_avg[0] + (1.0 - L1_DECAY) * rqueue; return; } while (lastload < current_time) { load_avg[0] = L1_DECAY * load_avg[0] + (1.0 - L1_DECAY) * rqueue; load_avg[1] = L2_DECAY * load_avg[1] + (1.0 - L2_DECAY) * ((double)noperations / (double)LOAD_UPDATE_INTERVAL); lastload += LOAD_UPDATE_INTERVAL; } noperations = 0; PP_TRACE (("Load averages rq %g, ops %g", load_avg[0], load_avg[1])); lastload = current_time; } void schedule () { struct eventqueue *ev; static time_t lasttime; time_t delta; static int countdown = 50; PP_TRACE (("schedule ()")); if (opmode && nchansrunning == 0) { if (opmode == OP_SHUTDOWN) exit (0); if (opmode == OP_RESTART) restart (); } if (countdown -- < 0) { countdown = 50; sort_chans (); } if (lasttime == 0) lasttime = current_time; delta = current_time - lasttime; review_channels (current_time); do_load_avg (); if (EHead -> ev_forw != EHead) EHead -> ev_forw -> ev_time -= delta; for (ev = EHead -> ev_forw; ev != EHead;) { struct eventqueue *ev2 = ev -> ev_forw; if (ev -> ev_time <= 0) { do_event (ev); freevblk (ev); } else break; ev = ev2; } review_channels (current_time); if (EHead -> ev_forw == EHead ) delaytime = nextchantime; else { PP_DBG (("event head time = %d delta = %d", EHead -> ev_forw -> ev_time, delta)); delaytime = max(0, EHead -> ev_forw -> ev_time); } if (nextchantime < delaytime) delaytime = nextchantime; PP_TRACE (("delaytime == %d", delaytime)); lasttime = current_time; if (opmode && nchansrunning == 0) { if (opmode == OP_SHUTDOWN) exit (0); if (opmode == OP_RESTART) restart (); } } static void restart () { int n, i; extern char *cmddfldir, *dupfpath (); char 
*path; PP_TRACE (("restart ()")); while (CHead -> cb_forw != CHead) freecblk (CHead -> cb_forw); /* shut down connecitons */ n = getdtablesize (); i = (isatty(2) ? 3 : 0); for (; i <= n; i++) (void) close (i); #ifdef notdef for (i = 0; i < NSIG; i++) (void) signal (i, SIG_DFL); #endif path = dupfpath (cmddfldir, myname); execl (path, myname, NULLCP); execl (myname, myname, NULLCP); _exit(1); } void review_channels (now) time_t now; { Chanlist **clpp; time_t tdiff = 0; nextchantime = MAX_SLEEP; rqueue = nchansrunning; for (clpp = chan_list; clpp < &chan_list[nchanlist]; clpp++) { switch (runnable (*clpp, now)) { case OK: (void) newevblk (*clpp, NULLCB, (*clpp)-> chan_special ? cb_special : cb_channel, (time_t)0); /* fall */ case DONE: rqueue ++; break; case NOTOK: tdiff = (*clpp) -> nextevent - current_time; if (tdiff > 0 && tdiff < nextchantime) nextchantime = tdiff; break; } } } static int runnable (clp, now) Chanlist *clp; time_t now; { if (opmode) return NOTOK; if (clp -> chan_update) { investigate_chan (clp, now); clp -> chan_update = 0; } if (clp -> chan_special == 0 && clp -> num_msgs <= 0 && clp -> num_drs <= 0) /* special channel with things to do */ return NOTOK; if (clp -> chan_enabled == 0) /* channel switched off */ return NOTOK; if (clp -> nextevent > now ) /* not time yet */ return NOTOK; if (nchansrunning >= maxchansrunning) /* too many running already */ return DONE; if (clp -> nactive <= 0) return OK; /* None running - start one */ if (clp -> chan -> ch_maxproc > 0 && clp -> nactive >= clp -> chan -> ch_maxproc) return NOTOK; /* * So there is a channel already running: * OK - should we start more - to do this we need the following * o More mtas ready than chans currently running * o We started the last channel more than a 15 secs ago * * o We aren't consuming all the channels allowed * o More messages than channels * 5 OR last time we started was ages ago */ if (clp -> nmtas <= clp -> nactive || clp -> laststart + 15 > current_time) return NOTOK; 
/* ok - we ought to start a new channel - but should we? */ if (clp -> nactive >= max(1,maxchansrunning-1)) return DONE; /* Would like to - but out of resources */ if ((clp -> num_msgs + clp -> num_drs) > clp -> nactive * 5 || clp -> laststart + 60*5 < current_time) { char *mta = NULLCP; if (nextmsg (clp, &mta, 0) == NULL) return NOTOK; /* nope all locked */ PP_TRACE (("Can start a new chan on %s", mta)); free (mta); return OK; } /* No - this channel is just not ready to start another one */ return NOTOK; } static void do_event (ev) struct eventqueue *ev; { Connblk *cb; PP_TRACE (("do_event ()")); if ((cb = ev -> ev_conn) == NULLCB) { ev -> ev_conn = cb = newcblk (ev -> ev_type); cb -> cb_clp = ev -> ev_clp; } switch (cb -> cb_type) { case cb_channel: channel_event (ev); break; case cb_special: special_event (ev); break; case cb_timer: PP_TRACE (("Timeout event")); if (cb -> cb_proc) (*cb -> cb_proc)(); if (cb -> cb_reload) newevblk (NULLCHANLIST, cb, cb_timer, cb -> cb_reload); else freecblk (cb); break; default: PP_LOG (LLOG_EXCEPTIONS, ("Unknown type %d", cb -> cb_type)); break; } } static void channel_event (ev) struct eventqueue *ev; { Connblk *cb = ev -> ev_conn; Mlist *ml; Chanlist *clp; PP_TRACE (("channel_event ()")); if ((clp = ev -> ev_clp) == NULLCHANLIST) PP_TRACE (("NO chanlist of event queue")); switch (cb -> cb_state) { case cb_idle: if (runnable (clp, current_time) != OK) { freecblk (cb); return; } if ((ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1)) == NULL) { clp -> chan_update = 1; freecblk (cb); return; } cb -> cb_ml = ml; cb -> cb_clp -> lastattempt = current_time; cb -> cb_clp -> laststart = current_time; switch (start_async (cb, clp -> channame)) { case NOTOK: cache_inc (&clp -> cache, cache_time); clp -> chan_update = 1; cleanup_conn (cb); break; case DONE: (void) newevblk (cb -> cb_clp, cb, cb -> cb_type, (time_t)0); /* and fall */ case OK: clp -> nactive ++; nchansrunning ++; break; } break; case cb_conn_established: chan_invoke (cb, 
DIS_INIT, (caddr_t *) &clp -> channame); break; case cb_conn_request1: case cb_conn_request2: case cb_init_sent: case cb_proc_sent: case cb_close_sent: PP_TRACE (("Shouldn't be in this state - %d", cb -> cb_state)); break; case cb_active: if (cb -> cb_clp -> chan_enabled == 0 || opmode) chan_invoke (cb, DIS_QUIT, (caddr_t *)0); else if (cb -> cb_ml == NULL && (cb -> cb_ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1)) == NULL) { if (cb -> cb_clp -> cache.cachetime < current_time) /* stop it thrashing... */ cb -> cb_clp -> cache.cachetime = current_time + 15; chan_invoke (cb, DIS_QUIT, (caddr_t *)0); } else { chan_invoke (cb, DIS_PROC, (caddr_t *)0); } break; } } static void special_event (ev) struct eventqueue *ev; { Connblk *cb = ev -> ev_conn; Chanlist *clp = ev -> ev_clp; PP_TRACE (("special_event (%s)", cb_print (cb))); switch (cb -> cb_state) { case cb_idle: if (runnable (clp, current_time) != OK) { freecblk (cb); return; } clp -> lastattempt = current_time; clp -> laststart = current_time; switch (start_async (cb, clp -> channame)) { case NOTOK: PP_LOG (LLOG_EXCEPTIONS, ("Can't start special channel %s", clp -> channame)); cache_inc (&clp -> cache, (time_t)60); (void) newevblk (NULLCHANLIST, NULLCB, cb_timer, (time_t) 60); clp -> chan_update = 1; freecblk (cb); break; case DONE: (void) newevblk (clp, cb, cb -> cb_type, (time_t)0); case OK: clp -> nactive ++; nchansrunning ++; break; } break; case cb_active: chan_invoke (cb, DIS_QUIT, (caddr_t *)0); switch (clp -> chan -> ch_chan_type) { case CH_QMGR_LOAD: clp -> cache.cachetime = current_time + load_time; break; case CH_DEBRIS: clp -> cache.cachetime = current_time + debris_time; break; } clp -> chan_update = 1; break; case cb_conn_established: switch (clp -> chan -> ch_chan_type) { case CH_QMGR_LOAD: PP_TRACE (("start loading")); chan_invoke (cb, DIS_READQ, (caddr_t *)0); break; case CH_DEBRIS: PP_TRACE (("start debris collection")); chan_invoke (cb, DIS_INIT, (caddr_t *)&clp -> channame); break; case 
CH_TIMEOUT: break; } break; default: PP_LOG (LLOG_EXCEPTIONS, ("Bad state in special %d", cb -> cb_state)); break; } } int start_async (cb, title) Connblk *cb; char *title; { int sd; int result; PP_NOTICE (("Starting channel %s %s", title, cb_print(cb))); switch(result = assoc_start (title, &sd)) { case NOTOK: advise (LLOG_EXCEPTIONS, NULLCP, "Can't start channel %s", title); return NOTOK; #ifdef CONNECTING_1 case CONNECTING_1: cb -> cb_state = cb_conn_request1; cb -> cb_fd = sd; FD_CLR (sd, &perf_rfds); FD_SET (sd, &perf_wfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; case CONNECTING_2: cb -> cb_state = cb_conn_request2; cb -> cb_fd = sd; FD_CLR (sd, &perf_wfds); FD_SET (sd, &perf_rfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; #else case OK: cb -> cb_state = cb_conn_request1; cb -> cb_fd = sd; FD_CLR (sd, &perf_rfds); FD_SET (sd, &perf_wfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; #endif case DONE: cb -> cb_state = cb_conn_established; cb -> cb_fd = sd; FD_CLR (sd, &perf_wfds); FD_SET (sd, &perf_rfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; default: PP_LOG (LLOG_EXCEPTIONS, ("Unknown return code from assoc_start")); return NOTOK; } start_timer (); cb -> cb_ttl = current_time + CHAN_TIMEOUT; PP_DBG (("Association descriptor=%d", sd)); return result; } int chan_invoke (cb, type, arg) Connblk *cb; int type; caddr_t *arg; { struct client_dispatch *ds; int result; noperations ++; ds = &dis[type]; switch (type) { case DIS_INIT: case DIS_QUIT: PP_TRACE (("%s %s", ds -> ds_name, cb_print (cb))); break; case DIS_READQ: case DIS_PROC: PP_NOTICE (("%s %s", ds -> ds_name, cb_print (cb))); break; } switch (result = invoke (cb, table_Qmgr_Operations, ds, arg)) { case OK: break; case NOTOK: cb -> cb_state = cb_active; break; case DONE: cleanup_conn (cb); break; } return result; } /* \f */ static int invoke (cb, ops, ds, args) Connblk *cb; struct RyOperation ops[]; register struct client_dispatch *ds; char **args; { int result; caddr_t in; struct 
RoSAPindication rois; register struct RoSAPindication *roi = &rois; register struct RoSAPpreject *rop = &roi -> roi_preject; int sd; PP_TRACE (("invoke (%s)", cb_print (cb))); sd = cb -> cb_fd; in = NULL; if (ds -> ds_argument && (result = (*ds -> ds_argument) (sd, ds, args, &in)) != OK) return result; switch (result = RyStub (sd, ops, ds -> ds_operation, cb -> cb_id = RyGenID (sd), NULLIP, in, ds -> ds_result, ds -> ds_error, ROS_ASYNC, roi)) { case NOTOK: /* failure */ ros_advise (rop, "STUB"); if (ROS_FATAL (rop -> rop_reason)) return DONE; return NOTOK; case OK: /* got a result/error response */ break; case DONE: /* got RO-END? */ adios (NULLCP, "got RO-END.INDICATION"); return NOTOK; default: adios (NULLCP, "unknown return from RyStub=%d", result); /* NOTREACHED */ } if (ds -> ds_free && in) (void) (*ds -> ds_free) (in); return OK; } void chan_manage (fd) int fd; { register Connblk *cb; int sd; if ((cb = findcblk (fd)) == NULLCB) { advise (LLOG_EXCEPTIONS, NULLCP, "fd %d is not registered!", fd); FD_CLR (fd, &perf_rfds); FD_CLR (fd, &perf_wfds); return; } PP_TRACE (("chan_manage (%s)", cb_print (cb) )); switch (cb -> cb_state) { case cb_conn_request1: case cb_conn_request2: PP_TRACE (("Awaiting async connection (state %d)", cb -> cb_state == cb_conn_request1 ? 
1 : 2)); switch (acsap_retry (sd = cb -> cb_fd)) { case NOTOK: PP_TRACE (("Connection error on %s", cb_print (cb))); delay_channel (cb); cb -> cb_state = cb_idle; cleanup_conn (cb); return; #ifdef CONNECTING_1 case CONNECTING_1: cb -> cb_state = cb_conn_request1; PP_TRACE (("State 1 on %s", cb_print (cb))); FD_CLR (sd, &perf_rfds); FD_SET (sd, &perf_wfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; case CONNECTING_2: cb -> cb_state = cb_conn_request2; PP_TRACE (("State 2 on %s", cb_print (cb))); FD_CLR (sd, &perf_wfds); FD_SET (sd, &perf_rfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; #else case OK: cb -> cb_state = cb_conn_request2; PP_TRACE (("Switching to state 2 on %s", cb_print(cb))); FD_CLR (sd, &perf_wfds); FD_SET (sd, &perf_rfds); if (sd >= perf_nfds) perf_nfds = sd + 1; break; #endif case DONE: cb -> cb_state = cb_conn_established; PP_TRACE (("connection now established on %s", cb_print (cb))); FD_CLR (sd, &perf_wfds); FD_SET (sd, &perf_rfds); if (sd >= perf_nfds) perf_nfds = sd + 1; (void) newevblk (cb -> cb_clp, cb, cb -> cb_type, (time_t)0); break; } break; case cb_idle: freecblk (cb); break; case cb_proc_sent: PP_TRACE (("Awaiting proc response ...")); getresponse (cb, OK); break; case cb_init_sent: PP_TRACE (("Awaiting init response")); getresponse (cb, OK); break; default: PP_TRACE (("Funny state!")); break; } } static void getresponse (cb, type) Connblk *cb; int type; { caddr_t out; struct RoSAPindication rois; register struct RoSAPindication *roi = &rois; register struct RoSAPpreject *rop = &roi -> roi_preject; PP_TRACE (("getresponse (%s)", cb_print (cb))); switch (RyWait (cb -> cb_fd, &cb -> cb_id, &out, type, roi)) { case NOTOK: if (rop -> rop_reason == ROS_TIMER) break; PP_TRACE (("RyWait returned NOTOK")); ros_advise (rop, "STUB"); if (ROS_FATAL (rop -> rop_reason)) { delay_channel (cb); cleanup_conn (cb); } break; case OK: PP_TRACE (("RyWait returned OK")); cb -> cb_state = cb_active; (void) newevblk (cb -> cb_clp, cb, cb -> 
cb_type, (time_t)0); break; case DONE: cb -> cb_ml = NULL; cleanup_conn (cb); break; } } #ifdef notdef static void setfds () { register Connblk *cb; struct PSAPindication pi; PP_TRACE (("setfds ()")); FD_ZERO (&perf_wfds); FD_ZERO (&perf_rfds); perf_nfds = 0; for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) switch (cb -> cb_state) { case cb_conn_request1: if (PSelectMask (cb -> cb_fd, &perf_wfds, &perf_nfds, &pi) == NOTOK) PP_LOG (LLOG_EXCEPTIONS, ("PSelectMask failed")); break; case cb_conn_request2: case cb_proc_sent: case cb_init_sent: case cb_conn_established: if (PSelectMask (cb -> cb_fd, &perf_rfds, &perf_nfds, &pi) == NOTOK) PP_LOG (LLOG_EXCEPTIONS, ("PSelectMask failed")); break; default: break; } PP_TRACE (("nrfds=%d rfds=0x%x wfds=0x%x", perf_nfds, perf_rfds.fds_bits[0], perf_wfds.fds_bits[0])); } #endif /* ARGSUSED */ static int do_quit (sd, ds, args, arg) int sd; struct client_dispatch *ds; char **args; caddr_t *arg; { Connblk *cb; PP_TRACE (("do_quit(%d)", sd)); assoc_release (sd); if (sd != NOTOK) { FD_CLR (sd, &perf_wfds); FD_CLR (sd, &perf_rfds); } if ((cb = findcblk (sd)) != NULLCB) cb -> cb_fd = NOTOK; else return DONE; PP_TRACE (("do_quit (%s)", cb_print (cb) )); if (cb -> cb_type == cb_channel) cb -> cb_clp -> chan_update = 1; PP_NOTICE (("Closing channel %s", cb_print (cb))); return DONE; } /* connection block functions */ Connblk *newcblk (type) enum cb_type type; { Connblk *cb; PP_TRACE (("newcblk () cnt=%d", cb_count)); cb = (Connblk *) calloc (1, sizeof *cb); if (cb == NULLCB) return cb; cb -> cb_fd = NOTOK; cb -> cb_type = type; if (cb_once_only == 0) { CHead -> cb_forw = CHead -> cb_back = CHead; cb_once_only ++; } insque (cb, CHead -> cb_back); cb_count ++; return cb; } void freecblk (cb) Connblk *cb; { struct AcSAPindication acis; struct RoSAPindication rois; register struct RoSAPindication *roi = &rois; if (cb == NULLCB) return; PP_TRACE (("freecblk (%s) cnt=%d", cb_print(cb), cb_count)); if (cb -> cb_fd != NOTOK) { 
PP_NOTICE (("Killing channel %s", cb_print (cb))); (void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis); (void) RyLose (cb -> cb_fd, roi); FD_CLR (cb -> cb_fd, &perf_wfds); FD_CLR (cb -> cb_fd, &perf_rfds); } remque (cb); cb_count --; free ((char *) cb); } Connblk *findcblk (fd) int fd; { Connblk *cb; PP_TRACE (("findcblk (%d) cnt=%d", fd, cb_count)); if (cb_once_only == 0) return NULLCB; for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) if (cb -> cb_fd == fd) return cb; PP_TRACE (("Can't locate block with %d (%d alloc'd)", fd, cb_count)); return NULLCB; } static char *cb2name (cb) Connblk *cb; { switch (cb -> cb_type) { case cb_channel: case cb_special: if (cb -> cb_clp && cb -> cb_clp -> channame) return cb -> cb_clp -> channame; else return "unknown channel"; case cb_timer: return "timer"; case cb_responder: return "responder"; default: break; } return "something"; } static char *cb_print (cb) Connblk *cb; { static char buf[128]; switch (cb -> cb_type) { case cb_channel: case cb_special: (void) sprintf (buf, "<fd=%d chan=%s mta=%s msg=%s>", cb -> cb_fd, cb2name(cb), cb -> cb_mta ? cb -> cb_mta : "none", (cb -> cb_ml && cb -> cb_ml -> ms ) ? 
cb -> cb_ml -> ms -> queid : "none"); break; case cb_timer: (void) sprintf (buf, "<timer proc=0x%x reload=%d>", cb -> cb_proc, cb -> cb_reload); break; case cb_responder: (void) sprintf (buf, "<fd=%d responder auth=%d>", cb -> cb_fd, cb -> cb_authenticated); break; } return buf; } static struct eventqueue *newevblk (clp, cb, type, when) Chanlist *clp; Connblk *cb; time_t when; enum cb_type type; { struct eventqueue *ev, *pev; PP_TRACE (("newevblk (clp, cb, %d, %d) cnt=%d", type, when, ev_count)); ev = (struct eventqueue *) calloc (1, sizeof *ev); if (ev == NULLEV) return ev; ev_count ++; ev -> ev_clp = clp; ev -> ev_conn = cb; ev -> ev_type = type; for (pev = EHead -> ev_forw; pev != EHead; pev = pev -> ev_forw) { if (pev -> ev_time > when) { insque (ev, pev -> ev_back); break; } else when -= pev -> ev_time; } if (pev == EHead) insque (ev, EHead -> ev_back); for (; pev != EHead; pev = pev -> ev_forw) pev -> ev_time -= when; ev -> ev_time = when; return ev; } static void freevblk (ev) struct eventqueue *ev; { PP_TRACE (("freevblk () cnt=%d", ev_count)); if (ev == NULLEV) return; if (ev -> ev_forw != EHead) ev -> ev_forw -> ev_time += ev -> ev_time; remque (ev); free ((char *) ev); ev_count --; } #ifdef notdef static time_t evtimebyclp (clp) Chanlist *clp; { struct eventqueue *ev; time_t now = current_time; for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw) if (ev -> ev_clp == clp) return now + ev -> ev_time; else now += ev -> ev_time; return now + MAX_SLEEP + 2; } static struct eventqueue *findevbyclp (clp) Chanlist *clp; { struct eventqueue *ev; for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw) if (ev -> ev_clp == clp) return ev; return NULLEV; } #endif static void init_events () { PP_TRACE (("init_events ()")); EHead -> ev_forw = EHead -> ev_back = EHead; } #define DECAY_FACTOR 0.7 /* decay filter - set fairly high */ void inc_channel (cb, number) /* successful delivery */ Connblk *cb; int number; { MsgStruct *ms; Reciplist *rlp; Mtalist *mlp; 
LIST_RCHAN *clp; int i; double f; PP_TRACE (("inc_channel (%s, %d)", cb_print (cb), number)); cb -> cb_clp -> lastsuccess = current_time; ms = cb -> cb_ml -> ms; i = ms -> size / 1000; i = max (i, 1); f = (double)(current_time - cb -> cb_clp -> lastattempt) / (double)i; cb -> cb_clp -> averaget = (DECAY_FACTOR * cb -> cb_clp -> averaget) + (1 - DECAY_FACTOR) * f; PP_TRACE (("average = %g, this value = %g", cb -> cb_clp -> averaget, f)); clear_msgcache (cb -> cb_ml); delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, number); cache_clear (&cb -> cb_clp -> cache); if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) { cache_clear (&mlp -> cache); mlp -> lastsuccess = current_time; } if (ms == NULL) { advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate message!"); return; } for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) if (rlp -> id == number) break; if (rlp == NULL) { advise (LLOG_EXCEPTIONS, NULLCP, "No recipient %d in list!", number); return; } switch (rlp -> status) { case st_normal: i = ++rlp -> chans_done; for (clp = rlp -> chans; clp && i > 0; clp = clp -> li_next, i--) continue; if (clp) { insertinchan (clp -> li_chan, ms, rlp, chan2mta (clp -> li_chan, rlp)); return; } /* else fall through */ case st_dr: rlp -> status = st_delete; if (ms -> count == 0) insertindelchan (ms); break; case st_delete: cb -> cb_ml = NULL; maybezapmsg (ms); break; } } void delay_host (cb, rno, str) Connblk *cb; int rno; char *str; { struct Mtalist *mlp, *findmtalist (); PP_TRACE (("delay_host (%s, %d)", cb_print (cb), rno)); if (cb -> cb_ml) msg_unlock (cb -> cb_ml -> ms); if ((mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) == NULL) { advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue"); return; } if (mlp -> info) { free (mlp -> info); mlp -> info = NULLCP; } if (str) mlp -> info = strdup (str); cache_inc (&mlp -> cache, cache_time*3/2); mlp -> mta_changed = 1; if (cb -> cb_clp) cb -> cb_clp -> chan_update = 1; } void delay_message (cb, str) Connblk *cb; char *str; { 
Mtalist *mlp; PP_TRACE (("delay_message (%s)", cb_print (cb))); if (cb -> cb_ml == NULL) return; if ((mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) == NULL) { advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue"); return; } if (cb -> cb_ml -> info) { free (cb -> cb_ml -> info); cb -> cb_ml -> info = NULLCP; } if (str) cb -> cb_ml -> info = strdup (str); cb -> cb_ml -> ms -> nerrors ++; msg_unlock (cb -> cb_ml -> ms); msgcache_inc (cb -> cb_ml, cache_time*2); mlp -> mta_changed = 1; cb -> cb_clp -> chan_update = 1; } static void bad_recip (cb, rno) Connblk *cb; int rno; { Reciplist *rlp; Mtalist *mlp; PP_TRACE (("bad_recip (%s, %d)", cb_print (cb), rno)); cb -> cb_clp -> lastsuccess = current_time; if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) mlp -> lastsuccess = current_time; delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno); for (rlp = cb -> cb_ml -> ms -> recips; rlp; rlp = rlp -> rp_next) if (rlp -> id == rno) break; if (rlp == NULLRL) return; rlp -> chans_done = 0; insertindrchan (cb -> cb_ml -> ms, rlp); } static void sharedDR (cb, rno) Connblk *cb; int rno; { Mtalist *mlp; PP_TRACE (("sharedDR (%s, %d)", cb_print (cb), rno)); cb -> cb_clp -> lastsuccess = current_time; if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) mlp -> lastsuccess = current_time; delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno); } /* ARGSUSED */ static int do_processmessage (sd, ds, args, arg) int sd; struct client_dispatch *ds; char **args; struct type_Qmgr_ProcMsg **arg; { Connblk *cb; Mlist *ml; Mtalist *mlp; register struct type_Qmgr_ProcMsg *pm; struct type_Qmgr_UserList *lusers (); char *p; PP_TRACE (("do_processmessage (%d)", sd)); if ((cb = findcblk (sd)) == NULLCB) return NOTOK; if (cb -> cb_state != cb_active) return NOTOK; ml = cb -> cb_ml; if (ml == NULL) { PP_LOG (LLOG_EXCEPTIONS, ("Missing ml structure")); return NOTOK; } *arg = pm = (struct type_Qmgr_ProcMsg *) malloc (sizeof (**arg)); pm -> channel = str2qb (cb -> cb_clp -> channame, 
strlen (cb -> cb_clp -> channame), 1); p = ml -> ms -> queid; pm -> qid = str2qb (p, strlen (p), 1); pm -> users = lusers (ml, 0); if (pm -> users == NULL) PP_LOG (LLOG_EXCEPTIONS, ("Empty user list!")); cb -> cb_ttl = current_time + CHAN_TIMEOUT + (ml -> ms -> size /20); cb -> cb_state = cb_proc_sent; if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) mlp -> lastattempt = current_time; return OK; } /* ARGSUSED */ static int do_readqueue (sd, ds, args, arg) int sd; struct client_dispatch *ds; char **args; struct type_Qmgr_ReadQueue__Pseudo **arg; { Connblk *cb; PP_TRACE (("do_readqueue (%d)", sd)); if ((cb = findcblk (sd)) == NULLCB) return NOTOK; cb -> cb_state = cb_proc_sent; return OK; } /* ARGSUSED */ static int do_initchannel (sd, ds, args, arg) int sd; struct client_dispatch *ds; char **args; struct type_Qmgr_Channel **arg; { Connblk *cb; if ((cb = findcblk (sd)) == NULLCB) return NOTOK; PP_TRACE (("do_initchannel (%s)", cb_print (cb) )); *arg = str2qb (*args, strlen (*args), 1); cb -> cb_state = cb_init_sent; cb -> cb_ttl = current_time + CHAN_TIMEOUT; return OK; } /* ARGSUSED */ static int processmessage_result (sd, id, dummy, result, roi) int sd, id, dummy; register struct type_Qmgr_DeliveryStatus *result; struct RoSAPindication *roi; { Connblk *cb; struct type_Qmgr_DeliveryStatus *ds; int rno; char buf[LINESIZE]; char *info; PP_TRACE (("processmessage_result (%d)", sd)); if ((cb = findcblk (sd)) == NULLCB) { advise (LLOG_EXCEPTIONS, NULLCP, "No connection block for %d", sd); return NOTOK; } PP_TRACE (("processmessage_result (%s)", cb_print (cb))); for (ds = result; ds; ds = ds -> next) { rno = ds -> IndividualDeliveryStatus -> recipient -> parm; (void) sprintf (buf, "Recipient ID %d %s:", rno, cb_print (cb)); if (ds -> IndividualDeliveryStatus -> info) info = qb2str(ds -> IndividualDeliveryStatus -> info); else info = NULLCP; switch (ds -> IndividualDeliveryStatus -> status) { case int_Qmgr_status_success: PP_NOTICE (("%s success", buf)); inc_channel 
(cb, rno); break; case int_Qmgr_status_negativeDR: case int_Qmgr_status_positiveDR: PP_NOTICE (("%s %s", buf, ds -> IndividualDeliveryStatus -> status == int_Qmgr_status_positiveDR ? "positiveDR" : "negativeDR")); bad_recip (cb, rno); break; case int_Qmgr_status_successSharedDR: case int_Qmgr_status_failureSharedDR: PP_NOTICE (("%s %s", buf, ds -> IndividualDeliveryStatus -> status == int_Qmgr_status_failureSharedDR ? "failureSharedDR" : "sucessSharedDR")); sharedDR(cb, rno); break; case int_Qmgr_status_messageFailure: PP_NOTICE (("%s messageFailure (%s)", buf, info ? info : "")); delay_message (cb, info); break; case int_Qmgr_status_mtaFailure: PP_NOTICE (("%s mtaFailure (%s)", buf, info ? info : "")); delay_host (cb, rno, info); break; case int_Qmgr_status_mtaAndMessageFailure: PP_NOTICE (("%s mtaAndMessageFailure (%s)", buf, info ? info : "")); delay_message (cb, info); delay_host (cb, rno, info); break; default: PP_NOTICE (("%s Unknown response", buf)); break; } if (info) free (info); } cb -> cb_state = cb_active; cb -> cb_ml = NULL; return OK; } /* \f RPC procedures... 
*/ /* ARGSUSED */ static int readqueue_result (sd, id, dummy, result, roi) int sd, id, dummy; register struct type_Qmgr_MsgList *result; struct RoSAPindication *roi; { struct type_Qmgr_MsgStructList *ml; MsgStruct *ms, *oldms; char *p; struct type_Qmgr_QidList *ql; PP_TRACE (("readqueue_result (%d, %d)", sd, id)); for (ml = result -> msgs; ml; ml = ml -> next) { ms = newmsgstruct (ml -> MsgStruct); if (oldms = find_msg (ms -> queid)) { (void) updatemessage (oldms, ms); freems (ms); } else (void) insertmessage (ms); } for (ql = result -> deleted; ql; ql = ql -> next) { p = qb2str (ql -> QID); if (ms = find_msg (p)) kill_msg (ms); free (p); } return OK; } /* ARGSUSED */ static int channelinit_result (sd, id, dummy, result, roi) int sd, id, dummy; struct type_Qmgr_Pseudo__channelInitialise *result; struct RoSAPindication *roi; { PP_TRACE (("channelinit_result (%d, %d)", sd, id)); return OK; } /* ARGSUSED */ static int general_error (sd, id, error, parameter, roi) int sd, id, error; caddr_t parameter; struct RoSAPindication *roi; { register struct RyError *rye; Connblk *cb; if ((cb = findcblk (sd)) == NULLCB) return NOTOK; PP_TRACE (("general_error (%s)", cb_print(cb))); if (cb -> cb_type != cb_channel) { PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!")); return NOTOK; } if (error == RY_REJECT) { advise (LLOG_EXCEPTIONS, NULLCP, "%s", RoErrString ((int) parameter)); delay_channel (cb); return NOTOK; } if (rye = finderrbyerr (table_Qmgr_Errors, error)) advise (LLOG_EXCEPTIONS, NULLCP, "%s", rye -> rye_name); else advise (LLOG_EXCEPTIONS, NULLCP, "Error %d", error); delay_channel (cb); return NOTOK; } /* ARGSUSED */ static int processmessage_error (sd, id, error, parameter, roi) int sd, id, error; caddr_t parameter; struct RoSAPindication *roi; { register struct RyError *rye; Connblk *cb; char tbuf[128]; if ((cb = findcblk (sd)) == NULLCB) return NOTOK; PP_TRACE (("processmessage_error (%s)", cb_print(cb))); if (cb -> cb_type != cb_channel) { PP_LOG (LLOG_EXCEPTIONS, ("Not 
a channel!!!")); return NOTOK; } if (error == RY_REJECT) { advise (LLOG_EXCEPTIONS, NULLCP, "%s", RoErrString ((int) parameter)); delay_message (cb, RoErrString ((int) parameter)); return OK; } if (rye = finderrbyerr (table_Qmgr_Errors, error)) (void) sprintf (tbuf, "process message error %s", rye -> rye_name); else (void) sprintf (tbuf, "process message error %d", error); advise (LLOG_EXCEPTIONS, NULLCP, "%s", tbuf); delay_message (cb, tbuf); return OK; } void delay_channel (cb) Connblk *cb; { PP_TRACE (("delay_channel (%s)", cb_print (cb) )); if (cb -> cb_type == cb_responder || cb -> cb_type == cb_timer) return; if (cb -> cb_clp == NULL) return; cache_inc (&(cb -> cb_clp -> cache), cache_time); cb -> cb_clp -> chan_update = 1; } void investigate_chan (clp, now) Chanlist *clp; time_t now; { Mtalist *mlp; PP_TRACE (("investigate_chan (%s)", clp -> channame)); clp -> nextevent = now + MAX_SLEEP; clp -> oldest = now; clp -> nmtas = 0; for (mlp = clp -> mtas -> mta_forw; mlp != clp -> mtas; mlp = mlp -> mta_forw) { if (mlp -> mta_changed || mlp -> nextevent < current_time) { investigate_mta (mlp, now); mlp -> mta_changed = 0; } if (mlp -> oldest < clp -> oldest) clp -> oldest = mlp -> oldest; if (!mtaready (mlp)) continue; clp -> nmtas ++; if (mlp -> nextevent < clp -> nextevent) clp -> nextevent = mlp -> nextevent; } if (clp -> chan_special) clp -> nextevent = now; if (clp -> cache.cachetime > now) clp -> nextevent = clp -> cache.cachetime; if (clp -> chan_enabled == 0) clp -> nextevent = now + MAX_SLEEP; } void investigate_mta (mlp, now) Mtalist *mlp; time_t now; { Mlist *ml; time_t cachet; PP_TRACE (("investigate_mta (%s)", mlp -> mtaname)); mlp -> nextevent = now + MAX_SLEEP + 1; mlp -> oldest = now; for (ml = mlp -> msgs -> ml_forw; ml != mlp -> msgs; ml = ml -> ml_forw) { /* this calc has to be done - or we could optimise */ if (ml -> ms -> age < mlp -> oldest) mlp -> oldest = ml -> ms -> age; if (!msgready (ml)) continue; if ((cachet = msgmincache (ml)) > now) 
{ if (cachet < mlp -> nextevent) mlp -> nextevent = cachet; } else if (ml -> ms -> defferedtime && ml -> ms -> defferedtime < mlp -> nextevent) mlp -> nextevent = ml -> ms -> defferedtime; else mlp -> nextevent = now; } if (mlp -> cache.cachetime > now) /* oops - cached */ mlp -> nextevent = mlp -> cache.cachetime; if (mlp -> mta_enabled == 0) mlp -> nextevent = now + MAX_SLEEP + 1; } void cleanup_conn (cb) Connblk *cb; { PP_TRACE (("cleanup_conn(%s)", cb_print (cb))); switch (cb -> cb_type) { case cb_responder: case cb_timer: break; default: if (cb -> cb_clp) { Mtalist *mlp; if (cb -> cb_mta && (mlp = findmtalist(cb ->cb_clp, cb -> cb_mta))) { mlp -> nactive --; if (cb -> cb_state == cb_idle || cb -> cb_state == cb_proc_sent) delay_message (cb, "Connection broke"); } cb -> cb_clp -> nactive --; nchansrunning --; } if (cb -> cb_ml) msg_unlock (cb -> cb_ml -> ms); break; } freecblk (cb); } static int timeout_proc () { MsgStruct **msp, *ms; PP_TRACE (("timeout_proc")); if (timeout_chan -> chan_enabled == 0) return; for (msp = msg_hash; msp < &msg_hash[HASHSIZE]; ) { for (ms = *msp; ms; ms = ms -> ms_forw) { if (ms -> m_locked) continue; if (ms -> expirytime < current_time && expiremsg (ms) == DONE) break; } if (!ms) msp ++; } } static int expiremsg (ms) MsgStruct *ms; { int first = 1; Reciplist *rlp; int i; Chanlist *clp; Mtalist *mlp; Mlist *ml; LIST_RCHAN *lrp; PP_TRACE (("expiremsg (%s)", ms -> queid)); for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) { if (rlp -> id == 0) continue; switch (rlp -> status) { case st_dr: case st_delete: case st_timeout: continue; /* leave these alone! 
*/ case st_normal: i = rlp -> chans_done; for (lrp = rlp -> chans; lrp && i > 0; lrp = lrp -> li_next, i--) continue; if (lrp == NULL) continue; if ((clp = findchanlist (lrp -> li_chan)) == NULLCHANLIST) continue; mlp = findmtalist (clp, rlp -> mta); if (mlp == NULLMTALIST) continue; ml = findmtamsg (mlp, ms -> queid); if (ml == NULLMLIST) continue; delfromchan (clp, mlp -> mtaname, ml, rlp -> id); rlp -> status = st_timeout; if (first) { insertinchan (timeout_chan -> chan, ms, rlp, timeout_chan -> channame); first = 0; } } } return first == 1 ? OK : DONE; } static int channel_ttl () { struct AcSAPindication acis; struct RoSAPindication rois; register struct RoSAPindication *roi = &rois; Connblk *cb; PP_TRACE (("channel_ttl ()")); if (cb_once_only == 0) return OK; for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) { if (cb -> cb_type == cb_channel && cb -> cb_ttl && cb -> cb_ttl < current_time) { PP_NOTICE (("Channel %s taking too long - aborting", cb_print (cb))); (void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis); (void) RyLose (cb -> cb_fd, roi); chan_lose (cb -> cb_fd, (struct AcSAPfinish *)0); if (cb -> cb_fd != NOTOK) { FD_CLR (cb -> cb_fd, &perf_rfds); FD_CLR (cb -> cb_fd, &perf_wfds); } } } timer_running = 0; if (nchansrunning > 0) start_timer (); return OK; } static void start_timer () { Connblk *cb; if (timer_running) return; cb = newcblk (cb_timer); cb -> cb_proc = channel_ttl; cb -> cb_reload = 0; (void) newevblk (NULLCHANLIST, cb, cb_timer, CHAN_TIMEOUT); timer_running = 1; }