# HG changeset patch # User Francisco J Ballesteros # Date 1329499160 0 # Node ID a8d2211aa14a250db6de4f18eb4cf2afdfe0d92a # Parent 50a0f958bd0ce7086474983c2b2bf0b8d5993e07 new: worker library. Useful to keep pools of threads/procs to handle requests. This comes from Plan B, and will be used in creepy. R=nixiedev, quanstro CC=nix-dev http://codereview.appspot.com/5671054 diff -r 50a0f958bd0c -r a8d2211aa14a sys/include/worker.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sys/include/worker.h Fri Feb 17 17:19:20 2012 +0000 @@ -0,0 +1,8 @@ +#pragma lib "libworker.a" +#pragma src "/sys/src/libworker" + +typedef char* (*Worker)(void *arg, void **aux); +int getworker(Worker work, void *arg, Channel *rc); +void workerdebug(int); + +extern int (*workerthreadcreate)(void(*)(void*), void*, uint); diff -r 50a0f958bd0c -r a8d2211aa14a sys/man/2/worker --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sys/man/2/worker Fri Feb 17 17:19:20 2012 +0000 @@ -0,0 +1,56 @@ +.TH WORKER 2 +.SH NAME +Worker, getworker, workerdebug \- auxiliary worker threads +.SH SYNOPSIS +.B #include +.br +.B #include +.br +.B #include +.br +.B #include +.PP +.B +typedef char* (*Worker)(void *arg, void **aux); +.br +.B +int getworker(Worker work, void *arg, Channel *rc); +.br +.B +void workerdebug(int); +.B +extern int (*workerthreadcreate)(void(*)(void*), void*, uint); +.SH DESCRIPTION +This library provices a pool of worker threads to handle user requests. +Worker threads are created on demand and never destroyed. +Idle workers are kept around waiting for more busy times. +.PP +.I Getworker +allocates a new worker to perform +.I work +on the given +.I arg , +as supplied by the caller. If +.I rc +is non null, the result value of +.I work +is sent through it upon completion. +.PP +The argument +.I aux +given to +.I work +points to a per-worker storage area that may hold a single pointer. +This is useful, for example, to keep reply channels cached. +.PP +The global +.I workerthreadcreate +is used to create new worker thread. By default, it is initialized to +.I threadcreate (2). +Initialize with the address of +.I proccreate +to create worker processes instead of threads. +.SH SOURCE +.B /sys/src/libworker +.SH BUGS +This is a new library and is expected to evolve. diff -r 50a0f958bd0c -r a8d2211aa14a sys/src/libworker/mkfile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sys/src/libworker/mkfile Fri Feb 17 17:19:20 2012 +0000 @@ -0,0 +1,15 @@ + +#include +#include +#include "worker.h" + +typedef struct Work Work; +typedef struct Workproc Workproc; + +enum +{ + Stack = 32 * 1024 +}; + +struct Work +{ + Worker work; + void* arg; + Channel*rc; /* of char* */ +}; + +struct Workproc +{ + Workproc* next; + Channel* wc; + void* aux; /* per-worker storage */ + int id; + Work; +}; + +static Channel *workerc; /* of Work[N] */ +static Channel *workerdonec; /* of Workproc* */ +static Channel *workeridc; /* of ulong */ +static int debug; + +typedef int (*Forker)(void(*)(void*), void*, uint); + +Forker workerthreadcreate = threadcreate; + +static void +workproc(void *a) +{ + Workproc *w; + int id; + char *r; + + w = a; + threadsetname("worker"); + id = threadid(); + if(debug) + fprint(2, "worker %d: started\n", id); + sendul(w->wc, threadid()); + for(;;){ + if(recvul(w->wc) == ~0) + break; + if(debug) + fprint(2, "worker %d: work %p\n", id, w->work); + r = w->work(w->arg, &w->aux); + if(debug && r != nil) + fprint(2, "worker %d: work %p: %s\n", id, w->work, r); + if(w->rc != nil) + sendp(w->rc, r); + w->work = nil; + w->arg = nil; + w->rc = nil; + if(debug) + fprint(2, "worker %d: idle\n", id); + sendp(workerdonec, w); + } + if(debug) + fprint(2, "worker %d: exiting\n", id); + threadexits(nil); +} + +static void +ctlproc(void*) +{ + Work w; + Workproc *wp, *wl; + Alt a[] = { + {workerc, &w, CHANRCV}, + {workerdonec, &wp, CHANRCV}, + {nil, nil, CHANEND} + }; + + threadsetname("workctl"); + wl = nil; + for(;;){ + switch(alt(a)){ + case 0: + if(wl == nil){ + wl = mallocz(sizeof *wl, 1); + if(wl == nil) + sysfatal("ctlproc: no memory"); + wl->wc = chancreate(sizeof(ulong), 0); + if(wl->wc == nil) + sysfatal("chancreate"); + if(workerthreadcreate(workproc, wl, Stack) < 0) + sysfatal("threadcreate"); + wl->id = recvul(wl->wc); + } + wp = wl; + wl = wl->next; + wp->Work = w; + sendul(wp->wc, 0); + sendul(workeridc, wp->id); + break; + case 1: + wp->next = wl; + wl = wp; + break; + default: + sysfatal("alt"); + } + } +} + +static void +init(void) +{ + if(workerc != nil) + return; + workerc = chancreate(sizeof(Work), 0); + workeridc = chancreate(sizeof(ulong), 1); + workerdonec = chancreate(sizeof(Workproc*), 0); + if(workerc == nil || workeridc == nil || workerdonec == nil) + sysfatal("chancreate"); + if(threadcreate(ctlproc, nil, Stack) < 0) + sysfatal("threadcreate"); +} + +int +getworker(Worker work, void *arg, Channel *rc) +{ + Work w; + + init(); + w.work = work; + w.arg = arg; + w.rc = rc; + send(workerc, &w); + return recvul(workeridc); +} + +void +workerdebug(int on) +{ + debug = on; +}