~kvik/mq

668a30f10fce153d238bd784f632338d735ad6e3 — kvik 1 year, 2 months ago 58eb78b
mq: various code quality improvements
3 files changed, 249 insertions(+), 271 deletions(-)

M src/list.c
M src/list.h
M src/mq.c
M src/list.c => src/list.c +17 -40
@@ 4,56 4,33 @@
#include "list.h"
#include "util.h"

List*
listalloc(void)
Listelem*
listinit(Listelem *l)
{
	List *n;

	n = emalloc(sizeof(List));
	n->tag = Listlead;
	n->link = n;
	n->tail = n;
	return n;
	l->front = l->back = l;
	return l;
}

List*
listlink(List *p, List *n)
Listelem*
listlink(Listelem *list, Listelem *n)
{
	n->link = p->link;
	p->link = n;
	n->tail = p;
	n->link->tail = n;
	n->front = list->front;
	n->back = list;
	((Listelem*)list->front)->back = n;
	list->front = n;
	return n;
}

List*
listunlink(List *p)
{
	p->link->tail = p->tail;
	p->tail->link = p->link;
	return p;
}

int
listisempty(List *p)
Listelem*
listunlink(Listelem *n)
{
	return p->link == p;
}

int
listislead(List *p)
{
	return p->tag == Listlead;
}

int
listisfirst(List *p)
{
	return p->tail->tag == Listlead;
	((Listelem*)n->front)->back = n->back;
	((Listelem*)n->back)->front = n->front;
	return n;
}

int
listislast(List *p)
listisempty(Listelem *list)
{
	return p->link->tag == Listlead;
	return list->front == list;
}

M src/list.h => src/list.h +19 -18
@@ 1,22 1,23 @@
enum { Listlead = 0xAA };
typedef struct Listelem Listelem;

typedef struct List List;

/* Must be embedded at the top of struct */
struct List {
	uchar tag;
	List *link;
	List *tail;
struct Listelem {
	void *front;
	void *back;
};

/* What. */
#define foreach(type, list) \
	for(type ptr = listislead((list)) ? (type)(list)->link : (list); ptr->tag != Listlead; ptr = (type)ptr->link)
Listelem* listinit(Listelem*);
Listelem* listlink(Listelem*, Listelem*);
Listelem* listunlink(Listelem*);
int listisempty(Listelem*);
int listisfirst(Listelem*, Listelem*);
int listislast(Listelem*, Listelem*);

#define listeach(type, sentinel, ptr) \
	for(type _next = (sentinel)->front; \
	    (ptr) = _next, _next = (ptr)->front, (ptr) != (sentinel); )

#define listrange(type, sentinel, ptr) \
	for(type _next; \
	    _next = (ptr)->front, (ptr) != (sentinel); \
	    (ptr) = _next)

List* listalloc(void);
List* listlink(List*, List*);
List* listunlink(List*);
int listisempty(List*);
int listislead(List*);
int listisfirst(List*);
int listislast(List*);

M src/mq.c => src/mq.c +213 -213
@@ 13,40 13,41 @@ typedef struct Client Client;
typedef struct Read Read;
typedef struct Write Write;

struct Group {
	Stream *streams;
	Stream *order;
struct Read {
	Listelem;

	enum {Message, Coalesce} mode;
	enum {Replayoff, Replaylast, Replayall} replay;
	Req *req;
};

struct Write {
	Listelem;

	/* Twrite.ifcall */
	vlong offset;
	uint count;
	uchar *data;
};

struct Stream {
	List;
	Listelem;

	Group *parent;
	Group *group;
	Write *wqueue;
	Read *rqueue;
};

struct Client {
	Write *cursor;
	vlong offset;
};

struct Read {
	List;
struct Group {
	Stream *streams;
	Stream *order;

	Req *r;
	enum {Message, Coalesce} mode;
	enum {Replayoff, Replaylast, Replayall} replay;
};

struct Write {
	List;

	/* Twrite.ifcall */
struct Client {
	Write *cursor;
	vlong offset;
	uint count;
	uchar *data;
	int blocked;
};

enum {


@@ 66,7 67,7 @@ filesettype(File *f, ushort type)
	 * This depends on the 9pfile(2) library generating
	 * simple incremental qid paths.
	 */
	f->qid.path &= ~(uvlong)0xF<<60;
	f->qid.path &= ~((uvlong)0xF<<60);
	f->qid.path |= (uvlong)(type&0xF)<<60;
}



@@ 77,7 78,7 @@ filetype(File *f)
}

File*
groupcreate(File *parent, char *name, char *uid, ulong perm)
groupcreate(File *dir, char *name, char *uid, ulong perm)
{
	Stream *streamalloc(Group*);
	void *streamclose(Stream*);


@@ 85,18 86,18 @@ groupcreate(File *parent, char *name, char *uid, ulong perm)
	Group *group;

	group = emalloc(sizeof(Group));
	group->streams = (Stream*)listalloc();
	group->order = (Stream*)streamalloc(group);
	group->streams = (Stream*)listinit(emalloc(sizeof(Stream)));
	group->order = streamalloc(group);
	group->mode = Message;
	group->replay = Replayoff;

	ctl = order = nil;
	if(strcmp(name, "/") == 0){
		d = parent;
		d = dir;
		d->aux = group;
	}
	else
		d = createfile(parent, name, uid, perm, group);
		d = createfile(dir, name, uid, perm, group);
	if(d == nil)
		goto err;
	filesettype(d, Qgroup);


@@ 113,7 114,6 @@ groupcreate(File *parent, char *name, char *uid, ulong perm)

	return d;
err:
	free(group->streams);
	streamclose(group->order);
	if(d) closefile(d);
	if(ctl) closefile(ctl);


@@ 122,22 122,20 @@ err:
}

void
groupclose(File *f)
groupclose(Group *g)
{
	Group *group = f->aux;

	free(group);
	free(g);
}

Stream*
streamalloc(Group *group)
streamalloc(Group *g)
{
	Stream *s;
	
	s = emalloc(sizeof(Stream));
	s->parent = group;
	s->wqueue = (Write*)listalloc();
	s->rqueue = (Read*)listalloc();
	s->group = g;
	s->wqueue = (Write*)listinit(emalloc(sizeof(Write)));
	s->rqueue = (Read*)listinit(emalloc(sizeof(Read)));
	return s;
}



@@ 147,134 145,133 @@ streamclose(Stream *s)
	Read *r;
	Write *w;

	listunlink(s);
	if(s->rqueue)
	foreach(Read*, s->rqueue){
		/* eof these? */
		r = ptr;
		ptr = (Read*)r->tail;
	listeach(Read*, s->rqueue, r){
		listunlink(r);
		free(r);
	}
	free(s->rqueue);
	if(s->wqueue)
	foreach(Write*, s->wqueue){
		w = ptr;
		ptr = (Write*)w->tail;
	free(s->wqueue);
	listeach(Write*, s->wqueue, w){
		listunlink(w);
		free(w);
	}
	free(s->wqueue);
	listunlink(s);
	free(s);
}

File*
streamcreate(File *parent, char *name, char *uid, ulong perm)
streamcreate(File *dir, char *name, char *uid, ulong perm)
{
	File *f;
	Group *group;
	Stream *s;

	group = parent->aux;
	group = dir->aux;
	s = streamalloc(group);
	if((f = createfile(parent, name, uid, perm, s)) == nil){
	if((f = createfile(dir, name, uid, perm, s)) == nil){
		streamclose(s);
		return nil;
	}
	listlink(group->streams, s);
	filesettype(f, Qstream);
	listlink(group->streams, s);
	return f;
}

void
streamopen(Stream *s, Req *r)
streamopen(Stream *s, Req *req)
{
	Client *c;
	
	c = r->fid->aux = emalloc(sizeof(Client));
	switch(s->parent->replay){
	c = req->fid->aux = emalloc(sizeof(Client));
	switch(s->group->replay){
	case Replayoff:
		c->cursor = (Write*)s->wqueue->tail; break;
	case Replaylast:
		c->cursor = (Write*)s->wqueue->tail->tail; break;
	case Replayall:
		c->cursor = (Write*)s->wqueue; break;
	}
}
		c->offset = 0;
		c->blocked = 1;
		c->cursor = nil;
		break;

	case Replayall:
		c->offset = 0;
		if(listisempty(s->wqueue)){
			c->blocked = 1;
			c->cursor = nil;
		}else{
			c->blocked = 0;
			c->cursor = s->wqueue->front;
		}
		break;

void
respondmessage(Req *r)
{
	int n;
	Client *c = r->fid->aux;
	Write *w = c->cursor;
	
	n = w->count;
	if(n > r->ifcall.count)
		n = r->ifcall.count;
	r->ofcall.count = n;
	memmove(r->ofcall.data, w->data, n);
	respond(r, nil);
	case Replaylast:
		c->offset = 0;
		if(listisempty(s->wqueue)){
			c->blocked = 1;
			c->cursor = nil;
		}else{
			c->blocked = 0;
			c->cursor = s->wqueue->back;
		}
		break;
	}
}

void
respondcoalesce(Req *r)
streamrespond(Req *req, int mode)
{
	Client *c = r->fid->aux;
	Client *c = req->fid->aux;
	Stream *s = req->fid->file->aux;
	Write *w;
	/* request size and offset, chunk size and offset, total read */
	vlong rn, ro, n, o, t;

	ro = 0; o = 0; t = 0;
	rn = r->ifcall.count;
	/* request size, response buffer offset */
	vlong rn, ro;
	/* chunk size and offset, total read */
	vlong n, o, t;

	t = 0;
	rn = req->ifcall.count;
	ro = 0;
	w = c->cursor;
	foreach(Write*, w){
		w = ptr;
		for(o = c->offset; n = w->count - o, n > 0; o += n){
	o = c->offset;
	listrange(Write*, s->wqueue, w){
		if(mode == Message && w != c->cursor)
			break;
		for(; n = w->count - o, n > 0; o += n, ro += n, t += n){
			if(t == rn)
				goto done;
			if(n > rn - ro)
				n = rn - ro;
			memmove(r->ofcall.data+ro, w->data+o, n);
			ro += n; t += n;
			memmove(req->ofcall.data+ro, w->data+o, n);
		}
		c->offset = 0;
		o = 0;
	}
done:
	c->cursor = w;
	req->ofcall.count = t;
	respond(req, nil);
	
	/* Determine the Client state */
	if(w == s->wqueue){
		c->offset = 0;
		c->blocked = 1;
		c->cursor = nil;
		return;
	}
	c->offset = o;
	r->ofcall.count = t;
	respond(r, nil);
	c->blocked = 0;
	c->cursor = w;
}

void
streamread(Req *r)
streamread(Req *req)
{
	File *f = r->fid->file;
	Stream *s = f->aux;
	Client *c = r->fid->aux;
	Read *rd;

	/* Delay the response if the wqueue is empty
	 * or if we've already caught up, respond otherwise. */
	switch(s->parent->mode){
	case Message:
		if(listisempty(s->wqueue) || listislast(c->cursor))
			break;
		c->cursor = (Write*)c->cursor->link;
		respondmessage(r);
		return;
	case Coalesce:
		if(listisempty(s->wqueue)
		|| (listislast(c->cursor) && c->offset == c->cursor->count))
			break;
		respondcoalesce(r);
	Client *c = req->fid->aux;
	Stream *s = req->fid->file->aux;
	Read *r;
	
	if(c->blocked){
		r = emalloc(sizeof(Read));
		r->req = req;
		listlink(s->rqueue, r);
		return;
	}
	rd = emalloc(sizeof(Read));
	rd->r = r;
	listlink(s->rqueue, rd);
	streamrespond(req, s->group->mode);
}

Write*


@@ 288,64 285,66 @@ writealloc(long n)
}

void
streamwrite(Req *r)
streamwrite(Req *req)
{
	File *f = r->fid->file;
	Stream *s = f->aux;
	Group *group = s->parent;
	Write *w, *o;
	File *f = req->fid->file;
	Stream *s = req->fid->file->aux;
	Group *group = s->group;
	Write *w, *wq, *o, *oq;
	Read *r;
	Client *c;
	long n;
	
	wq = s->wqueue;
	oq = group->order->wqueue;

	/* Commit to wqueue */
	w = writealloc(r->ifcall.count);
	w->count = r->ifcall.count;
	w->offset = r->ifcall.offset;
	memmove(w->data, r->ifcall.data, w->count);
	listlink(s->wqueue->tail, w);
	/* Commit to queue */
	w = writealloc(req->ifcall.count);
	w->count = req->ifcall.count;
	w->offset = req->ifcall.offset;
	memmove(w->data, req->ifcall.data, w->count);
	listlink(wq->back, w);

	/* Commit to order */
	/* Commit to group order queue */
	n = strlen(f->name)+1;
	o = writealloc(n);
	o->offset = 0;
	o->count = n;
	memmove(o->data, f->name, n);
	listlink(group->order->wqueue->tail, o);

	listlink(oq->back, o);
 
	/* Kick the blocked stream readers */
	foreach(Read*, s->rqueue){
		Client *c = ptr->r->fid->aux;

	listeach(Read*, s->rqueue, r){
		c = r->req->fid->aux;
		
		c->cursor = w;
		c->offset = 0;
		switch(group->mode){
		case Message:
			respondmessage(ptr->r); break;
		case Coalesce:
			respondcoalesce(ptr->r); break;
		}
		ptr = (Read*)ptr->tail;
		free(listunlink(ptr->link));
		c->blocked = 0;
		streamrespond(r->req, group->mode);
		listunlink(r);
		free(r);
	}

	/* Kick the blocked order readers */
	foreach(Read*, group->order->rqueue){
		Client *c = ptr->r->fid->aux;

	listeach(Read*, group->order->rqueue, r){
		c = r->req->fid->aux;
		
		c->cursor = o;
		respondmessage(ptr->r);
		ptr = (Read*)ptr->tail;
		free(listunlink(ptr->link));
		c->offset = 0;
		c->blocked = 0;
		streamrespond(r->req, Message);
		listunlink(r);
		free(r);
	}

	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
	req->ofcall.count = req->ifcall.count;
	respond(req, nil);
}

void
ctlread(Req *r)
ctlread(Req *req)
{
	File *f = r->fid->file;
	Group *group = f->aux;
	Group *group = req->fid->file->aux;
	char buf[256];

	char *mode2str[] = {


@@ 359,8 358,8 @@ ctlread(Req *r)
	};
	snprint(buf, sizeof buf, "data %s\nreplay %s\n",
		mode2str[group->mode], replay2str[group->replay]);
	readstr(r, buf);
	respond(r, nil);
	readstr(req, buf);
	respond(req, nil);
}

enum {


@@ 381,18 380,17 @@ Cmdtab groupcmd[] = {
};

void
ctlwrite(Req *r)
ctlwrite(Req *req)
{
	File *f = r->fid->file;
	Group *group = f->aux;
	Group *group = req->fid->file->aux;
	char *e = nil;
	Cmdbuf *cmd;
	Cmdtab *t;

	cmd = parsecmd(r->ifcall.data, r->ifcall.count);
	cmd = parsecmd(req->ifcall.data, req->ifcall.count);
	t = lookupcmd(cmd, groupcmd, nelem(groupcmd));
	if(t == nil){
		respondcmderror(r, cmd, "%r");
		respondcmderror(req, cmd, "%r");
		free(cmd);
		return;
	}


@@ 441,92 439,93 @@ ctlwrite(Req *r)
		break;
	}}
	free(cmd);
	respond(r, e);
	respond(req, e);
}

void
xcreate(Req *r)
xcreate(Req *req)
{
	char *name = r->ifcall.name;
	char *uid = r->fid->uid;
	ulong perm = r->ifcall.perm;
	File *parent = r->fid->file;
	char *name = req->ifcall.name;
	char *uid = req->fid->uid;
	ulong perm = req->ifcall.perm;
	File *group = req->fid->file;
	File *f = nil;

	switch(filetype(parent)){
	switch(filetype(group)){
	case Qroot:
	case Qgroup:
		if(perm&DMDIR)
			f = groupcreate(parent, name, uid, perm);
			f = groupcreate(group, name, uid, perm);
		else{
			f = streamcreate(parent, name, uid, perm);
			r->fid->file = f;
			r->ofcall.qid = f->qid;
			streamopen(f->aux, r);
			f = streamcreate(group, name, uid, perm);
			req->fid->file = f;
			req->ofcall.qid = f->qid;
			streamopen(f->aux, req);
		}
		break;
	}
	if(f == nil)
		respond(r, "internal failure");
		respond(req, "internal failure");
	else
		respond(r, nil);
		respond(req, nil);
}

void
xopen(Req *r)
xopen(Req *req)
{
	File *f = r->fid->file;
	File *f = req->fid->file;

	switch(filetype(f)){
	case Qstream:
	case Qorder:
		streamopen(f->aux, r);
		streamopen(f->aux, req);
		break;
	}
	respond(r, nil);
	respond(req, nil);
}

void
xwrite(Req *r)
xwrite(Req *req)
{
	File *f = r->fid->file;
	File *f = req->fid->file;

	switch(filetype(f)){
	case Qstream:
		streamwrite(r);
		streamwrite(req);
		break;
	case Qctl:
		ctlwrite(r);
		ctlwrite(req);
		break;
	default:
		respond(r, "forbidden");
		respond(req, "forbidden");
		return;
	}
}

void
xread(Req *r)
xread(Req *req)
{
	File *f = r->fid->file;
	File *f = req->fid->file;

	switch(filetype(f)){
	case Qstream:
	case Qorder:
		streamread(r);
		streamread(req);
		break;
	case Qctl:
		ctlread(r);
		ctlread(req);
		break;
	default:
		respond(r, "forbidden");
		respond(req, "forbidden");
	}
}

void
xflush(Req *r)
xflush(Req *req)
{
	Req *old = r->oldreq;
	Req *old = req->oldreq;
	File *f = old->fid->file;
	Read *r;

	switch(filetype(f)){
	case Qstream:


@@ 535,25 534,26 @@ xflush(Req *r)

		if(old->ifcall.type != Tread)
			break;
		foreach(Read*, s->rqueue){
			if(ptr->r == old){
				free(listunlink(ptr));
		listeach(Read*, s->rqueue, r){
			if(r->req == old){
				listunlink(r);
				free(r);
				break;
			}
		}
		respond(old, "interrupted");
	}}
	respond(r, nil);
	respond(req, nil);
}

void
xwstat(Req *r)
xwstat(Req *req)
{
	File *w, *f = r->fid->file;
	char *uid = r->fid->uid;
	File *w, *f = req->fid->file;
	char *uid = req->fid->uid;

	/* To change name, must have write permission in parent. */
	if(r->d.name[0] != '\0' && strcmp(r->d.name, f->name) != 0){
	/* To change name, must have write permission in group. */
	if(req->d.name[0] != '\0' && strcmp(req->d.name, f->name) != 0){
		if((w = f->parent) == nil)
			goto perm;
		incref(w);


@@ 561,9 561,9 @@ xwstat(Req *r)
			closefile(w);
			goto perm;
		}
		if((w = walkfile(w, r->d.name)) != nil){
		if((w = walkfile(w, req->d.name)) != nil){
			closefile(w);
			respond(r, "file already exists");
			respond(req, "file already exists");
			return;
		}
	}


@@ 571,47 571,47 @@ xwstat(Req *r)
	/* To change group, must be owner and member of new group,
	 * or leader of current group and leader of new group.
	 * Second case cannot happen, but we check anyway. */
	while(r->d.gid[0] != '\0' && strcmp(f->gid, r->d.gid) != 0){
	while(req->d.gid[0] != '\0' && strcmp(f->gid, req->d.gid) != 0){
		if(strcmp(uid, f->uid) == 0)
			break;
		if(strcmp(uid, f->gid) == 0)
		if(strcmp(uid, r->d.gid) == 0)
		if(strcmp(uid, req->d.gid) == 0)
			break;
		respond(r, "not owner");
		respond(req, "not owner");
		return;
	}

	/* To change mode, must be owner or group leader.
	 * Because of lack of users file, leader=>group itself. */
	if(r->d.mode != ~0 && f->mode != r->d.mode){
	if(req->d.mode != ~0 && f->mode != req->d.mode){
		if(strcmp(uid, f->uid) != 0)
		if(strcmp(uid, f->gid) != 0){
			respond(r, "not owner");
			respond(req, "not owner");
			return;
		}
	}

	if(r->d.name[0] != '\0'){
	if(req->d.name[0] != '\0'){
		free(f->name);
		f->name = estrdup(r->d.name);
		f->name = estrdup(req->d.name);
	}
	if(r->d.uid[0] != '\0'){
	if(req->d.uid[0] != '\0'){
		free(f->uid);
		f->uid = estrdup(r->d.uid);
		f->uid = estrdup(req->d.uid);
	}
	if(r->d.gid[0] != '\0'){
	if(req->d.gid[0] != '\0'){
		free(f->gid);
		f->gid = estrdup(r->d.gid);
		f->gid = estrdup(req->d.gid);
	}
	if(r->d.mode != ~0){
		f->mode = r->d.mode;
	if(req->d.mode != ~0){
		f->mode = req->d.mode;
		f->qid.type = f->mode >> 24;
	}

	respond(r, nil);
	respond(req, nil);
	return;
perm:
	respond(r, "permission denied");
	respond(req, "permission denied");
}

void


@@ 627,7 627,7 @@ xdestroyfile(File *f)
{
	switch(filetype(f)){
	case Qgroup:
		groupclose(f);
		groupclose(f->aux);
		break;
	case Qstream:
		streamclose(f->aux);