Close for channels, similar to go, but the details differ because this is C. Added two functions, chanclose and chanisclosed. We also added an err field to the alt entry, which does not need to be initialized. Chanclose prevents further elements to be sent to the channel c. After closing a channel, send and recv never block. Send always returns -1. Recv returns -1 if the channel is empty. Alt may choose a CHANSND or CHANRCV that failed because the channel was closed. In this case, the err field of the Alt entry points to an error string stating that the channel was closed and the operation was completed with failure. If all entries have been selected and failed because they were closed, alt returns -1. Reference: /n/sources/patch/applied/closeforchannels Date: Tue Feb 16 15:51:08 CET 2010 Signed-off-by: paurea@gmail.com --- /sys/src/libthread/channel.c Tue Feb 16 15:50:07 2010 +++ /sys/src/libthread/channel.c Tue Feb 16 15:49:58 2010 @@ -3,6 +3,12 @@ #include #include "threadimpl.h" +/* Value to indicate the channel is closed */ +enum { + CHANCLOSD = 0xc105ed, +}; + +static char *errcl = "channel was closed"; static Lock chanlock; /* central channel access lock */ static void enqueue(Alt*, Channel**); @@ -10,15 +16,22 @@ static int canexec(Alt*); static int altexec(Alt*, int); +#define Closed ((void*)CHANCLOSD) +#define Inted ((void*)~0) + static void _chanfree(Channel *c) { int i, inuse; - inuse = 0; - for(i = 0; i < c->nentry; i++) - if(c->qentry[i]) - inuse = 1; + if(c->closed == 1) /* chanclose is ongoing */ + inuse = 1; + else{ + inuse = 0; + for(i = 0; i < c->nentry; i++) /* alt ongoing */ + if(c->qentry[i]) + inuse = 1; + } if(inuse) c->freed = 1; else{ @@ -43,6 +56,7 @@ return -1; c->f = 0; c->n = 0; + c->closed = 0; c->freed = 0; c->e = elemsize; c->s = elemcnt; @@ -64,12 +78,18 @@ return c; } +static int +isopenfor(Channel *c, int op) +{ + return c->closed == 0 || (op == CHANRCV && c->n > 0); +} + int alt(Alt *alts) { - Alt *a, *xa; + Alt *a, *xa, *ca; Channel volatile *c; - int n, s; + int n, s, waiting, allreadycl; void* r; Thread *t; @@ -96,7 +116,7 @@ xa->entryno = -1; if(xa->op == CHANNOP) continue; - + c = xa->c; if(c==nil){ unlock(&chanlock); @@ -105,34 +125,64 @@ return -1; } - if(canexec(xa)) + if(isopenfor(c, xa->op) && canexec(xa)) if(nrand(++n) == 0) a = xa; } + if(a==nil){ /* nothing can proceed */ if(xa->op == CHANNOBLK){ unlock(&chanlock); _procsplx(s); t->chan = Channone; - return xa - alts; + if(xa->op == CHANNOBLK) + return xa - alts; } - /* enqueue on all channels. */ + /* enqueue on all channels open for us. */ c = nil; + ca = nil; + waiting = 0; + allreadycl = 0; for(xa=alts; xa->op!=CHANEND; xa++){ if(xa->op==CHANNOP) continue; - enqueue(xa, &c); + if(isopenfor(xa->c, xa->op)){ + waiting = 1; + enqueue(xa, &c); + } + else if(xa->err != errcl) + ca = xa; + else + allreadycl = 1; } + if(waiting == 0) + if(ca != nil){ + /* everything was closed, select last channel */ + ca->err = errcl; + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + return ca - alts; + } + else if(allreadycl){ + /* everything was already closed */ + unlock(&chanlock); + _procsplx(s); + t->chan = Channone; + return -1; + } /* * wait for successful rendezvous. * we can't just give up if the rendezvous * is interrupted -- someone else might come * along and try to rendezvous with us, so * we need to be here. + * if the channel was closed, the op is done + * and we flag an error for the entry. */ Again: unlock(&chanlock); @@ -141,7 +191,7 @@ s = _procsplhi(); lock(&chanlock); - if(r==(void*)~0){ /* interrupted */ + if(r==Inted){ /* interrupted */ if(c!=nil) /* someone will meet us; go back */ goto Again; c = (Channel*)~0; /* so no one tries to meet us */ @@ -152,8 +202,12 @@ for(xa=alts; xa->op!=CHANEND; xa++){ if(xa->op==CHANNOP) continue; - if(xa->c == c) + if(xa->c == c){ a = xa; + a->err = nil; + if(r == Closed) + a->err = errcl; + } dequeue(xa); } unlock(&chanlock); @@ -162,14 +216,69 @@ assert(c==(Channel*)~0); return -1; } - }else{ + }else altexec(a, s); /* unlocks chanlock, does splx */ - } _sched(); t->chan = Channone; return a - alts; } +int +chanclose(Channel *c) +{ + Alt *a; + int i, s, some; + + s = _procsplhi(); /* note handlers; see :/^alt */ + lock(&chanlock); + if(c->closed){ + /* Already close; we fail but it's ok. don't print */ + unlock(&chanlock); + _procsplx(s); + return -1; + } + c->closed = 1; /* Being closed */ + + /* + * locate entries that will fail due to close + * (send, and receive if nothing buffered) and wake them up. + * Continue doing so until we make a full pass with no work, + * otherwise we might miss alts being made while the lock is released. + * Hopefully this is O(2n) and not O(n*n) + */ + do{ + some = 0; + for(i=0; inentry; i++){ + if((a = c->qentry[i]) == nil || *a->tag != nil) + continue; + if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0)) + continue; + *a->tag = c; + unlock(&chanlock); + _procsplx(s); + while(_threadrendezvous(a->tag, Closed) == Inted) + ; + s = _procsplhi(); + lock(&chanlock); + some++; + } + }while(some); + + c->closed = 2; /* Fully closed */ + if(c->freed) + _chanfree(c); + unlock(&chanlock); + _procsplx(s); + return 0; +} + +int +chanisclosed(Channel *c) +{ + /* No need to get the lock */ + return c->closed != 0; +} + static int runop(int op, Channel *c, void *v, int nb) { @@ -184,6 +293,7 @@ a[0].op = op; a[0].c = c; a[0].v = v; + a[0].err = nil; a[1].op = CHANEND; if(nb) a[1].op = CHANNOBLK; @@ -194,6 +304,11 @@ assert(nb); return 0; case 0: + /* Ok. but return -1 if the op is done because of a + * close. + */ + if(a[0].err != nil) + return -1; return 1; default: fprint(2, "ERROR: channel alt returned %d\n", r); @@ -350,7 +465,8 @@ if(c->qentry[i]==a){ _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); c->qentry[i] = nil; - if(c->freed) + /* release if freed and not closing */ + if(c->freed && c->closed != 1) _chanfree(c); return; } @@ -467,7 +583,7 @@ unlock(&chanlock); _procsplx(spl); _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock); - while(_threadrendezvous(b->tag, 0) == (void*)~0) + while(_threadrendezvous(b->tag, 0) == Inted) ; return 1; } --- /sys/include/thread.h Tue Feb 16 15:50:22 2010 +++ /sys/include/thread.h Tue Feb 16 15:50:12 2010 @@ -28,6 +28,7 @@ int freed; /* Set when channel is being deleted */ volatile Alt **qentry; /* Receivers/senders waiting (malloc) */ volatile int nentry; /* # of entries malloc-ed */ + volatile int closed; /* channel is closed */ uchar v[1]; /* Array of s values in the channel */ }; @@ -45,6 +46,7 @@ Channel *c; /* channel */ void *v; /* pointer to value */ ChanOp op; /* operation */ + char *err; /* did the op fail? */ /* * the next variables are used internally to alt * they need not be initialized @@ -58,8 +60,10 @@ }; int alt(Alt alts[]); +int chanclose(Channel*); Channel*chancreate(int elemsize, int bufsize); int chaninit(Channel *c, int elemsize, int elemcnt); +int chanisclosed(Channel *c); void chanfree(Channel *c); int chanprint(Channel *, char *, ...); long decref(Ref *r); /* returns 0 iff value is now zero */ --- /sys/man/2/thread Tue Feb 16 15:50:45 2010 +++ /sys/man/2/thread Tue Feb 16 15:50:31 2010 @@ -1,9 +1,11 @@ .TH THREAD 2 .SH NAME alt, +chanclose, chancreate, chanfree, chaninit, +chanisclosed, chanprint, mainstacksize, proccreate, @@ -48,20 +50,27 @@ #include #include .sp -#define CHANEND 0 -#define CHANSND 1 -#define CHANRCV 2 -#define CHANNOP 3 -#define CHANNOBLK 4 +typedef enum { + CHANEND, + CHANSND, + CHANRCV, + CHANNOP, + CHANNOBLK, +} ChanOp; .sp .ta \w' 'u +\w'Channel 'u typedef struct Alt Alt; struct Alt { - Channel *c; - void *v; - int op; - Channel **tag; - int entryno; + Channel *c; /* channel */ + void *v; /* pointer to value */ + ChanOp op; /* operation */ + char *err; /* did the op fail? */ + /* + * the next variables are used internally to alt + * they need not be initialized + */ + Channel **tag; /* pointer to rendez-vous tag */ + int entryno; /* entry number */ }; .fi .de XX @@ -116,6 +125,8 @@ int nbsendp(Channel *c, void *v) int nbsendul(Channel *c, ulong v) int chanprint(Channel *c, char *fmt, ...) +int chanclose(Channel *c); +int chanisclosed(Channel *c); .XX void procexecl(Channel *cpid, char *file, ...) void procexec(Channel *cpid, char *file, char *args[]) @@ -520,12 +531,43 @@ formats its arguments in the manner of .IR print (2) and sends the result to the channel -.IR c. +.IR c . The string delivered by .I chanprint is allocated with .IR malloc (2) and should be freed upon receipt. +.PP +.I Chanclose +prevents further elements to be sent to the channel +.IR c . +After closing a channel, +.I send +and +.I recv +never block. +.I Send +always +returns \-1. +.I Recv +returns \-1 if the channel is empty. +.I Alt +may choose a +.B CHANSND +or +.B CHANRCV +that failed because the channel was closed. +In this case, the +.B err +field of the +.B Alt +entry points to an error string stating that the +channel was closed and the operation was completed +with failure. +If all entries have been selected and failed because +they were closed, +.I alt +returns \-1. .SS Errors, notes and resources Thread library functions do not return on failure; if errors occur, the entire program is aborted.