@@ 13,40 13,41 @@ typedef struct Client Client;
typedef struct Read Read;
typedef struct Write Write;
-struct Group {
- Stream *streams;
- Stream *order;
+struct Read {
+ Listelem;
- enum {Message, Coalesce} mode;
- enum {Replayoff, Replaylast, Replayall} replay;
+ Req *req;
+};
+
+struct Write {
+ Listelem;
+
+ /* Twrite.ifcall */
+ vlong offset;
+ uint count;
+ uchar *data;
};
struct Stream {
- List;
+ Listelem;
- Group *parent;
+ Group *group;
Write *wqueue;
Read *rqueue;
};
-struct Client {
- Write *cursor;
- vlong offset;
-};
-
-struct Read {
- List;
+struct Group {
+ Stream *streams;
+ Stream *order;
- Req *r;
+ enum {Message, Coalesce} mode;
+ enum {Replayoff, Replaylast, Replayall} replay;
};
-struct Write {
- List;
-
- /* Twrite.ifcall */
+struct Client {
+ Write *cursor;
vlong offset;
- uint count;
- uchar *data;
+ int blocked;
};
enum {
@@ 66,7 67,7 @@ filesettype(File *f, ushort type)
* This depends on the 9pfile(2) library generating
* simple incremental qid paths.
*/
- f->qid.path &= ~(uvlong)0xF<<60;
+ f->qid.path &= ~((uvlong)0xF<<60);
f->qid.path |= (uvlong)(type&0xF)<<60;
}
@@ 77,7 78,7 @@ filetype(File *f)
}
File*
-groupcreate(File *parent, char *name, char *uid, ulong perm)
+groupcreate(File *dir, char *name, char *uid, ulong perm)
{
Stream *streamalloc(Group*);
void *streamclose(Stream*);
@@ 85,18 86,18 @@ groupcreate(File *parent, char *name, char *uid, ulong perm)
Group *group;
group = emalloc(sizeof(Group));
- group->streams = (Stream*)listalloc();
- group->order = (Stream*)streamalloc(group);
+ group->streams = (Stream*)listinit(emalloc(sizeof(Stream)));
+ group->order = streamalloc(group);
group->mode = Message;
group->replay = Replayoff;
ctl = order = nil;
if(strcmp(name, "/") == 0){
- d = parent;
+ d = dir;
d->aux = group;
}
else
- d = createfile(parent, name, uid, perm, group);
+ d = createfile(dir, name, uid, perm, group);
if(d == nil)
goto err;
filesettype(d, Qgroup);
@@ 113,7 114,6 @@ groupcreate(File *parent, char *name, char *uid, ulong perm)
return d;
err:
- free(group->streams);
streamclose(group->order);
if(d) closefile(d);
if(ctl) closefile(ctl);
@@ 122,22 122,20 @@ err:
}
void
-groupclose(File *f)
+groupclose(Group *g)
{
- Group *group = f->aux;
-
- free(group);
+ free(g);
}
Stream*
-streamalloc(Group *group)
+streamalloc(Group *g)
{
Stream *s;
s = emalloc(sizeof(Stream));
- s->parent = group;
- s->wqueue = (Write*)listalloc();
- s->rqueue = (Read*)listalloc();
+ s->group = g;
+ s->wqueue = (Write*)listinit(emalloc(sizeof(Write)));
+ s->rqueue = (Read*)listinit(emalloc(sizeof(Read)));
return s;
}
@@ 147,134 145,133 @@ streamclose(Stream *s)
Read *r;
Write *w;
- listunlink(s);
- if(s->rqueue)
- foreach(Read*, s->rqueue){
- /* eof these? */
- r = ptr;
- ptr = (Read*)r->tail;
+ listeach(Read*, s->rqueue, r){
listunlink(r);
free(r);
}
- free(s->rqueue);
- if(s->wqueue)
- foreach(Write*, s->wqueue){
- w = ptr;
- ptr = (Write*)w->tail;
+ free(s->wqueue);
+ listeach(Write*, s->wqueue, w){
listunlink(w);
free(w);
}
free(s->wqueue);
+ listunlink(s);
free(s);
}
File*
-streamcreate(File *parent, char *name, char *uid, ulong perm)
+streamcreate(File *dir, char *name, char *uid, ulong perm)
{
File *f;
Group *group;
Stream *s;
- group = parent->aux;
+ group = dir->aux;
s = streamalloc(group);
- if((f = createfile(parent, name, uid, perm, s)) == nil){
+ if((f = createfile(dir, name, uid, perm, s)) == nil){
streamclose(s);
return nil;
}
- listlink(group->streams, s);
filesettype(f, Qstream);
+ listlink(group->streams, s);
return f;
}
void
-streamopen(Stream *s, Req *r)
+streamopen(Stream *s, Req *req)
{
Client *c;
- c = r->fid->aux = emalloc(sizeof(Client));
- switch(s->parent->replay){
+ c = req->fid->aux = emalloc(sizeof(Client));
+ switch(s->group->replay){
case Replayoff:
- c->cursor = (Write*)s->wqueue->tail; break;
- case Replaylast:
- c->cursor = (Write*)s->wqueue->tail->tail; break;
- case Replayall:
- c->cursor = (Write*)s->wqueue; break;
- }
-}
+ c->offset = 0;
+ c->blocked = 1;
+ c->cursor = nil;
+ break;
+ case Replayall:
+ c->offset = 0;
+ if(listisempty(s->wqueue)){
+ c->blocked = 1;
+ c->cursor = nil;
+ }else{
+ c->blocked = 0;
+ c->cursor = s->wqueue->front;
+ }
+ break;
-void
-respondmessage(Req *r)
-{
- int n;
- Client *c = r->fid->aux;
- Write *w = c->cursor;
-
- n = w->count;
- if(n > r->ifcall.count)
- n = r->ifcall.count;
- r->ofcall.count = n;
- memmove(r->ofcall.data, w->data, n);
- respond(r, nil);
+ case Replaylast:
+ c->offset = 0;
+ if(listisempty(s->wqueue)){
+ c->blocked = 1;
+ c->cursor = nil;
+ }else{
+ c->blocked = 0;
+ c->cursor = s->wqueue->back;
+ }
+ break;
+ }
}
void
-respondcoalesce(Req *r)
+streamrespond(Req *req, int mode)
{
- Client *c = r->fid->aux;
+ Client *c = req->fid->aux;
+ Stream *s = req->fid->file->aux;
Write *w;
- /* request size and offset, chunk size and offset, total read */
- vlong rn, ro, n, o, t;
-
- ro = 0; o = 0; t = 0;
- rn = r->ifcall.count;
+ /* request size, response buffer offset */
+ vlong rn, ro;
+ /* chunk size and offset, total read */
+ vlong n, o, t;
+
+ t = 0;
+ rn = req->ifcall.count;
+ ro = 0;
w = c->cursor;
- foreach(Write*, w){
- w = ptr;
- for(o = c->offset; n = w->count - o, n > 0; o += n){
+ o = c->offset;
+ listrange(Write*, s->wqueue, w){
+ if(mode == Message && w != c->cursor)
+ break;
+ for(; n = w->count - o, n > 0; o += n, ro += n, t += n){
if(t == rn)
goto done;
if(n > rn - ro)
n = rn - ro;
- memmove(r->ofcall.data+ro, w->data+o, n);
- ro += n; t += n;
+ memmove(req->ofcall.data+ro, w->data+o, n);
}
- c->offset = 0;
+ o = 0;
}
done:
- c->cursor = w;
+ req->ofcall.count = t;
+ respond(req, nil);
+
+ /* Determine the Client state */
+ if(w == s->wqueue){
+ c->offset = 0;
+ c->blocked = 1;
+ c->cursor = nil;
+ return;
+ }
c->offset = o;
- r->ofcall.count = t;
- respond(r, nil);
+ c->blocked = 0;
+ c->cursor = w;
}
void
-streamread(Req *r)
+streamread(Req *req)
{
- File *f = r->fid->file;
- Stream *s = f->aux;
- Client *c = r->fid->aux;
- Read *rd;
-
- /* Delay the response if the wqueue is empty
- * or if we've already caught up, respond otherwise. */
- switch(s->parent->mode){
- case Message:
- if(listisempty(s->wqueue) || listislast(c->cursor))
- break;
- c->cursor = (Write*)c->cursor->link;
- respondmessage(r);
- return;
- case Coalesce:
- if(listisempty(s->wqueue)
- || (listislast(c->cursor) && c->offset == c->cursor->count))
- break;
- respondcoalesce(r);
+ Client *c = req->fid->aux;
+ Stream *s = req->fid->file->aux;
+ Read *r;
+
+ if(c->blocked){
+ r = emalloc(sizeof(Read));
+ r->req = req;
+ listlink(s->rqueue, r);
return;
}
- rd = emalloc(sizeof(Read));
- rd->r = r;
- listlink(s->rqueue, rd);
+ streamrespond(req, s->group->mode);
}
Write*
@@ 288,64 285,66 @@ writealloc(long n)
}
void
-streamwrite(Req *r)
+streamwrite(Req *req)
{
- File *f = r->fid->file;
- Stream *s = f->aux;
- Group *group = s->parent;
- Write *w, *o;
+ File *f = req->fid->file;
+ Stream *s = req->fid->file->aux;
+ Group *group = s->group;
+ Write *w, *wq, *o, *oq;
+ Read *r;
+ Client *c;
long n;
+
+ wq = s->wqueue;
+ oq = group->order->wqueue;
- /* Commit to wqueue */
- w = writealloc(r->ifcall.count);
- w->count = r->ifcall.count;
- w->offset = r->ifcall.offset;
- memmove(w->data, r->ifcall.data, w->count);
- listlink(s->wqueue->tail, w);
+ /* Commit to queue */
+ w = writealloc(req->ifcall.count);
+ w->count = req->ifcall.count;
+ w->offset = req->ifcall.offset;
+ memmove(w->data, req->ifcall.data, w->count);
+ listlink(wq->back, w);
- /* Commit to order */
+ /* Commit to group order queue */
n = strlen(f->name)+1;
o = writealloc(n);
o->offset = 0;
o->count = n;
memmove(o->data, f->name, n);
- listlink(group->order->wqueue->tail, o);
-
+ listlink(oq->back, o);
+
/* Kick the blocked stream readers */
- foreach(Read*, s->rqueue){
- Client *c = ptr->r->fid->aux;
-
+ listeach(Read*, s->rqueue, r){
+ c = r->req->fid->aux;
+
c->cursor = w;
c->offset = 0;
- switch(group->mode){
- case Message:
- respondmessage(ptr->r); break;
- case Coalesce:
- respondcoalesce(ptr->r); break;
- }
- ptr = (Read*)ptr->tail;
- free(listunlink(ptr->link));
+ c->blocked = 0;
+ streamrespond(r->req, group->mode);
+ listunlink(r);
+ free(r);
}
/* Kick the blocked order readers */
- foreach(Read*, group->order->rqueue){
- Client *c = ptr->r->fid->aux;
-
+ listeach(Read*, group->order->rqueue, r){
+ c = r->req->fid->aux;
+
c->cursor = o;
- respondmessage(ptr->r);
- ptr = (Read*)ptr->tail;
- free(listunlink(ptr->link));
+ c->offset = 0;
+ c->blocked = 0;
+ streamrespond(r->req, Message);
+ listunlink(r);
+ free(r);
}
- r->ofcall.count = r->ifcall.count;
- respond(r, nil);
+ req->ofcall.count = req->ifcall.count;
+ respond(req, nil);
}
void
-ctlread(Req *r)
+ctlread(Req *req)
{
- File *f = r->fid->file;
- Group *group = f->aux;
+ Group *group = req->fid->file->aux;
char buf[256];
char *mode2str[] = {
@@ 359,8 358,8 @@ ctlread(Req *r)
};
snprint(buf, sizeof buf, "data %s\nreplay %s\n",
mode2str[group->mode], replay2str[group->replay]);
- readstr(r, buf);
- respond(r, nil);
+ readstr(req, buf);
+ respond(req, nil);
}
enum {
@@ 381,18 380,17 @@ Cmdtab groupcmd[] = {
};
void
-ctlwrite(Req *r)
+ctlwrite(Req *req)
{
- File *f = r->fid->file;
- Group *group = f->aux;
+ Group *group = req->fid->file->aux;
char *e = nil;
Cmdbuf *cmd;
Cmdtab *t;
- cmd = parsecmd(r->ifcall.data, r->ifcall.count);
+ cmd = parsecmd(req->ifcall.data, req->ifcall.count);
t = lookupcmd(cmd, groupcmd, nelem(groupcmd));
if(t == nil){
- respondcmderror(r, cmd, "%r");
+ respondcmderror(req, cmd, "%r");
free(cmd);
return;
}
@@ 441,92 439,93 @@ ctlwrite(Req *r)
break;
}}
free(cmd);
- respond(r, e);
+ respond(req, e);
}
void
-xcreate(Req *r)
+xcreate(Req *req)
{
- char *name = r->ifcall.name;
- char *uid = r->fid->uid;
- ulong perm = r->ifcall.perm;
- File *parent = r->fid->file;
+ char *name = req->ifcall.name;
+ char *uid = req->fid->uid;
+ ulong perm = req->ifcall.perm;
+ File *group = req->fid->file;
File *f = nil;
- switch(filetype(parent)){
+ switch(filetype(group)){
case Qroot:
case Qgroup:
if(perm&DMDIR)
- f = groupcreate(parent, name, uid, perm);
+ f = groupcreate(group, name, uid, perm);
else{
- f = streamcreate(parent, name, uid, perm);
- r->fid->file = f;
- r->ofcall.qid = f->qid;
- streamopen(f->aux, r);
+ f = streamcreate(group, name, uid, perm);
+ req->fid->file = f;
+ req->ofcall.qid = f->qid;
+ streamopen(f->aux, req);
}
break;
}
if(f == nil)
- respond(r, "internal failure");
+ respond(req, "internal failure");
else
- respond(r, nil);
+ respond(req, nil);
}
void
-xopen(Req *r)
+xopen(Req *req)
{
- File *f = r->fid->file;
+ File *f = req->fid->file;
switch(filetype(f)){
case Qstream:
case Qorder:
- streamopen(f->aux, r);
+ streamopen(f->aux, req);
break;
}
- respond(r, nil);
+ respond(req, nil);
}
void
-xwrite(Req *r)
+xwrite(Req *req)
{
- File *f = r->fid->file;
+ File *f = req->fid->file;
switch(filetype(f)){
case Qstream:
- streamwrite(r);
+ streamwrite(req);
break;
case Qctl:
- ctlwrite(r);
+ ctlwrite(req);
break;
default:
- respond(r, "forbidden");
+ respond(req, "forbidden");
return;
}
}
void
-xread(Req *r)
+xread(Req *req)
{
- File *f = r->fid->file;
+ File *f = req->fid->file;
switch(filetype(f)){
case Qstream:
case Qorder:
- streamread(r);
+ streamread(req);
break;
case Qctl:
- ctlread(r);
+ ctlread(req);
break;
default:
- respond(r, "forbidden");
+ respond(req, "forbidden");
}
}
void
-xflush(Req *r)
+xflush(Req *req)
{
- Req *old = r->oldreq;
+ Req *old = req->oldreq;
File *f = old->fid->file;
+ Read *r;
switch(filetype(f)){
case Qstream:
@@ 535,25 534,26 @@ xflush(Req *r)
if(old->ifcall.type != Tread)
break;
- foreach(Read*, s->rqueue){
- if(ptr->r == old){
- free(listunlink(ptr));
+ listeach(Read*, s->rqueue, r){
+ if(r->req == old){
+ listunlink(r);
+ free(r);
break;
}
}
respond(old, "interrupted");
}}
- respond(r, nil);
+ respond(req, nil);
}
void
-xwstat(Req *r)
+xwstat(Req *req)
{
- File *w, *f = r->fid->file;
- char *uid = r->fid->uid;
+ File *w, *f = req->fid->file;
+ char *uid = req->fid->uid;
- /* To change name, must have write permission in parent. */
- if(r->d.name[0] != '\0' && strcmp(r->d.name, f->name) != 0){
+ /* To change name, must have write permission in group. */
+ if(req->d.name[0] != '\0' && strcmp(req->d.name, f->name) != 0){
if((w = f->parent) == nil)
goto perm;
incref(w);
@@ 561,9 561,9 @@ xwstat(Req *r)
closefile(w);
goto perm;
}
- if((w = walkfile(w, r->d.name)) != nil){
+ if((w = walkfile(w, req->d.name)) != nil){
closefile(w);
- respond(r, "file already exists");
+ respond(req, "file already exists");
return;
}
}
@@ 571,47 571,47 @@ xwstat(Req *r)
/* To change group, must be owner and member of new group,
* or leader of current group and leader of new group.
* Second case cannot happen, but we check anyway. */
- while(r->d.gid[0] != '\0' && strcmp(f->gid, r->d.gid) != 0){
+ while(req->d.gid[0] != '\0' && strcmp(f->gid, req->d.gid) != 0){
if(strcmp(uid, f->uid) == 0)
break;
if(strcmp(uid, f->gid) == 0)
- if(strcmp(uid, r->d.gid) == 0)
+ if(strcmp(uid, req->d.gid) == 0)
break;
- respond(r, "not owner");
+ respond(req, "not owner");
return;
}
/* To change mode, must be owner or group leader.
* Because of lack of users file, leader=>group itself. */
- if(r->d.mode != ~0 && f->mode != r->d.mode){
+ if(req->d.mode != ~0 && f->mode != req->d.mode){
if(strcmp(uid, f->uid) != 0)
if(strcmp(uid, f->gid) != 0){
- respond(r, "not owner");
+ respond(req, "not owner");
return;
}
}
- if(r->d.name[0] != '\0'){
+ if(req->d.name[0] != '\0'){
free(f->name);
- f->name = estrdup(r->d.name);
+ f->name = estrdup(req->d.name);
}
- if(r->d.uid[0] != '\0'){
+ if(req->d.uid[0] != '\0'){
free(f->uid);
- f->uid = estrdup(r->d.uid);
+ f->uid = estrdup(req->d.uid);
}
- if(r->d.gid[0] != '\0'){
+ if(req->d.gid[0] != '\0'){
free(f->gid);
- f->gid = estrdup(r->d.gid);
+ f->gid = estrdup(req->d.gid);
}
- if(r->d.mode != ~0){
- f->mode = r->d.mode;
+ if(req->d.mode != ~0){
+ f->mode = req->d.mode;
f->qid.type = f->mode >> 24;
}
- respond(r, nil);
+ respond(req, nil);
return;
perm:
- respond(r, "permission denied");
+ respond(req, "permission denied");
}
void
@@ 627,7 627,7 @@ xdestroyfile(File *f)
{
switch(filetype(f)){
case Qgroup:
- groupclose(f);
+ groupclose(f->aux);
break;
case Qstream:
streamclose(f->aux);