DataMuseum.dk

Presents historical artifacts from the history of:

DKUUG/EUUG Conference tapes

This is an automatic "excavation" of a thematic subset of
artifacts from Datamuseum.dk's BitArchive.

See our Wiki for more about DKUUG/EUUG Conference tapes

Excavated with: AutoArchaeologist - Free & Open Source Software.


top - metrics - download
Index: T c

⟦1205719f4⟧ TextFile

    Length: 39839 (0x9b9f)
    Types: TextFile
    Names: »chans.c«

Derivation

└─⟦2d1937cfd⟧ Bits:30007241 EUUGD22: P.P 5.0
    └─⟦dc59850a2⟧ »EurOpenD22/pp5.0/pp-5.tar.Z« 
        └─⟦e5a54fb17⟧ 
            └─⟦this⟧ »pp-5.0/Src/qmgr/chans.c« 

TextFile

/* chans.c: channel specific stuff for the qmgr */

# ifndef lint
static char Rcsid[] = "@(#)$Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $";
# endif

/*
 * $Header: /cs/research/pp/hubris/pp-beta/Src/qmgr/RCS/chans.c,v 5.0 90/09/20 16:21:02 pp Exp Locker: pp $
 *
 * $Log:	chans.c,v $
 * Revision 5.0  90/09/20  16:21:02  pp
 * rcsforce : 5.0 public release
 * 
 */



#include <isode/rosy.h>
#include <isode/logger.h>
#include "types.h"
#include "ryinitiator.h"
#include "Qmgr-ops.h"
#include "util.h"

extern struct Mtalist *findmtalist ();
extern MsgStruct *newmsgstruct ();
extern MsgStruct *find_msg ();
extern Chanlist *findchanlist ();
extern Mlist	*findmtamsg ();


static int	cb_once_only = 0;
static Connblk cbqueue;
static Connblk *CHead = &cbqueue;
static char	*cb2name ();
static char	*cb_print ();
static int	cb_count = 0;

struct eventqueue {
	struct eventqueue *ev_forw;
	struct eventqueue *ev_back;

	time_t	ev_time;
	Connblk *ev_conn;
	enum cb_type ev_type;
	Chanlist *ev_clp;
};

static struct eventqueue evqueue;
static struct eventqueue *EHead = &evqueue;
static int ev_count = 0;
static struct eventqueue *newevblk ();
static void freevblk (), init_events ();
static void do_event ();
static void channel_event ();
static void special_event ();
static void restart ();
#define NULLEV	((struct eventqueue *)0)

static int runnable ();
static int invoke ();
static int timeout_proc ();
static int channel_ttl ();
static int expiremsg ();
/* static void setfds (); */
static void getresponse ();
static void start_timer ();

extern Mlist *nextmsg ();

extern fd_set perf_rfds, perf_wfds;
extern int perf_nfds;
extern int delaytime;

void	review_channels ();
void	investigate_chan ();
void	investigate_mta ();
void	delay_channel ();
void	cleanup_conn ();

int nchansrunning = 0;
static int timer_running = 0;

static int	do_processmessage (), processmessage_result (),
	processmessage_error (), do_quit ();
static int	do_initchannel (), channelinit_result ();
static int	do_readqueue (), readqueue_result ();
static int	general_error ();
static time_t	nextchantime;
static int	rqueue;
static int 	noperations;
double	load_avg[2];

static struct client_dispatch dis[] = {
#define	DIS_PROC	0
{
	"processmessage", operation_Qmgr_processmessage,
	do_processmessage, free_Qmgr_processmessage_argument,
	processmessage_result, processmessage_error,
	"Process some messages"
},
#define DIS_INIT	1
{
	"channelInitialise", operation_Qmgr_channelInitialise,
	do_initchannel, free_Qmgr_channelInitialise_argument,
	channelinit_result, general_error,
	"Initialise a channel"
},
#define DIS_QUIT	2
{
	"quit", 0, do_quit, NULLIFP, NULLIFP, NULLIFP,
	"terminate the association"
},
#define DIS_READQ	3
{
	"readqueue", operation_Qmgr_readqueue,
	do_readqueue, free_Qmgr_readqueue_argument,
	readqueue_result, general_error,
	"Load up the queue"
},
	NULL
};

void init_chans ()
{
	extern int ch_maxchans;
	extern void create_chan ();
	int	i;

	PP_TRACE (("init_chans ()"));
	for (i = 0; i < ch_maxchans && ch_all[i]; i++)
		create_chan (ch_all[i]);
	sort_chans ();
	init_events ();
}

/* ARGSUSED */
int chan_lose (fd, acf)
int	fd;
struct AcSAPfinish *acf;
{
	Connblk *cb;

	PP_TRACE (("chan_lose (%d)", fd));
	if ((cb = findcblk (fd)) == NULLCB)
		return ACS_ACCEPT;
	/* restore the state */
	delay_channel (cb);
	if (fd != NOTOK) {
		FD_CLR (fd, &perf_rfds);
		FD_CLR (fd, &perf_wfds);
	}
	cb -> cb_fd = NOTOK;
	cleanup_conn (cb);

	return ACS_ACCEPT;
}

void start_specials ()
{
	Connblk *cb;

	PP_TRACE (("start_specials ()"));

	if (loader_chan == NULLCHANLIST)
		advise (LLOG_EXCEPTIONS, NULLCP,
			"NO loading channel specified");
	else {
		loader_chan -> chan_update = 1;
	}

	if (trash_chan == NULLCHANLIST)
		advise (LLOG_EXCEPTIONS, NULLCP,
			"No debris channel specified.");
	else {
		cache_inc (&trash_chan -> cache,
			   TRASH_START_DELAY);
		trash_chan -> chan_update = 1;
	}
	if (timeout_chan == NULLCHANLIST)
		advise (LLOG_EXCEPTIONS, NULLCP,
			"No timeout channel specified");
	else {
		cb = newcblk (cb_timer);
		cb -> cb_proc = timeout_proc;
		cb -> cb_reload = timeout_time;
		(void) newevblk (NULLCHANLIST, cb, cb_timer,
				 TIMEOUT_START_DELAY);
	}
}	

#define LOAD_UPDATE_INTERVAL	15
#define L1_DECAY	(0.9)
#define L2_DECAY	(0.92)

static void do_load_avg ()
{
	static time_t lastload;

	if (lastload == 0)
		lastload = current_time;
	if (current_time - lastload < LOAD_UPDATE_INTERVAL) {
		load_avg[0] = L1_DECAY * load_avg[0] +
			(1.0 - L1_DECAY) * rqueue;
		return;
	}

	while (lastload < current_time) {
		load_avg[0] = L1_DECAY * load_avg[0] +
			(1.0 - L1_DECAY) * rqueue;
		load_avg[1] = L2_DECAY * load_avg[1] +
			(1.0 - L2_DECAY) *
				((double)noperations /
					 (double)LOAD_UPDATE_INTERVAL);
		lastload += LOAD_UPDATE_INTERVAL;
	}
	noperations = 0;
	PP_TRACE (("Load averages rq %g, ops %g",
		   load_avg[0], load_avg[1]));
	lastload = current_time;
}

void schedule ()
{
	struct eventqueue *ev;
	static time_t lasttime;
	time_t	delta;
	static int	countdown = 50;

	PP_TRACE (("schedule ()"));

	if (opmode && nchansrunning == 0) {
		if (opmode == OP_SHUTDOWN)
			exit (0);
		if (opmode == OP_RESTART)
			restart ();
	}
	if (countdown -- < 0) {
		countdown = 50;
		sort_chans ();
	}

	if (lasttime == 0)
		lasttime = current_time;
	delta = current_time - lasttime;

	review_channels (current_time);
	do_load_avg ();

	if (EHead -> ev_forw != EHead)
		EHead -> ev_forw -> ev_time -= delta;


	for (ev = EHead -> ev_forw; ev != EHead;) {
		struct eventqueue *ev2 = ev -> ev_forw;

		if (ev -> ev_time <= 0) {
			do_event (ev);
			freevblk (ev);
		}
		else
			break;
		ev = ev2;
	}
	review_channels (current_time);

	if (EHead -> ev_forw == EHead )
		delaytime = nextchantime;
	else {
		PP_DBG (("event head time = %d delta = %d",
			 EHead -> ev_forw -> ev_time, delta));
		delaytime = max(0, EHead -> ev_forw -> ev_time);
	}
	if (nextchantime < delaytime)
		delaytime = nextchantime;

	PP_TRACE (("delaytime == %d", delaytime));
	lasttime = current_time;
	if (opmode && nchansrunning == 0) {
		if (opmode == OP_SHUTDOWN)
			exit (0);
		if (opmode == OP_RESTART)
			restart ();
	}
}

static void restart ()
{
	int 	n, i;
	extern char *cmddfldir, *dupfpath ();
	char	*path;

	PP_TRACE (("restart ()"));

	while (CHead -> cb_forw != CHead)
		freecblk (CHead -> cb_forw);	/* shut down connecitons */

	n = getdtablesize ();

	i = (isatty(2) ? 3 : 0);
	for (; i <= n; i++)
		(void) close (i);
#ifdef notdef
	for (i = 0; i < NSIG; i++)
		(void) signal (i, SIG_DFL);
#endif
	path = dupfpath (cmddfldir, myname);
	execl (path, myname, NULLCP);
	execl (myname, myname, NULLCP);
	_exit(1);
}

void review_channels (now)
time_t	now;
{
	Chanlist **clpp;
	time_t	tdiff = 0;

	nextchantime = MAX_SLEEP;
	rqueue = nchansrunning;
	for (clpp = chan_list; clpp < &chan_list[nchanlist]; clpp++) {
		switch (runnable (*clpp, now)) {
		    case OK:
			(void) newevblk (*clpp, NULLCB,
					 (*clpp)-> chan_special ?
					 cb_special : cb_channel,
					 (time_t)0);
			/* fall */
		    case DONE:
			rqueue ++;
			break;
		    case NOTOK:
			tdiff = (*clpp) -> nextevent - current_time;
			if (tdiff > 0 && tdiff < nextchantime)
				nextchantime = tdiff;
			break;
		}
	}
}

static int runnable (clp, now)
Chanlist *clp;
time_t	now;
{
	if (opmode)
		return NOTOK;
	if (clp -> chan_update) {
		investigate_chan (clp, now);
		clp -> chan_update = 0;
	}
	if (clp -> chan_special == 0 && clp -> num_msgs <= 0
	    && clp -> num_drs <= 0) /* special channel with things to do */
		return NOTOK;
	if (clp -> chan_enabled == 0) /* channel switched off */
		return NOTOK;
	if (clp -> nextevent > now ) /* not time yet */
		return NOTOK;
	if (nchansrunning >= maxchansrunning) /* too many running already */
		return DONE;
	if (clp -> nactive <= 0)
		return OK; /* None running - start one */
	if (clp -> chan -> ch_maxproc > 0 &&
	    clp -> nactive >= clp -> chan -> ch_maxproc)
		return NOTOK;
/*
 * So there is a channel already running:
 * OK - should we start more - to do this we need the following
 * o More mtas ready than chans currently running
 * o We started the last channel more than a 15 secs ago
 *
 * o We aren't consuming all the channels allowed
 * o More messages than channels * 5 OR last time we started was ages ago
 */
	if (clp -> nmtas <= clp -> nactive
	    || clp -> laststart + 15 > current_time)
		return NOTOK;

	
	/* ok - we ought to start a new channel - but should we? */

	if (clp -> nactive >= max(1,maxchansrunning-1))
		return DONE;	/* Would like to - but out of resources */
	
	if ((clp -> num_msgs + clp -> num_drs) > clp -> nactive * 5
	    || clp -> laststart + 60*5 < current_time) {
		char	*mta = NULLCP;
		
		if (nextmsg (clp, &mta, 0) == NULL)
			return NOTOK; /* nope all locked */
		PP_TRACE (("Can start a new chan on %s", mta));
		free (mta);
		return OK;
	}
	/* No - this channel is just not ready to start another one */
	return NOTOK;
}

static void do_event (ev)
struct eventqueue *ev;
{
	Connblk *cb;

	PP_TRACE (("do_event ()"));

	if ((cb = ev -> ev_conn) == NULLCB) {
		ev -> ev_conn = cb = newcblk (ev -> ev_type);
		cb -> cb_clp = ev -> ev_clp;
	}

	switch (cb -> cb_type) {
	    case cb_channel:
		channel_event (ev);
		break;

	    case cb_special:
		special_event (ev);
		break;

	    case cb_timer:
		PP_TRACE (("Timeout event"));
		if (cb -> cb_proc)
			(*cb -> cb_proc)();
		if (cb -> cb_reload)
			newevblk (NULLCHANLIST, cb, cb_timer, cb -> cb_reload);
		else	freecblk (cb);
		break;

	    default:
		PP_LOG (LLOG_EXCEPTIONS, ("Unknown type %d", cb -> cb_type));
		break;
	}
}

static void channel_event (ev)
struct eventqueue *ev;
{
	Connblk *cb = ev -> ev_conn;
	Mlist	*ml;
	Chanlist *clp;

	PP_TRACE (("channel_event ()"));

	if ((clp = ev -> ev_clp) == NULLCHANLIST)
		PP_TRACE (("NO chanlist of event queue"));

	switch (cb -> cb_state) {
	    case cb_idle:
		if (runnable (clp, current_time) != OK) {
			freecblk (cb);
			return;
		}
		if ((ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1)) == NULL) {
			clp -> chan_update = 1;
			freecblk (cb);
			return;
		}
		cb -> cb_ml = ml;
		cb -> cb_clp -> lastattempt = current_time;
		cb -> cb_clp -> laststart = current_time;
		switch (start_async (cb, clp -> channame)) {
		    case NOTOK:
			cache_inc (&clp -> cache, cache_time);
			clp -> chan_update = 1;
			cleanup_conn (cb);
			break;

		    case DONE:
			(void) newevblk (cb -> cb_clp, cb,
					 cb -> cb_type, (time_t)0);
			/* and fall */
		    case OK:
			clp -> nactive ++;
			nchansrunning ++;
			break;
		}
		break;

	    case cb_conn_established:
		chan_invoke (cb, DIS_INIT, (caddr_t *) &clp -> channame);
		break;

	    case cb_conn_request1:
	    case cb_conn_request2:
	    case cb_init_sent:
	    case cb_proc_sent:
	    case cb_close_sent:
		PP_TRACE (("Shouldn't be in this state - %d",
			   cb -> cb_state));
		break;
		
	    case cb_active:
		if (cb -> cb_clp -> chan_enabled == 0 || opmode)
			chan_invoke (cb, DIS_QUIT,
				     (caddr_t *)0);
		else if (cb -> cb_ml == NULL &&
			 (cb -> cb_ml = nextmsg (cb -> cb_clp, &cb -> cb_mta, 1))
			 == NULL) {
			
			if (cb -> cb_clp -> cache.cachetime < current_time)
				/* stop it thrashing... */
				cb -> cb_clp -> cache.cachetime =
					current_time + 15;

			chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
		}
		else {
			chan_invoke (cb, DIS_PROC, (caddr_t *)0);
		}
		break;
	}
}

static void special_event (ev)
struct eventqueue *ev;
{
	Connblk *cb = ev -> ev_conn;
	Chanlist	*clp = ev -> ev_clp;

	PP_TRACE (("special_event (%s)", cb_print (cb)));
	switch (cb -> cb_state) {
	    case cb_idle:
		if (runnable (clp, current_time) != OK) {
			freecblk (cb);
			return;
		}
		clp -> lastattempt = current_time;
		clp -> laststart = current_time;

		switch (start_async (cb, clp -> channame)) {
		    case NOTOK:
			PP_LOG (LLOG_EXCEPTIONS,
				("Can't start special channel %s",
				 clp -> channame));
			cache_inc (&clp -> cache, (time_t)60);
			(void) newevblk (NULLCHANLIST, NULLCB, cb_timer,
					 (time_t) 60);
			clp -> chan_update = 1;
			freecblk (cb);
			break;

		    case DONE:
			(void) newevblk (clp, cb, cb -> cb_type,
					 (time_t)0);
		    case OK:
			clp -> nactive ++;
			nchansrunning ++;
			break;
		}
		break;

	    case cb_active:
		chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
		switch (clp -> chan -> ch_chan_type) {
		    case CH_QMGR_LOAD:
			clp -> cache.cachetime = current_time + load_time;
			break;
		    case CH_DEBRIS:
			clp -> cache.cachetime = current_time + debris_time;
			break;
		}
		clp -> chan_update = 1;
		break;

	    case cb_conn_established:
		switch (clp -> chan -> ch_chan_type) {
		    case CH_QMGR_LOAD:
			PP_TRACE (("start loading"));
			chan_invoke (cb, DIS_READQ, (caddr_t *)0);
			break;
		    case CH_DEBRIS:
			PP_TRACE (("start debris collection"));
			chan_invoke (cb, DIS_INIT,
				     (caddr_t *)&clp -> channame);
			break;
		    case CH_TIMEOUT:
			break;
		}
		break;
	    default:
		PP_LOG (LLOG_EXCEPTIONS, ("Bad state in special %d",
					  cb -> cb_state));
		break;
	}
}

int start_async (cb, title)
Connblk *cb;
char	*title;
{
	int	sd;
	int	result;

	PP_NOTICE (("Starting channel %s %s", title, cb_print(cb)));
	switch(result = assoc_start (title, &sd)) {
	    case NOTOK:
		advise (LLOG_EXCEPTIONS, NULLCP, "Can't start channel %s",
			title);
		return NOTOK;

#ifdef CONNECTING_1
	    case CONNECTING_1:
		cb -> cb_state = cb_conn_request1;
		cb -> cb_fd = sd;
		FD_CLR (sd, &perf_rfds);
		FD_SET (sd, &perf_wfds);
		if (sd >= perf_nfds)
			perf_nfds = sd + 1;
		break;

	    case CONNECTING_2:
		cb -> cb_state = cb_conn_request2;
		cb -> cb_fd = sd;
		FD_CLR (sd, &perf_wfds);
		FD_SET (sd, &perf_rfds);
		if (sd >= perf_nfds)
			perf_nfds = sd + 1;
		break;
#else
	    case OK:
		cb -> cb_state = cb_conn_request1;
		cb -> cb_fd = sd;
		FD_CLR (sd, &perf_rfds);
		FD_SET (sd, &perf_wfds);
		if (sd >= perf_nfds)
			perf_nfds = sd + 1;
		break;
#endif

	    case DONE:
		cb -> cb_state = cb_conn_established;
		cb -> cb_fd = sd;
		FD_CLR (sd, &perf_wfds);
		FD_SET (sd, &perf_rfds);
		if (sd >= perf_nfds)
			perf_nfds = sd + 1;
		break;

	    default:
		PP_LOG (LLOG_EXCEPTIONS,
			("Unknown return code from assoc_start"));
		return NOTOK;
	}
	start_timer ();
	cb -> cb_ttl = current_time + CHAN_TIMEOUT;
	PP_DBG (("Association descriptor=%d", sd));
	return result;
}

int chan_invoke (cb, type, arg)
Connblk *cb;
int	type;
caddr_t	*arg;
{
	struct client_dispatch *ds;
	int result;

	noperations ++;
	ds = &dis[type];
	switch (type) {
	    case DIS_INIT:
	    case DIS_QUIT:
		PP_TRACE (("%s %s", ds -> ds_name, cb_print (cb)));
		break;
	    case DIS_READQ:
	    case DIS_PROC:
		PP_NOTICE (("%s %s", ds -> ds_name, cb_print (cb)));
		break;
	}
	switch (result = invoke (cb, table_Qmgr_Operations, 
				 ds, arg)) {
	    case OK:
		break;

	    case NOTOK:
		cb -> cb_state = cb_active;
		break;

	    case DONE:
		cleanup_conn (cb);
		break;
	}
	return result;
}

/* \f

 */

static int invoke (cb, ops, ds, args)
Connblk *cb;
struct RyOperation ops[];
register struct client_dispatch *ds;
char  **args;
{
    int	    result;
    caddr_t in;
    struct RoSAPindication  rois;
    register struct RoSAPindication *roi = &rois;
    register struct RoSAPpreject   *rop = &roi -> roi_preject;
    int	sd;

    PP_TRACE (("invoke (%s)", cb_print (cb)));

    sd = cb -> cb_fd;
    in = NULL;
    if (ds -> ds_argument &&
	(result = (*ds -> ds_argument) (sd, ds, args, &in)) != OK)
	return result;

     switch (result = RyStub (sd, ops, ds -> ds_operation,
			      cb -> cb_id = RyGenID (sd),
			      NULLIP, in, ds -> ds_result, ds -> ds_error,
			      ROS_ASYNC, roi)) {
	case NOTOK:		/* failure */
	    ros_advise (rop, "STUB");
	    if (ROS_FATAL (rop -> rop_reason))
		    return DONE;
	    return NOTOK;

	case OK:		/* got a result/error response */
	    break;

	case DONE:		/* got RO-END? */
	    adios (NULLCP, "got RO-END.INDICATION");
	    return NOTOK;

	default:
	    adios (NULLCP, "unknown return from RyStub=%d", result);
	    /* NOTREACHED */
    }

    if (ds -> ds_free && in)
	(void) (*ds -> ds_free) (in);
    return OK;
}

void chan_manage (fd)
int	fd;
{
	register Connblk *cb;
	int	sd;

	if ((cb = findcblk (fd)) == NULLCB) {
		advise (LLOG_EXCEPTIONS, NULLCP, "fd %d is not registered!", fd);
		FD_CLR (fd, &perf_rfds);
		FD_CLR (fd, &perf_wfds);
		return;
	}

	PP_TRACE (("chan_manage (%s)", cb_print (cb) ));

	switch (cb -> cb_state) {
	    case cb_conn_request1:
	    case cb_conn_request2:
		PP_TRACE (("Awaiting async connection (state %d)",
			   cb -> cb_state == cb_conn_request1 ? 1 : 2));
		switch (acsap_retry (sd = cb -> cb_fd)) {
		    case NOTOK:
			PP_TRACE (("Connection error on %s", cb_print (cb)));
			delay_channel (cb);
			cb -> cb_state = cb_idle;
			cleanup_conn (cb);
			return;
#ifdef CONNECTING_1
		    case CONNECTING_1:
			cb -> cb_state = cb_conn_request1;
			PP_TRACE (("State 1 on %s", cb_print (cb)));
			FD_CLR (sd, &perf_rfds);
			FD_SET (sd, &perf_wfds);
			if (sd >= perf_nfds)
				perf_nfds = sd + 1;
			break;

		    case CONNECTING_2:
			cb -> cb_state = cb_conn_request2;
			PP_TRACE (("State 2 on %s", cb_print (cb)));
			FD_CLR (sd, &perf_wfds);
			FD_SET (sd, &perf_rfds);
			if (sd >= perf_nfds)
				perf_nfds = sd + 1;
			break;
#else
		    case OK:
			cb -> cb_state = cb_conn_request2;
			PP_TRACE (("Switching to state 2 on %s",
				   cb_print(cb)));
			FD_CLR (sd, &perf_wfds);
			FD_SET (sd, &perf_rfds);
			if (sd >= perf_nfds)
				perf_nfds = sd + 1;
			break;
#endif
		    case DONE:
			cb -> cb_state = cb_conn_established;
			PP_TRACE (("connection now established on %s",
				   cb_print (cb)));
			FD_CLR (sd, &perf_wfds);
			FD_SET (sd, &perf_rfds);
			if (sd >= perf_nfds)
				perf_nfds = sd + 1;
			(void) newevblk (cb -> cb_clp, cb,
					 cb -> cb_type, (time_t)0);
			break;
		}
		break;
	    case cb_idle:
		freecblk (cb);
		break;
	    case cb_proc_sent:
		PP_TRACE (("Awaiting proc response ..."));
		getresponse (cb, OK);
		break;
	    case cb_init_sent:
		PP_TRACE (("Awaiting init response"));
		getresponse (cb, OK);
		break;
	    default:
		PP_TRACE (("Funny state!"));
		break;
	}
}

static void getresponse (cb, type)
Connblk *cb;
int	type;
{
	caddr_t	out;
	struct RoSAPindication	rois;
	register struct RoSAPindication *roi = &rois;
	register struct RoSAPpreject   *rop = &roi -> roi_preject;
	
	PP_TRACE (("getresponse (%s)", cb_print (cb)));

	switch (RyWait (cb -> cb_fd, &cb -> cb_id, &out, type, roi)) {
	    case NOTOK:
		if (rop -> rop_reason == ROS_TIMER)
			break;
		PP_TRACE (("RyWait returned NOTOK"));
		ros_advise (rop, "STUB");
		if (ROS_FATAL (rop -> rop_reason)) {
			delay_channel (cb);
			cleanup_conn (cb);
		}
		break;

	    case OK:
		PP_TRACE (("RyWait returned OK"));
		cb -> cb_state = cb_active;
		(void) newevblk (cb -> cb_clp, cb,
				 cb -> cb_type, (time_t)0);
		break;

	    case DONE:
		cb -> cb_ml = NULL;
		cleanup_conn (cb);
		break;
	}
}

#ifdef notdef
static void setfds ()
{
	register Connblk *cb;
	struct PSAPindication pi;

	PP_TRACE (("setfds ()"));

	FD_ZERO (&perf_wfds);
	FD_ZERO (&perf_rfds);
	perf_nfds = 0;

	for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw)
		switch (cb -> cb_state) {
		    case cb_conn_request1:
			if (PSelectMask (cb -> cb_fd, &perf_wfds,
					 &perf_nfds, &pi) == NOTOK)
				PP_LOG (LLOG_EXCEPTIONS,
					 ("PSelectMask failed"));
			break;
		    case cb_conn_request2:
		    case cb_proc_sent:
		    case cb_init_sent:
		    case cb_conn_established:
			if (PSelectMask (cb -> cb_fd, &perf_rfds,
					 &perf_nfds, &pi) == NOTOK)
				PP_LOG (LLOG_EXCEPTIONS,
					("PSelectMask failed"));
			break;
		    default:
			break;
		}
	PP_TRACE (("nrfds=%d rfds=0x%x wfds=0x%x", perf_nfds,
		 perf_rfds.fds_bits[0], perf_wfds.fds_bits[0]));
}
#endif

/* ARGSUSED */
static int  do_quit (sd, ds, args, arg)
int	sd;
struct client_dispatch *ds;
char  **args;
caddr_t	*arg;
{
	Connblk *cb;

	PP_TRACE (("do_quit(%d)", sd));

	assoc_release (sd);
	if (sd != NOTOK) {
		FD_CLR (sd, &perf_wfds);
		FD_CLR (sd, &perf_rfds);
	}
	if ((cb = findcblk (sd)) != NULLCB)
		cb -> cb_fd = NOTOK;
	else
		return DONE;

	PP_TRACE (("do_quit (%s)", cb_print (cb) ));
	if (cb -> cb_type == cb_channel)
		cb -> cb_clp -> chan_update = 1;

	PP_NOTICE (("Closing channel %s", cb_print (cb)));

	return DONE;
}

/* connection block functions */

Connblk *newcblk (type)
enum	cb_type type;
{
	Connblk *cb;

	PP_TRACE (("newcblk () cnt=%d", cb_count));

	cb = (Connblk *) calloc (1, sizeof *cb);
	if (cb == NULLCB)
		return cb;

	cb -> cb_fd = NOTOK;
	cb -> cb_type = type;

	if (cb_once_only == 0) {
		CHead -> cb_forw = CHead -> cb_back = CHead;
		cb_once_only ++;
	}
	insque (cb, CHead -> cb_back);
	cb_count ++;
	return cb;
}

void freecblk (cb)
Connblk *cb;
{
	struct AcSAPindication	acis;
	struct RoSAPindication	rois;
	register struct RoSAPindication *roi = &rois;
	
	if (cb == NULLCB)
		return;

	PP_TRACE (("freecblk (%s) cnt=%d",
		cb_print(cb), cb_count));

	if (cb -> cb_fd != NOTOK) {
		PP_NOTICE (("Killing channel %s",
			    cb_print (cb)));
		(void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis);
		(void) RyLose (cb -> cb_fd, roi);
		FD_CLR (cb -> cb_fd, &perf_wfds);
		FD_CLR (cb -> cb_fd, &perf_rfds);
	}
	remque (cb);

	cb_count --;

	free ((char *) cb);
}

Connblk *findcblk (fd)
int	fd;
{
	Connblk *cb;

	PP_TRACE (("findcblk (%d) cnt=%d", fd, cb_count));

	if (cb_once_only == 0)
		return NULLCB;

	for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw)
		if (cb -> cb_fd == fd)
			return cb;

	PP_TRACE (("Can't locate block with %d (%d alloc'd)",
		fd, cb_count));
	return NULLCB;
}

static char	*cb2name (cb)
Connblk *cb;
{
	switch (cb -> cb_type) {
	    case cb_channel:
	    case cb_special:
		if (cb -> cb_clp && cb -> cb_clp -> channame)
			return cb -> cb_clp -> channame;
		else	return "unknown channel";

	    case cb_timer:
		return "timer";

	    case cb_responder:
		return "responder";

	    default:
		break;
	}
	return "something";
}

static char	*cb_print (cb)
Connblk *cb;
{
	static char buf[128];

	switch (cb -> cb_type) {
	    case cb_channel:
	    case cb_special:
		(void) sprintf (buf, "<fd=%d chan=%s mta=%s msg=%s>",
				cb -> cb_fd,
				cb2name(cb),
				cb -> cb_mta ? cb -> cb_mta : "none",
				(cb -> cb_ml && cb -> cb_ml -> ms ) ?
				cb -> cb_ml -> ms -> queid : "none");
		break;
	    case cb_timer:
		(void) sprintf (buf, "<timer proc=0x%x reload=%d>",
				cb -> cb_proc, cb -> cb_reload);
		break;

	    case cb_responder:
		(void) sprintf (buf, "<fd=%d responder auth=%d>",
				cb -> cb_fd, cb -> cb_authenticated);
		break;
	}
			
	return buf;
}

static struct eventqueue *newevblk (clp, cb, type, when)
Chanlist *clp;
Connblk *cb;
time_t	when;
enum cb_type type;
{
	struct eventqueue *ev, *pev;

	PP_TRACE (("newevblk (clp, cb, %d, %d) cnt=%d",
		   type, when, ev_count));

	ev = (struct eventqueue *) calloc (1, sizeof *ev);
	if (ev == NULLEV) 
		return ev;

	ev_count ++;
	ev -> ev_clp = clp;
	ev -> ev_conn = cb;
	ev -> ev_type = type;

	for (pev = EHead -> ev_forw; pev != EHead; pev = pev -> ev_forw) {
		if (pev -> ev_time > when) {
			insque (ev, pev -> ev_back);
			break;
		}
		else
			when -= pev -> ev_time;
	}
	if (pev == EHead)
		insque (ev, EHead -> ev_back);
	for (; pev != EHead; pev = pev -> ev_forw)
		pev -> ev_time -= when;

	ev -> ev_time = when;

	return ev;
}

static void freevblk (ev)
struct eventqueue *ev;
{
	PP_TRACE (("freevblk () cnt=%d", ev_count));

	if (ev == NULLEV)
		return;

	if (ev -> ev_forw != EHead)
		ev -> ev_forw -> ev_time += ev -> ev_time;
			
	remque (ev);

	free ((char *) ev);
	ev_count --;
}

#ifdef notdef
static time_t evtimebyclp (clp)
Chanlist *clp;
{
	struct eventqueue *ev;
	time_t	now = current_time;
	
	for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
		if (ev -> ev_clp == clp)
			return now + ev -> ev_time;
		else now += ev -> ev_time;
	return now + MAX_SLEEP + 2;
}

static struct eventqueue *findevbyclp (clp)
Chanlist *clp;
{
	struct eventqueue *ev;

	for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
		if (ev -> ev_clp == clp)
			return ev;
	return NULLEV;
}
#endif

static void init_events ()
{
	PP_TRACE (("init_events ()"));

	EHead -> ev_forw = EHead -> ev_back = EHead;
}

#define DECAY_FACTOR 0.7	/* decay filter - set fairly high */
void inc_channel (cb, number)	/* successful delivery */
Connblk *cb;
int	number;
{
	MsgStruct *ms;
	Reciplist *rlp;
	Mtalist *mlp;
	LIST_RCHAN *clp;
	int i;
	double	f;

	PP_TRACE (("inc_channel (%s, %d)",
		   cb_print (cb), number));

	cb -> cb_clp -> lastsuccess = current_time;
	ms = cb -> cb_ml -> ms;
	i = ms -> size / 1000;
	i = max (i, 1);
	f = (double)(current_time - cb -> cb_clp -> lastattempt) /
		(double)i;

	cb -> cb_clp -> averaget =
		(DECAY_FACTOR * cb -> cb_clp -> averaget) +
			(1 - DECAY_FACTOR) * f;
	PP_TRACE (("average = %g, this value = %g",
		    cb -> cb_clp -> averaget, f));

	clear_msgcache (cb -> cb_ml);
	delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, number);
	cache_clear (&cb -> cb_clp -> cache);

	if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) {
		cache_clear (&mlp -> cache);
		mlp -> lastsuccess = current_time;
	}

	if (ms == NULL) {
		advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate message!");
		return;
	}
	for (rlp = ms  -> recips; rlp; rlp = rlp -> rp_next)
		if (rlp -> id == number)
			break;
	if (rlp == NULL) {
		advise (LLOG_EXCEPTIONS, NULLCP, "No recipient %d in list!",
			number);
		return;
	}

	switch (rlp -> status) {
	    case st_normal:
		i = ++rlp -> chans_done;

		for (clp = rlp -> chans; clp && i > 0;
		     clp = clp -> li_next, i--)
			continue;
		if (clp) {
			insertinchan (clp -> li_chan, ms, rlp,
				      chan2mta (clp -> li_chan, rlp));
			return;
		}
		/* else fall through */
	    case st_dr:
		rlp -> status = st_delete;
		if (ms -> count == 0)
			insertindelchan (ms);
		break;
	    case st_delete:
		cb -> cb_ml = NULL;
		maybezapmsg (ms);
		break;
	}
}

void delay_host (cb, rno, str)
Connblk *cb;
int	rno;
char	*str;
{
	struct Mtalist *mlp, *findmtalist ();

	PP_TRACE (("delay_host (%s, %d)", cb_print (cb), rno));

	if (cb -> cb_ml)
		msg_unlock (cb -> cb_ml -> ms);

	if ((mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) == NULL) {
		advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue");
		return;
	}
	if (mlp -> info) {
		free (mlp -> info);
		mlp -> info = NULLCP;
	}
	if (str)
		mlp -> info = strdup (str);
	cache_inc (&mlp -> cache, cache_time*3/2);
	mlp -> mta_changed = 1;
	if (cb -> cb_clp)
		cb -> cb_clp -> chan_update = 1;
}

void delay_message (cb, str)
Connblk *cb;
char	*str;
{
	Mtalist	*mlp;

	PP_TRACE (("delay_message (%s)", cb_print (cb)));

	if (cb -> cb_ml == NULL)
		return;
	if ((mlp = findmtalist (cb -> cb_clp, cb -> cb_mta)) == NULL) {
		advise (LLOG_EXCEPTIONS, NULLCP, "Can't locate mta in queue");
		return;
	}
	if (cb -> cb_ml -> info) {
		free (cb -> cb_ml -> info);
		cb -> cb_ml -> info = NULLCP;
	}
	if (str)
		cb -> cb_ml -> info = strdup (str);
	cb -> cb_ml -> ms -> nerrors ++;
	msg_unlock (cb -> cb_ml -> ms);
	msgcache_inc (cb -> cb_ml, cache_time*2);
	mlp -> mta_changed = 1;
	cb -> cb_clp -> chan_update = 1;
}

static void bad_recip (cb, rno)
Connblk *cb;
int	rno;
{
	Reciplist *rlp;
	Mtalist *mlp;
	PP_TRACE (("bad_recip (%s, %d)", cb_print (cb), rno));

	cb -> cb_clp -> lastsuccess = current_time;
	if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta))
		mlp -> lastsuccess = current_time;

	delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno); 

	for (rlp = cb -> cb_ml -> ms -> recips; rlp; rlp = rlp -> rp_next)
		if (rlp -> id == rno)
			break;
	if (rlp == NULLRL)
		return;

	rlp -> chans_done = 0;
	insertindrchan (cb -> cb_ml -> ms, rlp);
}

static void sharedDR (cb, rno)
Connblk *cb;
int	rno;
{
	Mtalist *mlp;
	PP_TRACE (("sharedDR (%s, %d)", cb_print (cb), rno));

	cb -> cb_clp -> lastsuccess = current_time;
	if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta))
		mlp -> lastsuccess = current_time;
	delfromchan (cb -> cb_clp, cb -> cb_mta, cb -> cb_ml, rno);
}

/* ARGSUSED */

static int do_processmessage (sd, ds, args, arg)
int	sd;
struct client_dispatch *ds;
char	**args;
struct type_Qmgr_ProcMsg **arg;
{
	Connblk *cb;
	Mlist *ml;
	Mtalist *mlp;
	register struct type_Qmgr_ProcMsg *pm;
	struct type_Qmgr_UserList *lusers ();
	char	*p;

	PP_TRACE (("do_processmessage (%d)", sd));

	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;
	if (cb -> cb_state != cb_active)
		return NOTOK;
	ml = cb -> cb_ml;
	if (ml == NULL) {
		PP_LOG (LLOG_EXCEPTIONS, ("Missing ml structure"));
		return NOTOK;
	}
	*arg = pm = (struct type_Qmgr_ProcMsg *) malloc (sizeof (**arg));
	
	pm -> channel = str2qb (cb -> cb_clp -> channame,
			     strlen (cb -> cb_clp -> channame), 1);

	p = ml -> ms -> queid;
	pm -> qid = str2qb (p, strlen (p), 1);
	pm -> users = lusers (ml, 0);
	if (pm -> users == NULL)
		PP_LOG (LLOG_EXCEPTIONS, ("Empty user list!"));
	cb -> cb_ttl = current_time + CHAN_TIMEOUT + (ml -> ms -> size /20);
	cb -> cb_state = cb_proc_sent;
	if (mlp = findmtalist (cb -> cb_clp, cb -> cb_mta))
		mlp -> lastattempt = current_time;
	return OK;
}

/* ARGSUSED */
static int do_readqueue (sd, ds, args, arg)
int	sd;
struct client_dispatch *ds;
char	**args;
struct type_Qmgr_ReadQueue__Pseudo **arg;
{
	Connblk *cb;

	PP_TRACE (("do_readqueue (%d)", sd));
	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;
	cb -> cb_state = cb_proc_sent;
	return OK;
}



/* ARGSUSED */
static int do_initchannel (sd, ds, args, arg)
int	sd;
struct client_dispatch *ds;
char **args;
struct type_Qmgr_Channel **arg;
{
	Connblk *cb;

	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;

	PP_TRACE (("do_initchannel (%s)", cb_print (cb) ));

	*arg = str2qb (*args, strlen (*args), 1);
	cb -> cb_state = cb_init_sent;
	cb -> cb_ttl = current_time + CHAN_TIMEOUT;
	return OK;
}

/* ARGSUSED */
static int processmessage_result (sd, id, dummy, result, roi)
int	sd,
	id,
	dummy;
register struct type_Qmgr_DeliveryStatus *result;
struct RoSAPindication *roi;
{
	Connblk *cb;
	struct type_Qmgr_DeliveryStatus *ds;
	int	rno;
	char	buf[LINESIZE];
	char	*info;

	PP_TRACE (("processmessage_result (%d)", sd));

	if ((cb = findcblk (sd)) == NULLCB) {
		advise (LLOG_EXCEPTIONS, NULLCP, "No connection block for %d",
			sd);
		return NOTOK;
	}
	PP_TRACE (("processmessage_result (%s)", cb_print (cb)));
	for (ds = result; ds; ds = ds -> next) {
		rno = ds -> IndividualDeliveryStatus -> recipient -> parm;
		(void) sprintf (buf, "Recipient ID %d %s:", rno,
				cb_print (cb));
		if (ds -> IndividualDeliveryStatus -> info)
			info = qb2str(ds -> IndividualDeliveryStatus -> info);
		else	info = NULLCP;
		switch (ds -> IndividualDeliveryStatus -> status) {
		    case int_Qmgr_status_success:
			PP_NOTICE (("%s success", buf));
			inc_channel (cb, rno);
			break;

		    case int_Qmgr_status_negativeDR:
		    case int_Qmgr_status_positiveDR:
			PP_NOTICE (("%s %s", buf,
				    ds -> IndividualDeliveryStatus -> status
				    == int_Qmgr_status_positiveDR ?
				    "positiveDR" : "negativeDR"));
			bad_recip (cb, rno);
			break;

		    case int_Qmgr_status_successSharedDR:
		    case int_Qmgr_status_failureSharedDR:
			PP_NOTICE (("%s %s", buf,
				    ds -> IndividualDeliveryStatus -> status
				    == int_Qmgr_status_failureSharedDR ?
				    "failureSharedDR" : "sucessSharedDR"));
			sharedDR(cb, rno);
			break;

		    case int_Qmgr_status_messageFailure:
			PP_NOTICE (("%s messageFailure (%s)", buf,
				    info ? info : ""));
			delay_message (cb, info);
			break;

		    case int_Qmgr_status_mtaFailure:
			PP_NOTICE (("%s mtaFailure (%s)", buf,
				    info ? info : ""));
			delay_host (cb, rno, info);
			break;

		    case int_Qmgr_status_mtaAndMessageFailure:
			PP_NOTICE (("%s mtaAndMessageFailure (%s)", buf,
				    info ? info : ""));
			delay_message (cb, info);
			delay_host (cb, rno, info);
			break;

		    default:
			PP_NOTICE (("%s Unknown response", buf));
			break;
		}
		if (info)
			free (info);
	}
	cb -> cb_state = cb_active;
	cb -> cb_ml = NULL;
	return OK;
}

/* \f

 RPC procedures... */

/* ARGSUSED */
static int readqueue_result (sd, id, dummy, result, roi)
int	sd,
	id,
	dummy;
register struct type_Qmgr_MsgList *result;
struct RoSAPindication *roi;
{
	struct type_Qmgr_MsgStructList *ml;
	MsgStruct *ms, *oldms;
	char	*p;
	struct type_Qmgr_QidList *ql;

	PP_TRACE (("readqueue_result (%d, %d)", sd, id));

	for (ml = result -> msgs; ml; ml = ml -> next) {
		ms = newmsgstruct (ml -> MsgStruct);
		if (oldms = find_msg (ms -> queid)) {
			(void) updatemessage (oldms, ms);
			freems (ms);
		}
		else
			(void) insertmessage (ms);
	}
	for (ql = result -> deleted; ql; ql = ql -> next) {
		p = qb2str (ql -> QID);
		if (ms = find_msg (p))
			kill_msg (ms);
		free (p);
	}
	return OK;
}

/* ARGSUSED */
static int channelinit_result (sd, id, dummy, result, roi)
int	sd,
	id,
	dummy;
struct type_Qmgr_Pseudo__channelInitialise *result;
struct RoSAPindication *roi;
{
	PP_TRACE (("channelinit_result (%d, %d)", sd, id));

	return OK;
}
	
/* ARGSUSED */
static int general_error (sd, id, error, parameter, roi)
int	sd,
	id,
	error;
caddr_t	parameter;
struct RoSAPindication *roi;
{
	register struct RyError *rye;
	Connblk *cb;

	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;
	PP_TRACE (("general_error (%s)", cb_print(cb)));
	if (cb -> cb_type != cb_channel) {
		PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
		return NOTOK;
	}

	if (error == RY_REJECT) {
		advise (LLOG_EXCEPTIONS, NULLCP, "%s",
			RoErrString ((int) parameter));
		delay_channel (cb);
		return NOTOK;
	}

	if (rye = finderrbyerr (table_Qmgr_Errors, error))
		advise (LLOG_EXCEPTIONS, NULLCP, "%s", rye -> rye_name);
	else
		advise (LLOG_EXCEPTIONS, NULLCP, "Error %d", error);

	delay_channel (cb);
	return NOTOK;
}

/* ARGSUSED */
static int processmessage_error (sd, id, error, parameter, roi)
int	sd,
	id,
	error;
caddr_t	parameter;
struct RoSAPindication *roi;
{
	register struct RyError *rye;
	Connblk *cb;
	char	tbuf[128];

	if ((cb = findcblk (sd)) == NULLCB)
		return NOTOK;
	PP_TRACE (("processmessage_error (%s)", cb_print(cb)));

	if (cb -> cb_type != cb_channel) {
		PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
		return NOTOK;
	}

	if (error == RY_REJECT) {
		advise (LLOG_EXCEPTIONS, NULLCP, "%s",
			RoErrString ((int) parameter));
		delay_message (cb, RoErrString ((int) parameter));
		return OK;
	}

	if (rye = finderrbyerr (table_Qmgr_Errors, error))
		(void) sprintf (tbuf, "process message error %s",
				rye -> rye_name);
	else
		(void) sprintf (tbuf, "process message error %d", error);
	advise (LLOG_EXCEPTIONS, NULLCP, "%s", tbuf);

	delay_message (cb, tbuf);
	return OK;
}

void delay_channel (cb)
Connblk *cb;
{
	PP_TRACE (("delay_channel (%s)", cb_print (cb) ));

	if (cb -> cb_type == cb_responder || cb -> cb_type == cb_timer)
		return;
	if (cb -> cb_clp == NULL)
		return;
	cache_inc (&(cb -> cb_clp -> cache), cache_time);
	cb -> cb_clp -> chan_update = 1;
}

void investigate_chan (clp, now)
Chanlist *clp;
time_t	now;
{
	Mtalist *mlp;

	PP_TRACE (("investigate_chan (%s)", clp -> channame));

	clp -> nextevent = now + MAX_SLEEP;
	clp -> oldest = now;
	clp -> nmtas = 0;

	for (mlp = clp -> mtas -> mta_forw; mlp != clp -> mtas;
	     mlp = mlp -> mta_forw) {
		if (mlp -> mta_changed || mlp -> nextevent < current_time) {
			investigate_mta (mlp, now);
			mlp -> mta_changed = 0;
		}
		if (mlp -> oldest < clp -> oldest)
			clp -> oldest = mlp -> oldest;
		if (!mtaready (mlp))
			continue;
		clp -> nmtas ++;
		if (mlp -> nextevent < clp -> nextevent)
			clp -> nextevent = mlp -> nextevent;
	}
	if (clp -> chan_special)
		clp -> nextevent = now;
	if (clp -> cache.cachetime > now)
		clp -> nextevent = clp -> cache.cachetime;
	if (clp -> chan_enabled == 0)
		clp -> nextevent = now + MAX_SLEEP;
}

void investigate_mta (mlp, now)
Mtalist *mlp;
time_t	now;
{
	Mlist	*ml;
	time_t cachet;

	PP_TRACE (("investigate_mta (%s)", mlp -> mtaname));

	mlp -> nextevent = now + MAX_SLEEP + 1;
	mlp -> oldest = now;

	for (ml = mlp -> msgs -> ml_forw; ml != mlp -> msgs;
	     ml = ml -> ml_forw) {
		/* this calc has to be done  - or we could optimise */
		if (ml -> ms -> age < mlp -> oldest)
			mlp -> oldest = ml -> ms -> age;

		if (!msgready (ml))
			continue;

		if ((cachet = msgmincache (ml)) > now) {
			if (cachet < mlp -> nextevent)
				mlp -> nextevent = cachet;
		}
		else if (ml -> ms -> defferedtime &&
			 ml -> ms -> defferedtime < mlp -> nextevent)
			mlp -> nextevent = ml -> ms -> defferedtime;
		else mlp -> nextevent = now;
	}
	if (mlp -> cache.cachetime > now) 	/* oops - cached */
		mlp -> nextevent = mlp -> cache.cachetime;
	if (mlp -> mta_enabled == 0)
		mlp -> nextevent = now + MAX_SLEEP + 1;
}

void cleanup_conn (cb)
Connblk *cb;
{
	PP_TRACE (("cleanup_conn(%s)", cb_print (cb)));

	switch (cb -> cb_type) {
	    case cb_responder:
	    case cb_timer:
		break;
	    default:
		if (cb -> cb_clp) {
			Mtalist *mlp;

			if (cb -> cb_mta &&
			    (mlp = findmtalist(cb ->cb_clp, cb -> cb_mta))) {
				mlp -> nactive --;
				if (cb -> cb_state == cb_idle ||
				    cb -> cb_state == cb_proc_sent)
					delay_message (cb, "Connection broke");
			}
			cb -> cb_clp -> nactive --;
			nchansrunning --;

		}
		if (cb -> cb_ml)
			msg_unlock (cb -> cb_ml -> ms);
			
		break;
	}
	freecblk (cb);
}

static int timeout_proc ()
{
	MsgStruct **msp, *ms;

	PP_TRACE (("timeout_proc"));
	if (timeout_chan -> chan_enabled == 0)
		return;
	for (msp = msg_hash; msp < &msg_hash[HASHSIZE]; ) {
		for (ms = *msp; ms; ms = ms -> ms_forw) {
			if (ms -> m_locked)
				continue;
			if (ms -> expirytime < current_time &&
			    expiremsg (ms) == DONE)
				break;
		}
		if (!ms)
			msp ++;
	}
}

static int expiremsg (ms)
MsgStruct *ms;
{
	int first = 1;
	Reciplist *rlp;
	int i;
	Chanlist *clp;
	Mtalist *mlp;
	Mlist *ml;
	LIST_RCHAN *lrp;

	PP_TRACE (("expiremsg (%s)", ms -> queid));

	for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) {
		if (rlp -> id == 0)
			continue;
		switch (rlp -> status) {
		    case st_dr:
		    case st_delete:
		    case st_timeout:
			continue; /* leave these alone! */

		    case st_normal:
			i = rlp -> chans_done;
			for (lrp = rlp -> chans; lrp && i > 0;
			     lrp = lrp -> li_next, i--)
				continue;

			if (lrp == NULL)
				continue;
			if ((clp = findchanlist (lrp -> li_chan))
			    == NULLCHANLIST)
				continue;
			mlp = findmtalist (clp, rlp -> mta);
			if (mlp == NULLMTALIST)
				continue;
			ml = findmtamsg (mlp, ms -> queid);
			if (ml == NULLMLIST)
				continue;
			delfromchan (clp, mlp -> mtaname, ml, rlp -> id);
			rlp -> status = st_timeout;
			if (first) {
				insertinchan (timeout_chan -> chan, ms,
					      rlp, timeout_chan -> channame);
				first = 0;
			}
		}
	}
	return first == 1 ? OK : DONE;
}

static int channel_ttl ()
{
	struct AcSAPindication	acis;
	struct RoSAPindication	rois;
	register struct RoSAPindication *roi = &rois;
	Connblk *cb;

	PP_TRACE (("channel_ttl ()"));

	if (cb_once_only == 0)
		return OK;

	for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) {
		if (cb -> cb_type == cb_channel && cb -> cb_ttl &&
		    cb -> cb_ttl < current_time) {
			PP_NOTICE (("Channel %s taking too long - aborting",
				    cb_print (cb)));
			(void) AcUAbortRequest (cb -> cb_fd, NULLPEP,
						0, &acis);
			(void) RyLose (cb -> cb_fd, roi);
			chan_lose (cb -> cb_fd, (struct AcSAPfinish *)0);
			if (cb -> cb_fd != NOTOK) {
				FD_CLR (cb -> cb_fd, &perf_rfds);
				FD_CLR (cb -> cb_fd, &perf_wfds);
			}
		}
	}
	timer_running = 0;
	if (nchansrunning > 0)
		start_timer ();
	return OK;			
}

static void start_timer ()
{
	Connblk *cb;

	if (timer_running)
		return;

	cb = newcblk (cb_timer);
	cb -> cb_proc = channel_ttl;
	cb -> cb_reload = 0;
	(void) newevblk (NULLCHANLIST, cb, cb_timer, CHAN_TIMEOUT);
	timer_running = 1;
}