|
|
DataMuseum.dk presents historical artifacts from the history of: DKUUG/EUUG Conference tapes |
This is an automatic "excavation" of a thematic subset of
See our Wiki for more about DKUUG/EUUG Conference tapes Excavated with: AutoArchaeologist - Free & Open Source Software. |
top - metrics - downloadIndex: T c
Length: 39839 (0x9b9f)
Types: TextFile
Names: »chans.c«
└─⟦2d1937cfd⟧ Bits:30007241 EUUGD22: P.P 5.0
└─⟦dc59850a2⟧ »EurOpenD22/pp5.0/pp-5.tar.Z«
└─⟦e5a54fb17⟧
└─⟦this⟧ »pp-5.0/Src/qmgr/chans.c«
/* chans.c: channel specific stuff for the qmgr */
# ifndef lint
static char Rcsid[] = "@(#)$Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $";
# endif
/*
* $Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $
*
* $Log: chans.c,v $
* Revision 5.0 90/09/20 16:21:02 pp
* rcsforce : 5.0 public release
*
*/
#include <isode/rosy.h>
#include <isode/logger.h>
#include "types.h"
#include "ryinitiator.h"
#include "Qmgr-ops.h"
#include "util.h"
/* Queue-manager helpers that are defined in other source files. */
extern struct Mtalist *findmtalist ();
extern MsgStruct *newmsgstruct ();
extern MsgStruct *find_msg ();
extern Chanlist *findchanlist ();
extern Mlist *findmtamsg ();
/* Becomes non-zero once newcblk has linked the connection-block list
 * head to itself; findcblk will not walk the list before that. */
static int cb_once_only = 0;
/* Sentinel head of the circular list of connection blocks. */
static Connblk cbqueue;
static Connblk *CHead = &cbqueue;
static char *cb2name ();
static char *cb_print ();
/* Number of connection blocks currently allocated (newcblk/freecblk). */
static int cb_count = 0;
/*
 * One pending event on the circular event list headed by EHead.
 * ev_time is a delay relative to the entry in front of it: schedule
 * only ages the first entry, and freevblk adds a removed entry's
 * remaining time to its successor.
 */
struct eventqueue {
struct eventqueue *ev_forw;
struct eventqueue *ev_back;
time_t ev_time;
/* connection block the event is for; NULLCB makes do_event allocate one */
Connblk *ev_conn;
/* type handed to newcblk when do_event has to allocate the block */
enum cb_type ev_type;
/* channel list entry the event refers to (may be NULLCHANLIST) */
Chanlist *ev_clp;
};
/* Sentinel head of the event list; linked to itself by init_events. */
static struct eventqueue evqueue;
static struct eventqueue *EHead = &evqueue;
/* Number of event blocks currently allocated (newevblk/freevblk). */
static int ev_count = 0;
static struct eventqueue *newevblk ();
static void freevblk (), init_events ();
static void do_event ();
static void channel_event ();
static void special_event ();
static void restart ();
#define NULLEV ((struct eventqueue *)0)
static int runnable ();
static int invoke ();
static int timeout_proc ();
static int channel_ttl ();
static int expiremsg ();
/* static void setfds (); */
static void getresponse ();
static void start_timer ();
extern Mlist *nextmsg ();
/* select() descriptor sets and width, owned by another file; this file
 * adds and removes association descriptors as connections change state. */
extern fd_set perf_rfds, perf_wfds;
extern int perf_nfds;
/* Set by schedule: how long the caller may wait before calling it again. */
extern int delaytime;
void review_channels ();
void investigate_chan ();
void investigate_mta ();
void delay_channel ();
void cleanup_conn ();
/* Incremented each time channel_event/special_event starts a channel. */
int nchansrunning = 0;
static int timer_running = 0;
static int do_processmessage (), processmessage_result (),
processmessage_error (), do_quit ();
static int do_initchannel (), channelinit_result ();
static int do_readqueue (), readqueue_result ();
static int general_error ();
/* Computed by review_channels: shortest wait until some channel's
 * nextevent, never more than MAX_SLEEP. */
static time_t nextchantime;
/* Computed by review_channels: running channels plus channels that
 * runnable() reported as OK or DONE. */
static int rqueue;
/* Operations issued through chan_invoke since do_load_avg last reset it. */
static int noperations;
/* [0]: decayed average of rqueue; [1]: decayed average of operations
 * per second (both maintained by do_load_avg). */
double load_avg[2];
/*
 * Table of the remote operations this file can invoke.  chan_invoke
 * indexes it with the DIS_* constants defined alongside each entry, so
 * the entries must stay in that order.  invoke() reads the ds_argument,
 * ds_operation, ds_result, ds_error and ds_free members of an entry and
 * chan_invoke logs ds_name.
 * NOTE(review): the member order of struct client_dispatch is declared
 * elsewhere; the initialisers presumably read name, operation code,
 * argument builder, argument free routine, result handler, error
 * handler, description - confirm against ryinitiator.h.
 */
static struct client_dispatch dis[] = {
#define DIS_PROC 0
{
"processmessage", operation_Qmgr_processmessage,
do_processmessage, free_Qmgr_processmessage_argument,
processmessage_result, processmessage_error,
"Process some messages"
},
#define DIS_INIT 1
{
"channelInitialise", operation_Qmgr_channelInitialise,
do_initchannel, free_Qmgr_channelInitialise_argument,
channelinit_result, general_error,
"Initialise a channel"
},
#define DIS_QUIT 2
/* no remote operation code: do_quit releases the association itself */
{
"quit", 0, do_quit, NULLIFP, NULLIFP, NULLIFP,
"terminate the association"
},
#define DIS_READQ 3
{
"readqueue", operation_Qmgr_readqueue,
do_readqueue, free_Qmgr_readqueue_argument,
readqueue_result, general_error,
"Load up the queue"
},
/* terminating entry */
NULL
};
/*
 * init_chans: call create_chan for each entry of ch_all (stopping at
 * ch_maxchans entries or the first null entry), then sort the channels
 * and initialise the event list.
 */
void init_chans ()
{
    extern int ch_maxchans;
    extern void create_chan ();
    int idx = 0;

    PP_TRACE (("init_chans ()"));

    while (idx < ch_maxchans && ch_all[idx]) {
        create_chan (ch_all[idx]);
        idx++;
    }

    sort_chans ();
    init_events ();
}
/*
 * chan_lose: handle the loss of the association on descriptor fd.
 * If a connection block owns the descriptor, its channel is delayed,
 * the descriptor is dropped from the select sets and the block is
 * handed to cleanup_conn.  Always returns ACS_ACCEPT.
 */
/* ARGSUSED */
int chan_lose (fd, acf)
int fd;
struct AcSAPfinish *acf;
{
    Connblk *conn;

    PP_TRACE (("chan_lose (%d)", fd));

    conn = findcblk (fd);
    if (conn != NULLCB) {
        /* restore the state */
        delay_channel (conn);
        if (fd != NOTOK) {
            FD_CLR (fd, &perf_rfds);
            FD_CLR (fd, &perf_wfds);
        }
        conn -> cb_fd = NOTOK;
        cleanup_conn (conn);
    }
    return ACS_ACCEPT;
}
void start_specials ()
{
Connblk *cb;
PP_TRACE (("start_specials ()"));
if (loader_chan == NULLCHANLIST)
advise (LLOG_EXCEPTIONS, NULLCP,
"NO loading channel specified");
else {
loader_chan -> chan_update = 1;
}
if (trash_chan == NULLCHANLIST)
advise (LLOG_EXCEPTIONS, NULLCP,
"No debris channel specified.");
else {
cache_inc (&trash_chan -> cache,
TRASH_START_DELAY);
trash_chan -> chan_update = 1;
}
if (timeout_chan == NULLCHANLIST)
advise (LLOG_EXCEPTIONS, NULLCP,
"No timeout channel specified");
else {
cb = newcblk (cb_timer);
cb -> cb_proc = timeout_proc;
cb -> cb_reload = timeout_time;
(void) newevblk (NULLCHANLIST, cb, cb_timer,
TIMEOUT_START_DELAY);
}
}
#define LOAD_UPDATE_INTERVAL 15
#define L1_DECAY (0.9)
#define L2_DECAY (0.92)
/*
 * do_load_avg: maintain the two decayed load figures.
 * load_avg[0] follows rqueue and is updated on every call.
 * load_avg[1] follows noperations / LOAD_UPDATE_INTERVAL and is only
 * updated once at least LOAD_UPDATE_INTERVAL seconds have passed since
 * the last full update; one decay step is applied per elapsed
 * interval, after which noperations is reset.
 */
static void do_load_avg ()
{
    static time_t lastload;

    if (lastload == 0)
        lastload = current_time;

    if (current_time - lastload >= LOAD_UPDATE_INTERVAL) {
        /* catch up: one step for each interval that has gone by */
        for (; lastload < current_time;
             lastload += LOAD_UPDATE_INTERVAL) {
            load_avg[0] = L1_DECAY * load_avg[0] +
                (1.0 - L1_DECAY) * rqueue;
            load_avg[1] = L2_DECAY * load_avg[1] +
                (1.0 - L2_DECAY) *
                ((double)noperations /
                 (double)LOAD_UPDATE_INTERVAL);
        }
        noperations = 0;
        PP_TRACE (("Load averages rq %g, ops %g",
                   load_avg[0], load_avg[1]));
        lastload = current_time;
        return;
    }

    /* too soon for a full update: just track the run queue */
    load_avg[0] = L1_DECAY * load_avg[0] +
        (1.0 - L1_DECAY) * rqueue;
}
/*
 * schedule: one pass of the scheduler.
 * Honours a pending shutdown/restart once no channels are running,
 * re-sorts the channels periodically, queues events for runnable
 * channels, updates the load figures, ages the head of the event list
 * by the time elapsed since the previous pass, runs every event at the
 * front of the list whose time has come, and finally sets delaytime to
 * the time until the next event or channel is due.
 */
void schedule ()
{
    static time_t lasttime;
    static int countdown = 50;
    struct eventqueue *cur;
    time_t elapsed;

    PP_TRACE (("schedule ()"));

    if (opmode && nchansrunning == 0) {
        if (opmode == OP_SHUTDOWN)
            exit (0);
        if (opmode == OP_RESTART)
            restart ();
    }

    if (countdown -- < 0) {
        countdown = 50;
        sort_chans ();
    }

    if (lasttime == 0)
        lasttime = current_time;
    elapsed = current_time - lasttime;

    review_channels (current_time);
    do_load_avg ();

    /* times are relative to the previous entry: only the head ages */
    if (EHead -> ev_forw != EHead)
        EHead -> ev_forw -> ev_time -= elapsed;

    cur = EHead -> ev_forw;
    while (cur != EHead && cur -> ev_time <= 0) {
        /* remember the successor: the event is freed below */
        struct eventqueue *following = cur -> ev_forw;

        do_event (cur);
        freevblk (cur);
        cur = following;
    }

    review_channels (current_time);

    if (EHead -> ev_forw != EHead) {
        PP_DBG (("event head time = %d delta = %d",
                 EHead -> ev_forw -> ev_time, elapsed));
        delaytime = max(0, EHead -> ev_forw -> ev_time);
    }
    else
        delaytime = nextchantime;

    if (nextchantime < delaytime)
        delaytime = nextchantime;
    PP_TRACE (("delaytime == %d", delaytime));

    lasttime = current_time;

    if (opmode && nchansrunning == 0) {
        if (opmode == OP_SHUTDOWN)
            exit (0);
        if (opmode == OP_RESTART)
            restart ();
    }
}
/*
 * restart: re-execute the program.
 * Frees every connection block, closes file descriptors (0..n, or 3..n
 * when descriptor 2 is a terminal), then tries to exec the program from
 * the command directory and, failing that, by its bare name.  Exits
 * with status 1 if both exec attempts fail.
 */
static void restart ()
{
    extern char *cmddfldir, *dupfpath ();
    char *path;
    int fd, limit;

    PP_TRACE (("restart ()"));

    /* shut down connections */
    while (CHead -> cb_forw != CHead)
        freecblk (CHead -> cb_forw);

    limit = getdtablesize ();
    fd = (isatty(2) ? 3 : 0);
    while (fd <= limit) {
        (void) close (fd);
        fd++;
    }
#ifdef notdef
    for (fd = 0; fd < NSIG; fd++)
        (void) signal (fd, SIG_DFL);
#endif
    path = dupfpath (cmddfldir, myname);
    execl (path, myname, NULLCP);
    execl (myname, myname, NULLCP);
    _exit(1);
}
/*
 * review_channels: look at every channel in chan_list.
 * A channel that runnable() reports as OK gets a zero-delay event
 * queued; OK and DONE channels are both counted in rqueue (which starts
 * from nchansrunning).  For NOTOK channels the time until their
 * nextevent is used to lower nextchantime, which starts at MAX_SLEEP.
 */
void review_channels (now)
time_t now;
{
    Chanlist **slot = chan_list;

    nextchantime = MAX_SLEEP;
    rqueue = nchansrunning;

    while (slot < &chan_list[nchanlist]) {
        int verdict = runnable (*slot, now);

        if (verdict == OK)
            (void) newevblk (*slot, NULLCB,
                             (*slot)-> chan_special ?
                             cb_special : cb_channel,
                             (time_t)0);

        if (verdict == OK || verdict == DONE)
            rqueue ++;
        else if (verdict == NOTOK) {
            time_t wait = (*slot) -> nextevent - current_time;

            if (wait > 0 && wait < nextchantime)
                nextchantime = wait;
        }
        slot++;
    }
}
/*
 * runnable: decide whether a (further) process may be started for a
 * channel.
 * Returns OK    - start one now,
 *         DONE  - there is work but no process slot is free,
 *         NOTOK - nothing to start at the moment.
 * A channel flagged chan_update is re-investigated first.
 */
static int runnable (clp, now)
Chanlist *clp;
time_t now;
{
    char *mta;

    if (opmode)
        return NOTOK;

    if (clp -> chan_update) {
        investigate_chan (clp, now);
        clp -> chan_update = 0;
    }

    /* an ordinary (non-special) channel with nothing queued */
    if (!clp -> chan_special && clp -> num_msgs <= 0
        && clp -> num_drs <= 0)
        return NOTOK;
    /* channel switched off, or not time yet */
    if (clp -> chan_enabled == 0 || clp -> nextevent > now)
        return NOTOK;
    /* too many running already */
    if (nchansrunning >= maxchansrunning)
        return DONE;
    /* None running - start one */
    if (clp -> nactive <= 0)
        return OK;
    /* per-channel process limit reached */
    if (clp -> chan -> ch_maxproc > 0 &&
        clp -> nactive >= clp -> chan -> ch_maxproc)
        return NOTOK;

    /*
     * At least one process is already running for this channel.
     * Another is only started when
     *  o more mtas are ready than processes are running, and
     *  o the last start was at least 15 seconds ago, and
     *  o this channel is not using (nearly) every slot, and
     *  o there are more than 5 messages per running process, or the
     *    last start was more than five minutes ago.
     */
    if (clp -> nmtas <= clp -> nactive
        || clp -> laststart + 15 > current_time)
        return NOTOK;

    /* Would like to - but out of resources */
    if (clp -> nactive >= max(1,maxchansrunning-1))
        return DONE;

    /* this channel is just not ready to start another one */
    if ((clp -> num_msgs + clp -> num_drs) <= clp -> nactive * 5
        && clp -> laststart + 60*5 >= current_time)
        return NOTOK;

    mta = NULLCP;
    if (nextmsg (clp, &mta, 0) == NULL)
        return NOTOK;       /* nope all locked */
    PP_TRACE (("Can start a new chan on %s", mta));
    free (mta);
    return OK;
}
/*
 * do_event: carry out one event taken from the event list.
 * An event without a connection block gets a fresh one of the event's
 * type, bound to the event's channel.  Channel and special blocks are
 * passed to their handlers.  A timer block has its procedure called
 * and is then either re-queued after cb_reload seconds or freed.
 * The caller still owns ev and frees it afterwards.
 *
 * If no connection block can be allocated the event is logged and
 * dropped; previously the null pointer returned by newcblk was
 * dereferenced.
 */
static void do_event (ev)
struct eventqueue *ev;
{
    Connblk *cb;

    PP_TRACE (("do_event ()"));

    if ((cb = ev -> ev_conn) == NULLCB) {
        ev -> ev_conn = cb = newcblk (ev -> ev_type);
        if (cb == NULLCB) {
            PP_LOG (LLOG_EXCEPTIONS,
                    ("Can't allocate connection block for event"));
            return;
        }
        cb -> cb_clp = ev -> ev_clp;
    }

    switch (cb -> cb_type) {
        case cb_channel:
            channel_event (ev);
            break;
        case cb_special:
            special_event (ev);
            break;
        case cb_timer:
            PP_TRACE (("Timeout event"));
            if (cb -> cb_proc)
                (*cb -> cb_proc)();
            if (cb -> cb_reload)
                (void) newevblk (NULLCHANLIST, cb, cb_timer,
                                 cb -> cb_reload);
            else
                freecblk (cb);
            break;
        default:
            PP_LOG (LLOG_EXCEPTIONS, ("Unknown type %d", cb -> cb_type));
            break;
    }
}
static void channel_event (ev)
struct eventqueue *ev;
{
Connblk *cb = ev -> ev_conn;
Mlist *ml;
Chanlist *clp;
PP_TRACE (("channel_event ()"));
if ((clp = ev -> ev_clp) == NULLCHANLIST)
PP_TRACE (("NO chanlist of event queue"));
switch (cb -> cb_state) {
case cb_idle:
if (runnable (clp, current_time) != OK) {
freecblk (cb);
return;
}
if ((ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1)) == NULL) {
clp -> chan_update = 1;
freecblk (cb);
return;
}
cb -> cb_ml = ml;
cb -> cb_clp -> lastattempt = current_time;
cb -> cb_clp -> laststart = current_time;
switch (start_async (cb, clp -> channame)) {
case NOTOK:
cache_inc (&clp -> cache, cache_time);
clp -> chan_update = 1;
cleanup_conn (cb);
break;
case DONE:
(void) newevblk (cb -> cb_clp, cb,
cb -> cb_type, (time_t)0);
/* and fall */
case OK:
clp -> nactive ++;
nchansrunning ++;
break;
}
break;
case cb_conn_established:
chan_invoke (cb, DIS_INIT, (caddr_t *) &clp -> channame);
break;
case cb_conn_request1:
case cb_conn_request2:
case cb_init_sent:
case cb_proc_sent:
case cb_close_sent:
PP_TRACE (("Shouldn't be in this state - %d",
cb -> cb_state));
break;
case cb_active:
if (cb -> cb_clp -> chan_enabled == 0 || opmode)
chan_invoke (cb, DIS_QUIT,
(caddr_t *)0);
else if (cb -> cb_ml == NULL &&
(cb -> cb_ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1))
== NULL) {
if (cb -> cb_clp -> cache.cachetime < current_time)
/* stop it thrashing... */
cb -> cb_clp -> cache.cachetime =
current_time + 15;
chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
}
else {
chan_invoke (cb, DIS_PROC, (caddr_t *)0);
}
break;
}
}
static void special_event (ev)
struct eventqueue *ev;
{
Connblk *cb = ev -> ev_conn;
Chanlist *clp = ev -> ev_clp;
PP_TRACE (("special_event (%s)", cb_print (cb)));
switch (cb -> cb_state) {
case cb_idle:
if (runnable (clp, current_time) != OK) {
freecblk (cb);
return;
}
clp -> lastattempt = current_time;
clp -> laststart = current_time;
switch (start_async (cb, clp -> channame)) {
case NOTOK:
PP_LOG (LLOG_EXCEPTIONS,
("Can't start special channel %s",
clp -> channame));
cache_inc (&clp -> cache, (time_t)60);
(void) newevblk (NULLCHANLIST, NULLCB, cb_timer,
(time_t) 60);
clp -> chan_update = 1;
freecblk (cb);
break;
case DONE:
(void) newevblk (clp, cb, cb -> cb_type,
(time_t)0);
case OK:
clp -> nactive ++;
nchansrunning ++;
break;
}
break;
case cb_active:
chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
switch (clp -> chan -> ch_chan_type) {
case CH_QMGR_LOAD:
clp -> cache.cachetime = current_time + load_time;
break;
case CH_DEBRIS:
clp -> cache.cachetime = current_time + debris_time;
break;
}
clp -> chan_update = 1;
break;
case cb_conn_established:
switch (clp -> chan -> ch_chan_type) {
case CH_QMGR_LOAD:
PP_TRACE (("start loading"));
chan_invoke (cb, DIS_READQ, (caddr_t *)0);
break;
case CH_DEBRIS:
PP_TRACE (("start debris collection"));
chan_invoke (cb, DIS_INIT,
(caddr_t *)&clp -> channame);
break;
case CH_TIMEOUT:
break;
}
break;
default:
PP_LOG (LLOG_EXCEPTIONS, ("Bad state in special %d",
cb -> cb_state));
break;
}
}
/*
 * start_async: begin an association to the channel called title on
 * behalf of connection block cb.
 * The code returned by assoc_start decides the new connection state
 * and which select set the descriptor is put in:
 *   still connecting, step 1 -> cb_conn_request1, wait for writability
 *   still connecting, step 2 -> cb_conn_request2, wait for readability
 *   DONE                     -> cb_conn_established, wait for readability
 * When CONNECTING_1 is not defined, OK is treated as step 1.
 * On success the timer is started, cb_ttl is set CHAN_TIMEOUT seconds
 * ahead and the assoc_start code is returned.  Returns NOTOK when
 * assoc_start fails or returns an unknown code.
 */
int start_async (cb, title)
Connblk *cb;
char *title;
{
int sd;
int result;
PP_NOTICE (("Starting channel %s %s", title, cb_print(cb)));
switch(result = assoc_start (title, &sd)) {
case NOTOK:
advise (LLOG_EXCEPTIONS, NULLCP, "Can't start channel %s",
title);
return NOTOK;
#ifdef CONNECTING_1
case CONNECTING_1:
cb -> cb_state = cb_conn_request1;
cb -> cb_fd = sd;
FD_CLR (sd, &perf_rfds);
FD_SET (sd, &perf_wfds);
/* keep the select width above the highest descriptor in use */
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
case CONNECTING_2:
cb -> cb_state = cb_conn_request2;
cb -> cb_fd = sd;
FD_CLR (sd, &perf_wfds);
FD_SET (sd, &perf_rfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
#else
case OK:
cb -> cb_state = cb_conn_request1;
cb -> cb_fd = sd;
FD_CLR (sd, &perf_rfds);
FD_SET (sd, &perf_wfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
#endif
case DONE:
cb -> cb_state = cb_conn_established;
cb -> cb_fd = sd;
FD_CLR (sd, &perf_wfds);
FD_SET (sd, &perf_rfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
default:
PP_LOG (LLOG_EXCEPTIONS,
("Unknown return code from assoc_start"));
return NOTOK;
}
start_timer ();
cb -> cb_ttl = current_time + CHAN_TIMEOUT;
PP_DBG (("Association descriptor=%d", sd));
return result;
}
int chan_invoke (cb, type, arg)
Connblk *cb;
int type;
caddr_t *arg;
{
struct client_dispatch *ds;
int result;
noperations ++;
ds = &dis[type];
switch (type) {
case DIS_INIT:
case DIS_QUIT:
PP_TRACE (("%s %s", ds -> ds_name, cb_print (cb)));
break;
case DIS_READQ:
case DIS_PROC:
PP_NOTICE (("%s %s", ds -> ds_name, cb_print (cb)));
break;
}
switch (result = invoke (cb, table_Qmgr_Operations,
ds, arg)) {
case OK:
break;
case NOTOK:
cb -> cb_state = cb_active;
break;
case DONE:
cleanup_conn (cb);
break;
}
return result;
}
/* \f
*/
/*
 * invoke: build the argument for the operation described by ds and
 * send it asynchronously on the association held by cb.
 * The invocation id is stored in cb -> cb_id.
 * Returns OK    - the invocation was sent,
 *         NOTOK - the argument builder or RyStub failed (not fatal),
 *         DONE  - the argument builder returned DONE, or RyStub
 *                 failed fatally.
 *
 * The argument built by ds_argument is now released on every path
 * after RyStub has been called; it used to be leaked whenever RyStub
 * returned NOTOK.
 */
static int invoke (cb, ops, ds, args)
Connblk *cb;
struct RyOperation ops[];
register struct client_dispatch *ds;
char **args;
{
    int result;
    int status = OK;
    caddr_t in;
    struct RoSAPindication rois;
    register struct RoSAPindication *roi = &rois;
    register struct RoSAPpreject *rop = &roi -> roi_preject;
    int sd;

    PP_TRACE (("invoke (%s)", cb_print (cb)));

    sd = cb -> cb_fd;
    in = NULL;
    if (ds -> ds_argument &&
        (result = (*ds -> ds_argument) (sd, ds, args, &in)) != OK)
        return result;

    switch (result = RyStub (sd, ops, ds -> ds_operation,
                             cb -> cb_id = RyGenID (sd),
                             NULLIP, in, ds -> ds_result, ds -> ds_error,
                             ROS_ASYNC, roi)) {
        case NOTOK:     /* failure */
            ros_advise (rop, "STUB");
            status = ROS_FATAL (rop -> rop_reason) ? DONE : NOTOK;
            break;

        case OK:        /* got a result/error response */
            break;

        case DONE:      /* got RO-END? */
            adios (NULLCP, "got RO-END.INDICATION");
            status = NOTOK;
            break;

        default:
            adios (NULLCP, "unknown return from RyStub=%d", result);
            /* NOTREACHED */
    }

    /* the argument is no longer needed once RyStub has returned */
    if (ds -> ds_free && in)
        (void) (*ds -> ds_free) (in);

    return status;
}
/*
 * chan_manage: handle activity on descriptor fd.
 * A descriptor with no connection block is logged and removed from
 * the select sets.  Otherwise the action depends on the state of the
 * block that owns it:
 *   connecting   - retry the association; on failure delay the channel
 *                  and clean up, on completion queue an event so the
 *                  first operation gets sent.
 *   idle         - free the block.
 *   proc/init sent - collect the pending response.
 */
void chan_manage (fd)
int fd;
{
register Connblk *cb;
int sd;
if ((cb = findcblk (fd)) == NULLCB) {
advise (LLOG_EXCEPTIONS, NULLCP, "fd %d is not registered!", fd);
FD_CLR (fd, &perf_rfds);
FD_CLR (fd, &perf_wfds);
return;
}
PP_TRACE (("chan_manage (%s)", cb_print (cb) ));
switch (cb -> cb_state) {
case cb_conn_request1:
case cb_conn_request2:
PP_TRACE (("Awaiting async connection (state %d)",
cb -> cb_state == cb_conn_request1 ? 1 : 2));
switch (acsap_retry (sd = cb -> cb_fd)) {
case NOTOK:
PP_TRACE (("Connection error on %s", cb_print (cb)));
delay_channel (cb);
cb -> cb_state = cb_idle;
cleanup_conn (cb);
return;
#ifdef CONNECTING_1
case CONNECTING_1:
cb -> cb_state = cb_conn_request1;
PP_TRACE (("State 1 on %s", cb_print (cb)));
/* step 1 waits for the descriptor to become writable */
FD_CLR (sd, &perf_rfds);
FD_SET (sd, &perf_wfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
case CONNECTING_2:
cb -> cb_state = cb_conn_request2;
PP_TRACE (("State 2 on %s", cb_print (cb)));
/* step 2 waits for the descriptor to become readable */
FD_CLR (sd, &perf_wfds);
FD_SET (sd, &perf_rfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
#else
case OK:
cb -> cb_state = cb_conn_request2;
PP_TRACE (("Switching to state 2 on %s",
cb_print(cb)));
FD_CLR (sd, &perf_wfds);
FD_SET (sd, &perf_rfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
break;
#endif
case DONE:
cb -> cb_state = cb_conn_established;
PP_TRACE (("connection now established on %s",
cb_print (cb)));
FD_CLR (sd, &perf_wfds);
FD_SET (sd, &perf_rfds);
if (sd >= perf_nfds)
perf_nfds = sd + 1;
/* zero-delay event: the next schedule pass sends the first operation */
(void) newevblk (cb -> cb_clp, cb,
cb -> cb_type, (time_t)0);
break;
}
break;
case cb_idle:
freecblk (cb);
break;
case cb_proc_sent:
PP_TRACE (("Awaiting proc response ..."));
getresponse (cb, OK);
break;
case cb_init_sent:
PP_TRACE (("Awaiting init response"));
getresponse (cb, OK);
break;
default:
PP_TRACE (("Funny state!"));
break;
}
}
/*
 * getresponse: collect the response to the invocation outstanding on
 * cb; type is passed straight through to RyWait.
 *  NOTOK - a timer expiry is ignored; any other rejection is logged
 *          and, if fatal, the channel is delayed and cleaned up.
 *  OK    - the connection becomes active again and a zero-delay event
 *          is queued for it.
 *  DONE  - the current message is forgotten and the connection is
 *          cleaned up.
 */
static void getresponse (cb, type)
Connblk *cb;
int type;
{
    caddr_t out;
    struct RoSAPindication rois;
    register struct RoSAPindication *roi = &rois;
    register struct RoSAPpreject *rop = &roi -> roi_preject;
    int status;

    PP_TRACE (("getresponse (%s)", cb_print (cb)));

    status = RyWait (cb -> cb_fd, &cb -> cb_id, &out, type, roi);

    if (status == NOTOK) {
        if (rop -> rop_reason != ROS_TIMER) {
            PP_TRACE (("RyWait returned NOTOK"));
            ros_advise (rop, "STUB");
            if (ROS_FATAL (rop -> rop_reason)) {
                delay_channel (cb);
                cleanup_conn (cb);
            }
        }
    }
    else if (status == OK) {
        PP_TRACE (("RyWait returned OK"));
        cb -> cb_state = cb_active;
        (void) newevblk (cb -> cb_clp, cb,
                         cb -> cb_type, (time_t)0);
    }
    else if (status == DONE) {
        cb -> cb_ml = NULL;
        cleanup_conn (cb);
    }
}
#ifdef notdef
/*
 * setfds: compiled out (notdef).  Rebuilds the select sets from
 * scratch by walking the connection block list: connections in the
 * first connect step are added to the write set, connections that are
 * in the second connect step, established, or awaiting a response are
 * added to the read set.
 */
static void setfds ()
{
register Connblk *cb;
struct PSAPindication pi;
PP_TRACE (("setfds ()"));
FD_ZERO (&perf_wfds);
FD_ZERO (&perf_rfds);
perf_nfds = 0;
for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw)
switch (cb -> cb_state) {
case cb_conn_request1:
if (PSelectMask (cb -> cb_fd, &perf_wfds,
&perf_nfds, &pi) == NOTOK)
PP_LOG (LLOG_EXCEPTIONS,
("PSelectMask failed"));
break;
case cb_conn_request2:
case cb_proc_sent:
case cb_init_sent:
case cb_conn_established:
if (PSelectMask (cb -> cb_fd, &perf_rfds,
&perf_nfds, &pi) == NOTOK)
PP_LOG (LLOG_EXCEPTIONS,
("PSelectMask failed"));
break;
default:
break;
}
PP_TRACE (("nrfds=%d rfds=0x%x wfds=0x%x", perf_nfds,
perf_rfds.fds_bits[0], perf_wfds.fds_bits[0]));
}
#endif
/*
 * do_quit: "argument builder" of the quit operation.  Nothing is
 * built; the association on sd is released, the descriptor is removed
 * from the select sets and the owning connection block (if any) is
 * marked as having no descriptor.  An ordinary channel is flagged for
 * re-evaluation.  Always returns DONE.
 */
/* ARGSUSED */
static int do_quit (sd, ds, args, arg)
int sd;
struct client_dispatch *ds;
char **args;
caddr_t *arg;
{
    Connblk *conn;

    PP_TRACE (("do_quit(%d)", sd));

    assoc_release (sd);
    if (sd != NOTOK) {
        FD_CLR (sd, &perf_wfds);
        FD_CLR (sd, &perf_rfds);
    }

    conn = findcblk (sd);
    if (conn == NULLCB)
        return DONE;
    conn -> cb_fd = NOTOK;

    PP_TRACE (("do_quit (%s)", cb_print (conn) ));
    if (conn -> cb_type == cb_channel)
        conn -> cb_clp -> chan_update = 1;
    PP_NOTICE (("Closing channel %s", cb_print (conn)));
    return DONE;
}
/* connection block functions */
/*
 * newcblk: allocate a zeroed connection block of the given type, with
 * no descriptor, and append it to the connection block list (the list
 * head is initialised on first use).
 * Returns the new block, or NULLCB when allocation fails.
 */
Connblk *newcblk (type)
enum cb_type type;
{
    Connblk *blk;

    PP_TRACE (("newcblk () cnt=%d", cb_count));

    blk = (Connblk *) calloc (1, sizeof *blk);
    if (blk != NULLCB) {
        blk -> cb_fd = NOTOK;
        blk -> cb_type = type;

        if (!cb_once_only) {
            /* first block ever: make the list head point at itself */
            CHead -> cb_back = CHead;
            CHead -> cb_forw = CHead;
            cb_once_only ++;
        }
        insque (blk, CHead -> cb_back);
        cb_count ++;
    }
    return blk;
}
/*
 * freecblk: unlink and free a connection block.  A block that still
 * holds a descriptor has its association aborted and the descriptor
 * removed from the select sets first.  A null block is ignored.
 */
void freecblk (cb)
Connblk *cb;
{
    struct AcSAPindication acis;
    struct RoSAPindication rois;
    register struct RoSAPindication *roi = &rois;

    if (cb != NULLCB) {
        PP_TRACE (("freecblk (%s) cnt=%d",
                   cb_print(cb), cb_count));

        if (cb -> cb_fd != NOTOK) {
            PP_NOTICE (("Killing channel %s",
                        cb_print (cb)));
            (void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis);
            (void) RyLose (cb -> cb_fd, roi);
            FD_CLR (cb -> cb_fd, &perf_wfds);
            FD_CLR (cb -> cb_fd, &perf_rfds);
        }

        remque (cb);
        cb_count --;
        free ((char *) cb);
    }
}
/*
 * findcblk: return the first connection block whose descriptor is fd,
 * or NULLCB if there is none (or no block has ever been created).
 */
Connblk *findcblk (fd)
int fd;
{
    Connblk *scan;

    PP_TRACE (("findcblk (%d) cnt=%d", fd, cb_count));

    /* the list head is only valid once newcblk has run */
    if (cb_once_only == 0)
        return NULLCB;

    scan = CHead -> cb_forw;
    while (scan != CHead) {
        if (scan -> cb_fd == fd)
            return scan;
        scan = scan -> cb_forw;
    }

    PP_TRACE (("Can't locate block with %d (%d alloc'd)",
               fd, cb_count));
    return NULLCB;
}
/*
 * cb2name: short printable name for a connection block - the channel
 * name for channel and special blocks, otherwise a fixed word for the
 * block type.
 */
static char *cb2name (cb)
Connblk *cb;
{
    if (cb -> cb_type == cb_channel || cb -> cb_type == cb_special) {
        if (cb -> cb_clp && cb -> cb_clp -> channame)
            return cb -> cb_clp -> channame;
        return "unknown channel";
    }
    if (cb -> cb_type == cb_timer)
        return "timer";
    if (cb -> cb_type == cb_responder)
        return "responder";
    return "something";
}
/*
 * cb_print: format a one-line description of a connection block for
 * log messages.
 * Returns a pointer to a static buffer that the next call overwrites,
 * so the result must be used before cb_print is called again.
 *
 * Changes from the previous version:
 *  - output is bounded to the buffer (channel, mta and queue id text
 *    is of arbitrary length; an over-long description is truncated
 *    instead of overrunning the buffer);
 *  - the timer procedure and reload time are converted explicitly to
 *    match their format specifiers;
 *  - an unrecognised block type yields its own text rather than
 *    whatever the previous call left in the buffer.
 */
static char *cb_print (cb)
Connblk *cb;
{
    static char buf[128];

    switch (cb -> cb_type) {
        case cb_channel:
        case cb_special:
            (void) snprintf (buf, sizeof buf,
                             "<fd=%d chan=%s mta=%s msg=%s>",
                             cb -> cb_fd,
                             cb2name(cb),
                             cb -> cb_mta ? cb -> cb_mta : "none",
                             (cb -> cb_ml && cb -> cb_ml -> ms ) ?
                             cb -> cb_ml -> ms -> queid : "none");
            break;

        case cb_timer:
            (void) snprintf (buf, sizeof buf,
                             "<timer proc=0x%lx reload=%ld>",
                             (unsigned long) cb -> cb_proc,
                             (long) cb -> cb_reload);
            break;

        case cb_responder:
            (void) snprintf (buf, sizeof buf,
                             "<fd=%d responder auth=%d>",
                             cb -> cb_fd, cb -> cb_authenticated);
            break;

        default:
            (void) snprintf (buf, sizeof buf, "<fd=%d type=%d>",
                             cb -> cb_fd, (int) cb -> cb_type);
            break;
    }
    return buf;
}
/*
 * newevblk: queue an event for connection cb / channel clp to run
 * `when' seconds from the front of the event list.
 *
 * The list is kept in firing order and each entry's ev_time is the
 * delay after the entry in front of it (schedule only ages the first
 * entry; freevblk hands a removed entry's remainder to its successor).
 * The new entry is placed after every entry due at or before it, and
 * its ev_time is what remains of `when' once the delays of the
 * entries in front of it have been subtracted.
 *
 * Fix: only the entry immediately behind the new one has its delay
 * reduced by the new entry's delay.  The previous code reduced every
 * later entry as well, which made all but the first of them fire
 * early.
 *
 * Returns the new event, or NULLEV when allocation fails.
 */
static struct eventqueue *newevblk (clp, cb, type, when)
Chanlist *clp;
Connblk *cb;
time_t when;
enum cb_type type;
{
    struct eventqueue *ev, *pev;

    PP_TRACE (("newevblk (clp, cb, %d, %d) cnt=%d",
               type, when, ev_count));

    ev = (struct eventqueue *) calloc (1, sizeof *ev);
    if (ev == NULLEV)
        return ev;
    ev_count ++;

    ev -> ev_clp = clp;
    ev -> ev_conn = cb;
    ev -> ev_type = type;

    /* find the first entry due later than the new one */
    for (pev = EHead -> ev_forw; pev != EHead; pev = pev -> ev_forw) {
        if (pev -> ev_time > when)
            break;
        when -= pev -> ev_time;
    }

    /* goes in front of pev; when pev is the head that is the tail */
    insque (ev, pev -> ev_back);

    /* the successor is now measured from the new entry */
    if (pev != EHead)
        pev -> ev_time -= when;

    ev -> ev_time = when;
    return ev;
}
/*
 * freevblk: remove an event from the event list and free it.  Whatever
 * time the event still had is added to the entry behind it, so that
 * entry stays due at the same moment.  A null event is ignored.
 */
static void freevblk (ev)
struct eventqueue *ev;
{
    struct eventqueue *successor;

    PP_TRACE (("freevblk () cnt=%d", ev_count));

    if (ev != NULLEV) {
        successor = ev -> ev_forw;
        if (successor != EHead)
            successor -> ev_time += ev -> ev_time;

        remque (ev);
        free ((char *) ev);
        ev_count --;
    }
}
#ifdef notdef
/*
 * evtimebyclp: compiled out (notdef).  Returns the absolute time at
 * which the first queued event for channel clp is due, found by
 * summing the relative delays along the event list; if the channel
 * has no event, returns a time MAX_SLEEP + 2 beyond the end of the
 * list.
 */
static time_t evtimebyclp (clp)
Chanlist *clp;
{
struct eventqueue *ev;
time_t now = current_time;
for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
if (ev -> ev_clp == clp)
return now + ev -> ev_time;
else now += ev -> ev_time;
return now + MAX_SLEEP + 2;
}
/*
 * findevbyclp: compiled out (notdef).  Returns the first queued event
 * that refers to channel clp, or NULLEV if there is none.
 */
static struct eventqueue *findevbyclp (clp)
Chanlist *clp;
{
struct eventqueue *ev;
for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
if (ev -> ev_clp == clp)
return ev;
return NULLEV;
}
#endif
/* init_events: make the event list empty (head linked to itself). */
static void init_events ()
{
    PP_TRACE (("init_events ()"));

    EHead -> ev_back = EHead;
    EHead -> ev_forw = EHead;
}
#define DECAY_FACTOR 0.7 /* decay filter - set fairly high */
/*
 * inc_channel: record the successful delivery of the current message
 * on cb to recipient `number'.
 *
 * Updates the channel's lastsuccess time and its decayed average of
 * seconds per 1000 units of message size, clears the message, channel
 * and mta caches, and removes the recipient from this channel.  The
 * recipient is then moved on: to its next channel if one remains,
 * otherwise it is marked for deletion (and the message is put on the
 * delete channel when ms -> count is zero).  A recipient already
 * marked for deletion lets the message itself be disposed of.
 *
 * Fix: the message is now checked for before it is used.  The
 * "Can't locate message!" test used to come after ms -> size had been
 * read, and a connection with no current message was dereferenced
 * unconditionally.
 */
void inc_channel (cb, number) /* successful delivery */
Connblk *cb;
int number;
{
    MsgStruct *ms;
    Reciplist *rlp;
    Mtalist *mlp;
    LIST_RCHAN *clp;
    int i;
    double f;

    PP_TRACE (("inc_channel (%s, %d)",
               cb_print (cb), number));

    cb -> cb_clp -> lastsuccess = current_time;

    if (cb -> cb_ml == NULL || (ms = cb -> cb_ml -> ms) == NULL) {
        advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate message!");
        return;
    }

    /* time taken, scaled by message size (at least one unit) */
    i = ms -> size / 1000;
    i = max (i, 1);
    f = (double)(current_time - cb -> cb_clp -> lastattempt) /
        (double)i;
    cb -> cb_clp -> averaget =
        (DECAY_FACTOR * cb -> cb_clp -> averaget) +
        (1 - DECAY_FACTOR) * f;
    PP_TRACE (("average = %g, this value = %g",
               cb -> cb_clp -> averaget, f));

    clear_msgcache (cb -> cb_ml);
    delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, number);
    cache_clear (&cb -> cb_clp -> cache);
    if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) {
        cache_clear (&mlp -> cache);
        mlp -> lastsuccess = current_time;
    }

    for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next)
        if (rlp -> id == number)
            break;
    if (rlp == NULL) {
        advise (LLOG_EXCEPTIONS, NULLCP, "No recipient %d in list!",
                number);
        return;
    }

    switch (rlp -> status) {
        case st_normal:
            /* step to the channel after the ones already done */
            i = ++rlp -> chans_done;
            for (clp = rlp -> chans; clp && i > 0;
                 clp = clp -> li_next, i--)
                continue;
            if (clp) {
                insertinchan (clp -> li_chan, ms, rlp,
                              chan2mta (clp -> li_chan, rlp));
                return;
            }
            /* else fall through */
        case st_dr:
            rlp -> status = st_delete;
            if (ms -> count == 0)
                insertindelchan (ms);
            break;

        case st_delete:
            cb -> cb_ml = NULL;
            maybezapmsg (ms);
            break;
    }
}
/*
 * delay_host: back off the mta that cb is currently working on.
 * The current message (if any) is unlocked, the mta's info text is
 * replaced by a copy of str (or cleared when str is null), its cache
 * is advanced by one and a half times cache_time, and both the mta
 * and the channel are flagged for re-evaluation.
 * If the mta cannot be found this is logged and nothing else changes.
 */
void delay_host (cb, rno, str)
Connblk *cb;
int rno;
char *str;
{
    struct Mtalist *mlp, *findmtalist ();

    PP_TRACE (("delay_host (%s, %d)", cb_print (cb), rno));

    if (cb -> cb_ml)
        msg_unlock (cb -> cb_ml -> ms);

    mlp = findmtalist (cb -> cb_clp, cb -> cb_mta);
    if (mlp == NULL) {
        advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue");
        return;
    }

    /* drop the old reason, then record the new one if there is one */
    if (mlp -> info)
        free (mlp -> info);
    mlp -> info = NULLCP;
    if (str)
        mlp -> info = strdup (str);

    cache_inc (&mlp -> cache, cache_time*3/2);
    mlp -> mta_changed = 1;
    if (cb -> cb_clp)
        cb -> cb_clp -> chan_update = 1;
}
/*
 * delay_message: back off the message that cb is currently working on.
 * The message's info text is replaced by a copy of str (or cleared
 * when str is null), its error count is incremented, it is unlocked,
 * its cache is advanced by twice cache_time, and both the mta and the
 * channel are flagged for re-evaluation.
 * Does nothing when cb has no current message; logs and returns when
 * the mta cannot be found.
 */
void delay_message (cb, str)
Connblk *cb;
char *str;
{
    Mtalist *mlp;

    PP_TRACE (("delay_message (%s)", cb_print (cb)));

    if (cb -> cb_ml == NULL)
        return;

    mlp = findmtalist (cb -> cb_clp, cb -> cb_mta);
    if (mlp == NULL) {
        advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue");
        return;
    }

    /* drop the old reason, then record the new one if there is one */
    if (cb -> cb_ml -> info)
        free (cb -> cb_ml -> info);
    cb -> cb_ml -> info = NULLCP;
    if (str)
        cb -> cb_ml -> info = strdup (str);

    cb -> cb_ml -> ms -> nerrors ++;
    msg_unlock (cb -> cb_ml -> ms);
    msgcache_inc (cb -> cb_ml, cache_time*2);

    mlp -> mta_changed = 1;
    cb -> cb_clp -> chan_update = 1;
}
/*
 * bad_recip: recipient rno of the current message needs a delivery
 * report.  The channel and mta are credited with a success, the
 * recipient is removed from this channel and, if it is found in the
 * message's recipient list, its channel progress is reset and it is
 * put on the delivery-report channel.
 */
static void bad_recip (cb, rno)
Connblk *cb;
int rno;
{
    Reciplist *recip;
    Mtalist *mlp;

    PP_TRACE (("bad_recip (%s, %d)", cb_print (cb), rno));

    cb -> cb_clp -> lastsuccess = current_time;
    if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta))
        mlp -> lastsuccess = current_time;

    delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno);

    recip = cb -> cb_ml -> ms -> recips;
    while (recip && recip -> id != rno)
        recip = recip -> rp_next;

    if (recip == NULLRL)
        return;

    recip -> chans_done = 0;
    insertindrchan (cb -> cb_ml -> ms, recip);
}
/*
 * sharedDR: recipient rno of the current message is covered by a
 * shared delivery report.  The channel and mta are credited with a
 * success and the recipient is removed from this channel.
 */
static void sharedDR (cb, rno)
Connblk *cb;
int rno;
{
    Mtalist *mta;

    PP_TRACE (("sharedDR (%s, %d)", cb_print (cb), rno));

    cb -> cb_clp -> lastsuccess = current_time;

    mta = findmtalist (cb -> cb_clp, cb -> cb_mta);
    if (mta)
        mta -> lastsuccess = current_time;

    delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno);
}
/*
 * do_processmessage: argument builder of the processmessage operation.
 * Builds, in *arg, a ProcMsg holding the channel name, the queue id of
 * the connection's current message and its user list.  The connection
 * moves to cb_proc_sent, its time-to-live is extended in proportion to
 * the message size, and the mta's lastattempt time is updated.
 * Returns OK on success; NOTOK when sd has no connection block, the
 * connection is not active, it has no current message, or the
 * argument cannot be allocated.
 *
 * Fix: the malloc result is now checked; it used to be written through
 * unconditionally.
 */
/* ARGSUSED */
static int do_processmessage (sd, ds, args, arg)
int sd;
struct client_dispatch *ds;
char **args;
struct type_Qmgr_ProcMsg **arg;
{
    Connblk *cb;
    Mlist *ml;
    Mtalist *mlp;
    register struct type_Qmgr_ProcMsg *pm;
    struct type_Qmgr_UserList *lusers ();
    char *p;

    PP_TRACE (("do_processmessage (%d)", sd));

    if ((cb = findcblk (sd)) == NULLCB)
        return NOTOK;

    if (cb -> cb_state != cb_active)
        return NOTOK;

    ml = cb -> cb_ml;
    if (ml == NULL) {
        PP_LOG (LLOG_EXCEPTIONS, ("Missing ml structure"));
        return NOTOK;
    }

    *arg = pm = (struct type_Qmgr_ProcMsg *) malloc (sizeof (**arg));
    if (pm == NULL) {
        PP_LOG (LLOG_EXCEPTIONS, ("Out of memory for ProcMsg"));
        return NOTOK;
    }

    pm -> channel = str2qb (cb -> cb_clp -> channame,
                            strlen (cb -> cb_clp -> channame), 1);
    p = ml -> ms -> queid;
    pm -> qid = str2qb (p, strlen (p), 1);
    pm -> users = lusers (ml, 0);
    if (pm -> users == NULL) {
        PP_LOG (LLOG_EXCEPTIONS, ("Empty user list!"));
    }

    /* big messages are allowed proportionally longer */
    cb -> cb_ttl = current_time + CHAN_TIMEOUT + (ml -> ms -> size /20);
    cb -> cb_state = cb_proc_sent;
    if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta))
        mlp -> lastattempt = current_time;
    return OK;
}
/*
 * do_readqueue: argument builder of the readqueue operation.  No
 * argument is built; the connection owning sd is moved to
 * cb_proc_sent.  Returns NOTOK when sd has no connection block.
 */
/* ARGSUSED */
static int do_readqueue (sd, ds, args, arg)
int sd;
struct client_dispatch *ds;
char **args;
struct type_Qmgr_ReadQueue__Pseudo **arg;
{
    Connblk *conn;

    PP_TRACE (("do_readqueue (%d)", sd));

    conn = findcblk (sd);
    if (conn == NULLCB)
        return NOTOK;

    conn -> cb_state = cb_proc_sent;
    return OK;
}
/*
 * do_initchannel: argument builder of the channelInitialise operation.
 * *args is the channel name; it is converted with str2qb into *arg.
 * The connection owning sd moves to cb_init_sent and gets a fresh
 * time-to-live.  Returns NOTOK when sd has no connection block.
 */
/* ARGSUSED */
static int do_initchannel (sd, ds, args, arg)
int sd;
struct client_dispatch *ds;
char **args;
struct type_Qmgr_Channel **arg;
{
    Connblk *conn = findcblk (sd);

    if (conn == NULLCB)
        return NOTOK;

    PP_TRACE (("do_initchannel (%s)", cb_print (conn) ));

    *arg = str2qb (*args, strlen (*args), 1);

    conn -> cb_state = cb_init_sent;
    conn -> cb_ttl = current_time + CHAN_TIMEOUT;
    return OK;
}
/*
 * processmessage_result: result handler of the processmessage
 * operation.  Walks the list of per-recipient delivery statuses and
 * applies each one to the connection that owns sd:
 *   success           -> inc_channel
 *   positive/negative DR -> bad_recip
 *   shared DR         -> sharedDR
 *   message failure   -> delay_message
 *   mta failure       -> delay_host
 *   both              -> delay_message then delay_host
 * Afterwards the connection is active again with no current message.
 * Returns NOTOK when sd has no connection block, otherwise OK.
 */
/* ARGSUSED */
static int processmessage_result (sd, id, dummy, result, roi)
int sd,
    id,
    dummy;
register struct type_Qmgr_DeliveryStatus *result;
struct RoSAPindication *roi;
{
    Connblk *conn;
    struct type_Qmgr_DeliveryStatus *item;
    int recip;
    char prefix[LINESIZE];
    char *reason;

    PP_TRACE (("processmessage_result (%d)", sd));

    conn = findcblk (sd);
    if (conn == NULLCB) {
        advise (LLOG_EXCEPTIONS, NULLCP, "No connection block for %d",
                sd);
        return NOTOK;
    }
    PP_TRACE (("processmessage_result (%s)", cb_print (conn)));

    item = result;
    while (item) {
        recip = item -> IndividualDeliveryStatus -> recipient -> parm;
        (void) sprintf (prefix, "Recipient ID %d %s:", recip,
                        cb_print (conn));

        /* the optional explanation is copied out, and freed below */
        if (item -> IndividualDeliveryStatus -> info)
            reason = qb2str(item -> IndividualDeliveryStatus -> info);
        else
            reason = NULLCP;

        switch (item -> IndividualDeliveryStatus -> status) {
            case int_Qmgr_status_success:
                PP_NOTICE (("%s success", prefix));
                inc_channel (conn, recip);
                break;

            case int_Qmgr_status_negativeDR:
            case int_Qmgr_status_positiveDR:
                PP_NOTICE (("%s %s", prefix,
                            item -> IndividualDeliveryStatus -> status
                            == int_Qmgr_status_positiveDR ?
                            "positiveDR" : "negativeDR"));
                bad_recip (conn, recip);
                break;

            case int_Qmgr_status_successSharedDR:
            case int_Qmgr_status_failureSharedDR:
                PP_NOTICE (("%s %s", prefix,
                            item -> IndividualDeliveryStatus -> status
                            == int_Qmgr_status_failureSharedDR ?
                            "failureSharedDR" : "sucessSharedDR"));
                sharedDR(conn, recip);
                break;

            case int_Qmgr_status_messageFailure:
                PP_NOTICE (("%s messageFailure (%s)", prefix,
                            reason ? reason : ""));
                delay_message (conn, reason);
                break;

            case int_Qmgr_status_mtaFailure:
                PP_NOTICE (("%s mtaFailure (%s)", prefix,
                            reason ? reason : ""));
                delay_host (conn, recip, reason);
                break;

            case int_Qmgr_status_mtaAndMessageFailure:
                PP_NOTICE (("%s mtaAndMessageFailure (%s)", prefix,
                            reason ? reason : ""));
                delay_message (conn, reason);
                delay_host (conn, recip, reason);
                break;

            default:
                PP_NOTICE (("%s Unknown response", prefix));
                break;
        }

        if (reason)
            free (reason);
        item = item -> next;
    }

    conn -> cb_state = cb_active;
    conn -> cb_ml = NULL;
    return OK;
}
/* \f
RPC procedures... */
/*
 * readqueue_result: result handler of the readqueue operation.
 * Every message in the result is converted with newmsgstruct and
 * either merged into the existing message with the same queue id or
 * inserted as a new message.  Every queue id on the deleted list has
 * its message (if known) killed.  Always returns OK.
 */
/* ARGSUSED */
static int readqueue_result (sd, id, dummy, result, roi)
int sd,
    id,
    dummy;
register struct type_Qmgr_MsgList *result;
struct RoSAPindication *roi;
{
    struct type_Qmgr_MsgStructList *entry;
    struct type_Qmgr_QidList *gone;
    MsgStruct *fresh, *known;
    char *qid;

    PP_TRACE (("readqueue_result (%d, %d)", sd, id));

    entry = result -> msgs;
    while (entry) {
        fresh = newmsgstruct (entry -> MsgStruct);
        known = find_msg (fresh -> queid);
        if (known) {
            /* already have it: fold in the changes, drop the copy */
            (void) updatemessage (known, fresh);
            freems (fresh);
        }
        else
            (void) insertmessage (fresh);
        entry = entry -> next;
    }

    gone = result -> deleted;
    while (gone) {
        qid = qb2str (gone -> QID);
        known = find_msg (qid);
        if (known)
            kill_msg (known);
        free (qid);
        gone = gone -> next;
    }
    return OK;
}
/*
 * channelinit_result: result handler of the channelInitialise
 * operation.  The result carries nothing of interest; the call is
 * traced and OK is returned.
 */
/* ARGSUSED */
static int channelinit_result (sd, id, dummy, result, roi)
int sd,
    id,
    dummy;
struct type_Qmgr_Pseudo__channelInitialise *result;
struct RoSAPindication *roi;
{
    PP_TRACE (("channelinit_result (%d, %d)", sd, id));

    return OK;
}
/* ARGSUSED */
static int general_error (sd, id, error, parameter, roi)
int sd,
id,
error;
caddr_t parameter;
struct RoSAPindication *roi;
{
register struct RyError *rye;
Connblk *cb;
if ((cb = findcblk (sd)) == NULLCB)
return NOTOK;
PP_TRACE (("general_error (%s)", cb_print(cb)));
if (cb -> cb_type != cb_channel) {
PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
return NOTOK;
}
if (error == RY_REJECT) {
advise (LLOG_EXCEPTIONS, NULLCP, "%s",
RoErrString ((int) parameter));
delay_channel (cb);
return NOTOK;
}
if (rye = finderrbyerr (table_Qmgr_Errors, error))
advise (LLOG_EXCEPTIONS, NULLCP, "%s", rye -> rye_name);
else
advise (LLOG_EXCEPTIONS, NULLCP, "Error %d", error);
delay_channel (cb);
return NOTOK;
}
/*
 * processmessage_error: error handler for the process-message
 * operation.
 *
 * Looks up the connection block for sd; an unknown descriptor or a
 * block that is not of type cb_channel yields NOTOK.  Otherwise a
 * description of the error is logged and passed, together with the
 * connection block, to delay_message(), and OK is returned.
 *
 * For RY_REJECT the description is the RoErrString() text for the
 * reason held in parameter.  For other errors it is built in tbuf from
 * the name in table_Qmgr_Errors, or from the error number when the
 * table has no entry.
 *
 * id and roi are unused.
 */
/* ARGSUSED */
static int processmessage_error (sd, id, error, parameter, roi)
int	sd,
	id,
	error;
caddr_t parameter;
struct RoSAPindication *roi;
{
	register struct RyError *rye;
	Connblk	*cb;
	char	tbuf[128];

	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;

	PP_TRACE (("processmessage_error (%s)", cb_print(cb)));

	if (cb -> cb_type != cb_channel) {
		PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
		return NOTOK;
	}

	if (error == RY_REJECT) {
		advise (LLOG_EXCEPTIONS, NULLCP, "%s",
			RoErrString ((int) parameter));
		delay_message (cb, RoErrString ((int) parameter));
		return OK;
	}

	if (rye = finderrbyerr (table_Qmgr_Errors, error))
		/*
		 * The precision caps the copied name at 100 characters
		 * so that prefix + name + NUL always fits in tbuf.
		 */
		(void) sprintf (tbuf, "process message error %.100s",
				rye -> rye_name);
	else
		(void) sprintf (tbuf, "process message error %d", error);

	advise (LLOG_EXCEPTIONS, NULLCP, "%s", tbuf);
	delay_message (cb, tbuf);
	return OK;
}
/*
 * delay_channel: push back the channel list attached to a connection.
 *
 * Connection blocks of type cb_responder or cb_timer, and blocks with
 * no channel list (cb_clp), are left untouched.  For any other block
 * the channel list's cache is advanced with cache_inc() by cache_time
 * and its chan_update flag is set.
 */
void delay_channel (cb)
Connblk	*cb;
{
	PP_TRACE (("delay_channel (%s)", cb_print (cb) ));

	if (cb -> cb_type != cb_responder &&
	    cb -> cb_type != cb_timer &&
	    cb -> cb_clp != NULL) {
		cache_inc (&(cb -> cb_clp -> cache), cache_time);
		cb -> cb_clp -> chan_update = 1;
	}
}
/*
 * investigate_chan: recompute the summary fields of a channel list.
 *
 * Walks the circular list of MTAs hanging off clp -> mtas (the list
 * head itself is the end marker) and derives:
 *   oldest    - the smallest oldest value over all MTAs, capped at now;
 *   nmtas     - the number of MTAs for which mtaready() holds;
 *   nextevent - the earliest nextevent among those ready MTAs, capped
 *               at now + MAX_SLEEP.
 * An MTA flagged mta_changed, or whose nextevent has passed, is first
 * refreshed with investigate_mta() and has its flag cleared.
 *
 * Three overrides are then applied in order, later ones winning:
 * a special channel is due now; a channel cache time in the future
 * becomes the next event; a disabled channel sleeps for MAX_SLEEP.
 *
 * NOTE(review): the "nextevent has passed" test uses the global
 * current_time while everything else uses the now argument -- confirm
 * that callers always pass the same value.
 */
void investigate_chan (clp, now)
Chanlist	*clp;
time_t	now;
{
	Mtalist	*mta;

	PP_TRACE (("investigate_chan (%s)", clp -> channame));

	clp -> nmtas = 0;
	clp -> oldest = now;
	clp -> nextevent = now + MAX_SLEEP;

	for (mta = clp -> mtas -> mta_forw; mta != clp -> mtas;
	     mta = mta -> mta_forw) {
		if (mta -> mta_changed || mta -> nextevent < current_time) {
			investigate_mta (mta, now);
			mta -> mta_changed = 0;
		}

		/* oldest is tracked for every MTA, ready or not */
		if (mta -> oldest < clp -> oldest)
			clp -> oldest = mta -> oldest;

		if (mtaready (mta)) {
			clp -> nmtas ++;
			if (mta -> nextevent < clp -> nextevent)
				clp -> nextevent = mta -> nextevent;
		}
	}

	if (clp -> chan_special)
		clp -> nextevent = now;

	if (clp -> cache.cachetime > now)
		clp -> nextevent = clp -> cache.cachetime;

	if (clp -> chan_enabled == 0)
		clp -> nextevent = now + MAX_SLEEP;
}
/*
 * investigate_mta: recompute the summary fields of an MTA list.
 *
 * Walks the circular message list hanging off mlp -> msgs (the list
 * head itself is the end marker) and derives:
 *   oldest    - the smallest message age, capped at now;
 *   nextevent - starts at now + MAX_SLEEP + 1 and is adjusted by each
 *               message for which msgready() holds (see below).
 *
 * For a ready message:
 *   - if msgmincache() lies in the future, nextevent is lowered to it
 *     when it is earlier;
 *   - otherwise, if the message has a non-zero defferedtime earlier
 *     than nextevent, nextevent becomes that time;
 *   - otherwise nextevent becomes now.
 *
 * Finally, an MTA cache time in the future replaces nextevent, and a
 * disabled MTA is set to now + MAX_SLEEP + 1.
 */
void investigate_mta (mlp, now)
Mtalist	*mlp;
time_t	now;
{
	Mlist	*msg;
	time_t	mincache;

	PP_TRACE (("investigate_mta (%s)", mlp -> mtaname));

	mlp -> oldest = now;
	mlp -> nextevent = now + MAX_SLEEP + 1;

	for (msg = mlp -> msgs -> ml_forw; msg != mlp -> msgs;
	     msg = msg -> ml_forw) {
		/* the age scan covers every message, ready or not */
		if (msg -> ms -> age < mlp -> oldest)
			mlp -> oldest = msg -> ms -> age;

		if (!msgready (msg))
			continue;

		mincache = msgmincache (msg);
		if (mincache > now) {
			/* still cached: can only pull the event earlier */
			if (mincache < mlp -> nextevent)
				mlp -> nextevent = mincache;
			continue;
		}

		if (msg -> ms -> defferedtime &&
		    msg -> ms -> defferedtime < mlp -> nextevent)
			mlp -> nextevent = msg -> ms -> defferedtime;
		else
			mlp -> nextevent = now;
	}

	/* the MTA as a whole is cached */
	if (mlp -> cache.cachetime > now)
		mlp -> nextevent = mlp -> cache.cachetime;

	if (mlp -> mta_enabled == 0)
		mlp -> nextevent = now + MAX_SLEEP + 1;
}
/*
 * cleanup_conn: undo the bookkeeping for a connection and free it.
 *
 * Blocks of type cb_responder and cb_timer are simply freed.  For any
 * other type:
 *   - if the block has a channel list, the active counts of that
 *     channel list and the global nchansrunning are decremented; when
 *     the block also names an MTA that findmtalist() can locate, that
 *     MTA's active count is decremented as well and, if the connection
 *     was in state cb_idle or cb_proc_sent, delay_message() is called
 *     with "Connection broke";
 *   - if the block still has a message attached (cb_ml), that message
 *     is released with msg_unlock().
 * The block is handed to freecblk() in every case.
 */
void cleanup_conn (cb)
Connblk	*cb;
{
	Mtalist	*mta;

	PP_TRACE (("cleanup_conn(%s)", cb_print (cb)));

	if (cb -> cb_type != cb_responder && cb -> cb_type != cb_timer) {
		if (cb -> cb_clp) {
			if (cb -> cb_mta &&
			    (mta = findmtalist(cb ->cb_clp, cb -> cb_mta))) {
				mta -> nactive --;
				switch (cb -> cb_state) {
				    case cb_idle:
				    case cb_proc_sent:
					delay_message (cb, "Connection broke");
					break;
				    default:
					break;
				}
			}
			cb -> cb_clp -> nactive --;
			nchansrunning --;
		}

		if (cb -> cb_ml)
			msg_unlock (cb -> cb_ml -> ms);
	}

	freecblk (cb);
}
static int timeout_proc ()
{
MsgStruct **msp, *ms;
PP_TRACE (("timeout_proc"));
if (timeout_chan -> chan_enabled == 0)
return;
for (msp = msg_hash; msp < &msg_hash[HASHSIZE]; ) {
for (ms = *msp; ms; ms = ms -> ms_forw) {
if (ms -> m_locked)
continue;
if (ms -> expirytime < current_time &&
expiremsg (ms) == DONE)
break;
}
if (!ms)
msp ++;
}
}
/*
 * expiremsg: move the outstanding recipients of a message to timeout.
 *
 * Only recipients with a non-zero id and status st_normal are
 * considered; st_dr, st_delete and st_timeout (and any other status)
 * are left alone.  For each considered recipient the channel it has
 * reached is found by skipping chans_done entries of its channel list.
 * If that channel, the recipient's MTA on it, and this message on that
 * MTA can all be located, the recipient is removed with delfromchan()
 * and its status becomes st_timeout.  The first time this happens the
 * message is also inserted into the timeout channel with
 * insertinchan().
 *
 * Returns DONE if at least one recipient was changed, OK otherwise.
 */
static int expiremsg (ms)
MsgStruct	*ms;
{
	int	queued = 0;	/* set once the message is on timeout_chan */
	int	skip;
	Reciplist	*rlp;
	LIST_RCHAN	*lrp;
	Chanlist	*clp;
	Mtalist	*mlp;
	Mlist	*ml;

	PP_TRACE (("expiremsg (%s)", ms -> queid));

	for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) {
		if (rlp -> id == 0)
			continue;
		if (rlp -> status != st_normal)
			continue;	/* leave these alone! */

		/* step past the channels already done */
		lrp = rlp -> chans;
		for (skip = rlp -> chans_done; lrp && skip > 0; skip--)
			lrp = lrp -> li_next;
		if (lrp == NULL)
			continue;

		clp = findchanlist (lrp -> li_chan);
		if (clp == NULLCHANLIST)
			continue;

		mlp = findmtalist (clp, rlp -> mta);
		if (mlp == NULLMTALIST)
			continue;

		ml = findmtamsg (mlp, ms -> queid);
		if (ml == NULLMLIST)
			continue;

		delfromchan (clp, mlp -> mtaname, ml, rlp -> id);
		rlp -> status = st_timeout;

		if (!queued) {
			insertinchan (timeout_chan -> chan, ms,
				      rlp, timeout_chan -> channame);
			queued = 1;
		}
	}

	return queued ? DONE : OK;
}
/*
 * channel_ttl: abort channel connections that have outlived cb_ttl.
 *
 * Returns OK at once when cb_once_only is zero.  Otherwise every
 * connection block on the CHead list is examined; a cb_channel block
 * with a non-zero cb_ttl earlier than current_time is reported,
 * aborted with AcUAbortRequest(), dropped with RyLose() and passed to
 * chan_lose(), and its descriptor is then cleared from perf_rfds and
 * perf_wfds unless it reads NOTOK.
 *
 * Afterwards timer_running is reset and, if any channels are still
 * running, start_timer() is called again.  Always returns OK.
 *
 * NOTE(review): cb is still dereferenced (cb_fd, cb_forw) after
 * chan_lose() has run; if chan_lose() releases the block this reads
 * freed memory -- confirm against chan_lose().
 */
static int channel_ttl ()
{
	struct AcSAPindication	acis;
	struct RoSAPindication	rois;
	Connblk	*cb;

	PP_TRACE (("channel_ttl ()"));

	if (cb_once_only == 0)
		return OK;

	for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) {
		if (cb -> cb_type != cb_channel)
			continue;
		if (!cb -> cb_ttl || !(cb -> cb_ttl < current_time))
			continue;

		PP_NOTICE (("Channel %s taking too long - aborting",
			    cb_print (cb)));

		(void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis);
		(void) RyLose (cb -> cb_fd, &rois);
		chan_lose (cb -> cb_fd, (struct AcSAPfinish *)0);

		if (cb -> cb_fd != NOTOK) {
			FD_CLR (cb -> cb_fd, &perf_rfds);
			FD_CLR (cb -> cb_fd, &perf_wfds);
		}
	}

	/* the timer that fired is spent; rearm only if still needed */
	timer_running = 0;
	if (nchansrunning > 0)
		start_timer ();

	return OK;
}
static void start_timer ()
{
Connblk *cb;
if (timer_running)
return;
cb = newcblk (cb_timer);
cb -> cb_proc = channel_ttl;
cb -> cb_reload = 0;
(void) newevblk (NULLCHANLIST, cb, cb_timer, CHAN_TIMEOUT);
timer_running = 1;
}