#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/select.h>
#include "package.h"
#include "context.h"
#include "queue.h"
#include "parser.h"
#include "c3/util.h"
#include "c3/stb.h"
#include "c3/tags.h"
#include "c3/tree.h"
// One unit of queued work, tied to one entry of the context's tree list.
struct job {
// Lifecycle of the job, as assigned throughout this file.
enum {
// queued by queue_job, not yet run; queue_get_next returns these
QUEUE_UNSTARTED,
// a child process has been forked for it (queue_process)
QUEUE_RUNNING,
// parse/analysis reported failure, or the child exited with a non-zero status
QUEUE_FAILED,
// parse/analysis succeeded, or the child exited with status 0
QUEUE_SUCCESS,
// pipes/fork failed, the job was killed via queue_fail, or the child did not exit normally
QUEUE_CRASHED,
} state;
// index into context trees list
size_t index;
// The rest of the structure is only defined if state == QUEUE_RUNNING
// pid of the forked child, as returned by fork()
pid_t pid;
// in: read end of the child-to-parent pipe; out: write end of the parent-to-child pipe
int in, out;
};
// File-wide queue state; ctx, mode and jobs are (re)set by queue_init.
static struct{
// context whose tree list the jobs index into
struct context *ctx;
// phase the queue is running; this file distinguishes QUEUE_PARSER and QUEUE_ANALYSIS
enum queue_mode mode;
// stretchy buffer of queued jobs, grown with the stb_sb_* helpers
struct job *jobs;
// Never assigned in the parent, so it stays zero there. In a forked child,
// queue_process stores the write end of the child-to-parent pipe here:
// queue_job tests it for non-zero and queue_job_parent writes to it.
int in_child;
} state;
// Point the queue at `ctx`, select the phase `mode` and start with an empty
// job list. Any previously stored job list pointer is simply overwritten.
// Always returns 1.
int
queue_init(struct context *ctx, enum queue_mode mode)
{
	state.jobs = NULL;
	state.mode = mode;
	state.ctx = ctx;

	return 1;
}
// Search the context's tree list for an entry whose path equals `path`
// (exact strcmp match). Returns its index, or -1 (as uint32_t) if none.
static uint32_t
queue_find_tire(char *path)
{
	uint32_t idx = 0;

	while(idx < stb_sb_count(state.ctx->trees)){
		if(strcmp(state.ctx->trees[idx].path, path) == 0)
			return idx;
		idx += 1;
	}
	return -1;
}
// Child-side half of an import request: send `path` to the parent over the
// pipe whose fd is stored in state.in_child.
//
// Wire format: one type byte (0 = import), a 16-bit little-endian length,
// then `len` bytes of path without a terminator.
//
// Any write failure, or a path that does not fit the 16-bit length field,
// is reported through FATAL.
// NOTE(review): this returns 0 on success while queue_job (which forwards
// this value) otherwise uses 1 for success - confirm what callers expect.
int
queue_job_parent(char *path)
{
	size_t len;
	unsigned char type;
	unsigned char hdr[2];

	len = strlen(path);
	// The header only carries 16 bits; refuse rather than send a truncated
	// length that would desynchronise the stream.
	if(len > UINT16_MAX)
		FATAL("Import path too long (%lu bytes)", (unsigned long)len);
	type = 0;
	if(write(state.in_child, &type, 1) != 1)
		FATAL("Failed to write import message type", 0);
	hdr[0] = len & 0xFF;
	hdr[1] = (len >> 8) & 0xFF;
	if(write(state.in_child, hdr, 2) != 2)
		FATAL("Failed to write import path size (%lu)", (unsigned long)len);
	if(write(state.in_child, path, len) != (ssize_t)len)
		FATAL("Failed to write import path",0);
	return 0;
}
int
queue_job(char *path)
{
struct job job;
uint32_t i;
struct pathed_tire p;
if(state.in_child)
return queue_job_parent(path);
path = strdup(path);
if(path == NULL)
FATAL("Out of memory", 0);
i = queue_find_tire(path);
if(state.mode == QUEUE_PARSER){
// Don't duplicate job in queue
if(i != -1){
free(path);
return 1;
}
p.path = path;
p.tree = NULL;
i = stb_sb_count(state.ctx->trees);
if(!stb_sb_ensure_capacity(&state.ctx->trees, 1, sizeof(struct pathed_tire))){
ERROR("Out of memory", 0);
free(path);
return 0;
}
state.ctx->trees[i] = p;
stb_sb_increment(state.ctx->trees, 1);
}
if(state.mode == QUEUE_ANALYSIS){
if(i == -1){
ERROR("Unable to find '%s' parsetree", path);
free(path);
return 0;
}
free(path);
}
job.state = QUEUE_UNSTARTED;
job.index = i;
stb_sb_ensure_capacity(&state.jobs, 1, sizeof(struct job));
state.jobs[stb_sb_count(state.jobs)] = job;
stb_sb_increment(state.jobs, 1);
return 1;
}
// Return the index of the first job whose state is still below
// QUEUE_RUNNING (i.e. not yet started), or -1 (as uint32_t) if there is none.
uint32_t
queue_get_next(void)
{
	uint32_t slot = 0;

	while(slot < stb_sb_count(state.jobs)){
		if(state.jobs[slot].state < QUEUE_RUNNING)
			return slot;
		slot += 1;
	}
	return -1;
}
// Parse the file named by p->path into a freshly allocated tree stored in
// p->tree, and record the outcome in job->state.
//
// On allocation failure or parse failure the job becomes QUEUE_FAILED; after
// a parse failure the tree is released with c3free and p->tree is reset to
// NULL. On success the job becomes QUEUE_SUCCESS and p->tree keeps the tree.
//
// NOTE(review): if parse() can enqueue imports through queue_job, the lists
// that `job` and `p` point into may grow during the call - confirm that
// these pointers remain valid afterwards.
void
job_parse(struct job *job, struct pathed_tire *p)
{
	p->tree = malloc(sizeof *p->tree);
	if(p->tree == NULL){
		ERROR("Out of memory", 0);
		job->state = QUEUE_FAILED;
		return;
	}

	if(parse(p->path, p->tree)){
		job->state = QUEUE_SUCCESS;
		return;
	}

	ERROR("Parsing failed!", 0);
	job->state = QUEUE_FAILED;
	c3free(p->tree);
	p->tree = NULL;
}
// Defined elsewhere; a zero return is treated as failure by the callers here.
int analysis(struct c3tree *tree, int out);

// Run analysis on p->tree in this process, passing -1 as the output
// descriptor, and record the outcome in job->state. The job is marked
// QUEUE_SUCCESS before the call and downgraded to QUEUE_FAILED if analysis
// returns zero.
void
job_analyze(struct job *job, struct pathed_tire *p)
{
	int ok;

	job->state = QUEUE_SUCCESS;
	ok = analysis(p->tree, -1);
	if(ok)
		return;
	job->state = QUEUE_FAILED;
}
void
queue_parser_child(char *path, int out)
{
char c = 1;
struct c3tree tree;
if(!parse(path, &tree))
FATAL("Parsing of '%s' failed",path);
if(write(out, &c, 1) != 1)
FATAL("Failed to write tree message type", 0);
if(!c3write(&tree, out))
FATAL("Failed to send tree to parent", 0);
}
// Start `job` in a forked child connected to the parent by two pipes.
//
// from_child carries messages child -> parent (the parent reads job->in);
// to_child carries the one-byte "you may exit" signal parent -> child (the
// parent writes job->out).
//
// The child runs the parser or the analysis depending on state.mode, then
// blocks until it can read one byte from the parent and exits with 0. A
// failed analysis exits with 1.
//
// In the parent the job becomes QUEUE_RUNNING on success, or QUEUE_CRASHED
// if the pipes or the fork could not be created. Calling this on a job that
// is not QUEUE_UNSTARTED is reported through FATAL.
void
queue_process(struct job *job)
{
	int from_child[2], to_child[2];
	struct pathed_tire p;
	char ack;

	if(job->state != QUEUE_UNSTARTED)
		FATAL("ICE: attempted to run initialized job", 0);
	if(pipe(from_child) != 0){
		ERROR("Failed to create pipes", 0);
		job->state = QUEUE_CRASHED;
		return;
	}
	if(pipe(to_child) != 0){
		ERROR("Failed to create pipes", 0);
		// Don't leak the first pipe when the second one fails.
		close(from_child[0]);
		close(from_child[1]);
		job->state = QUEUE_CRASHED;
		return;
	}
	p = state.ctx->trees[job->index];
	// Flush stdio before forking: data still buffered at fork time would
	// otherwise be written once by the parent and once by the child.
	fflush(NULL);
	job->in = from_child[0];
	job->out = to_child[1];
	job->state = QUEUE_RUNNING;
	job->pid = fork();
	if(job->pid == 0){
		// The child only writes from_child and only reads to_child.
		close(from_child[0]);
		close(to_child[1]);
		state.in_child = from_child[1];
		if(state.mode == QUEUE_PARSER)
			queue_parser_child(p.path, from_child[1]);
		else
			if(!analysis(p.tree, from_child[1]))
				exit(1);
		if (read(to_child[0], &ack, 1) != 1)
			FATAL("Failed to read suicide signal", 0);
		exit(0);
	}
	if(job->pid == -1){
		ERROR("Failed to execute subprocess", 0);
		job->state = QUEUE_CRASHED;
		close(from_child[0]);
		close(from_child[1]);
		close(to_child[0]);
		close(to_child[1]);
		return;
	}
	// Parent: drop our copy of the child's write end so reads on job->in see
	// end-of-file once the child is gone instead of waiting forever.
	close(from_child[1]);
	// to_child[0] is deliberately left open here: with no reader left, a
	// write to job->out after the child died would raise SIGPIPE.
}
// Run `job` inside the current process: parse its tree entry when the queue
// is in QUEUE_PARSER mode, analyse it in any other mode.
void
queue_execute(struct job *job)
{
	struct pathed_tire *entry = &state.ctx->trees[job->index];

	switch(state.mode){
	case QUEUE_PARSER:
		job_parse(job, entry);
		break;
	default:
		job_analyze(job, entry);
		break;
	}
}
// Count the jobs that are currently in the QUEUE_RUNNING state.
uint32_t
queue_active(void)
{
	uint32_t running = 0;
	uint32_t slot;

	for(slot = 0; slot < stb_sb_count(state.jobs); slot += 1){
		if(state.jobs[slot].state != QUEUE_RUNNING)
			continue;
		running += 1;
	}
	return running;
}
struct job *
queue_find_job(pid_t pid)
{
uint32_t i;
for(i = 0; i < stb_sb_count(state.jobs); i += 1)
if(state.jobs[i].state == QUEUE_RUNNING && state.jobs[i].pid == pid)
return &state.jobs[i];
return NULL;
}
// Report `msg`, mark `job` as QUEUE_CRASHED and get rid of its child.
//
// The child is killed with SIGKILL and reaped right here. Leaving it for
// queue_handle_death would not work: that function looks the pid up among
// QUEUE_RUNNING jobs only, and this job is no longer running.
//
// If the job is already marked QUEUE_CRASHED nothing is signalled: every
// place in this file that sets that state does so without a live child
// (never forked, already reaped, or handled by an earlier call here), and
// the stored pid may since have been reused by an unrelated process.
void
queue_fail(struct job *job, char *msg)
{
	int already_crashed;

	ERROR("%s", msg);
	already_crashed = job->state == QUEUE_CRASHED;
	job->state = QUEUE_CRASHED;
	if(already_crashed)
		return;
	kill(job->pid, SIGKILL);
	// SIGKILL cannot be caught or ignored, so this wait terminates; retry
	// only when the wait itself is interrupted by a signal.
	while(waitpid(job->pid, NULL, 0) == -1 && errno == EINTR)
		;
}
// Read exactly `len` bytes from the job's input pipe into `buf`.
//
// A single read() may return fewer bytes than requested (the writer may not
// have produced everything yet) or be interrupted by a signal, so reading
// continues until the buffer is full.
//
// Returns 1 on success. On end-of-file or a read error the job is failed
// through queue_fail, `buf` is freed on behalf of the caller, and 0 is
// returned - callers must not touch or free `buf` after a 0 return.
int
read_or_kill(struct job *job, char *buf, size_t len)
{
	size_t got;
	ssize_t n;

	got = 0;
	while(got < len){
		n = read(job->in, buf + got, len - got);
		if(n > 0){
			got += (size_t)n;
			continue;
		}
		if(n == -1 && errno == EINTR)
			continue;
		queue_fail(job, "Failed to read pipe");
		free(buf);
		return 0;
	}
	return 1;
}
// Handle the body of an import message from the job's child: a 16-bit
// little-endian length followed by that many bytes of path. The path is
// resolved with h_path_resolve and queued with queue_job.
//
// Any failure (allocation, short read, resolution, queueing) fails the job
// through queue_fail.
void
queue_handle_import(struct job *job)
{
	char *buf;
	char *resolved;
	uint16_t len;

	// Heap buffers throughout: read_or_kill frees the buffer itself on failure.
	buf = malloc(2);
	if(buf == NULL){
		queue_fail(job, "Out of memory");
		return;
	}
	if(read_or_kill(job, buf, 2) != 1)
		return;
	// Decode through unsigned char: a plain (possibly signed) char would
	// sign-extend bytes >= 0x80 and corrupt the length.
	len = (uint16_t)((unsigned char)buf[0] | ((unsigned char)buf[1] << 8));
	free(buf);
	buf = malloc((size_t)len + 1);
	if(buf == NULL){
		queue_fail(job, "Out of memory");
		return;
	}
	if(read_or_kill(job, buf, len) != 1)
		return;
	buf[len] = 0;
	resolved = h_path_resolve(buf);
	free(buf);
	if(resolved == NULL){
		queue_fail(job, "Failed to resolve imported path");
		return;
	}
	if(!queue_job(resolved))
		queue_fail(job, "Failed to queue imported file");
	free(resolved);
}
// Handle the body of a tree message from the job's child: replace the job's
// tree entry with a tree read from the pipe via c3read, then write one byte
// to the child, which is waiting for it before it exits.
//
// A tree already stored in the entry is released with c3free first. On
// allocation or read failure the job is failed through queue_fail (which
// kills the child) and nothing further is sent.
void
queue_handle_tire(struct job *job)
{
	struct pathed_tire *p;
	char c = 3;

	p = &state.ctx->trees[job->index];
	if(p->tree)
		c3free(p->tree);
	p->tree = malloc(sizeof(struct c3tree));
	if(p->tree == NULL){
		queue_fail(job, "Out of memory");
		return;
	}
	if(!c3read(p->tree, job->in)){
		queue_fail(job, "Failed to read tree!");
		c3free(p->tree);
		p->tree = NULL;
		// The child has just been killed; there is nobody left to signal.
		return;
	}
	if (write(job->out, &c, 1) != 1)
		ERROR("Failed to signal subprocess to die", 0);
}
void
queue_handle_input(struct job *job)
{
char *buf;
char c;
int flags;
while(1){
buf = malloc(2);
if(buf == NULL){
// Don't mark job as failed; we can retry later, since we didn't read anything.
ERROR("Out of memory",0);
break;
}
flags = fcntl(job->in, F_GETFL);
if(flags == -1){
ERROR("Failed to read flags",0);
free(buf);
break;
}
flags |= O_NONBLOCK;
if(fcntl(job->in, F_SETFL, flags) != 0){
ERROR("Failed to set NONBLOCK!", 0);
free(buf);
break;
}
c = read(job->in, buf, 1);
if(c != 1){
free(buf);
break;
}
flags = flags & ~O_NONBLOCK;
if(fcntl(job->in, F_SETFL, flags) != 0){
queue_fail(job, "Failed to unset NONBLOCK");
free(buf);
break;
}
c = buf[0];
free(buf);
if(c == 0)
queue_handle_import(job);
else if(c == 1)
queue_handle_tire(job);
else{
queue_fail(job, "Unrecognized message type");
break;
}
}
}
void
queue_handle_input_all(void)
{
fd_set ssd;
int highest;
uint32_t i;
struct timeval v;
struct job *job;
v.tv_sec = 0;
v.tv_usec = 10000;
FD_ZERO(&ssd);
highest = 0;
for(i = 0; i < stb_sb_count(state.jobs); i += 1){
job = &state.jobs[i];
if(job->state == QUEUE_RUNNING && job->in > -1){
FD_SET(job->in, &ssd);
if(job->in >= highest)
highest = job->in + 1;
}
}
if(select(highest, &ssd, NULL, NULL, &v) <= 0)
return;
for(i = 0; i < stb_sb_count(state.jobs); i += 1){
job = &state.jobs[i];
if(job->state == QUEUE_RUNNING && job->in > -1 && FD_ISSET(job->in, &ssd))
queue_handle_input(job);
}
}
void
queue_handle_death(void)
{
pid_t pid;
struct job *job;
int status;
pid = waitpid(-1, &status, WNOHANG);
if(pid > 0){
job = queue_find_job(pid);
if(job == NULL)
FATAL("Unable to find job for pid %d\n", pid);
job->state = QUEUE_SUCCESS;
if(WIFEXITED(status) == 0 || WEXITSTATUS(status) != 0){
job->state = WIFEXITED(status) == 0 ? QUEUE_CRASHED : QUEUE_FAILED;
ERROR("Job %s: %d", job->state == QUEUE_CRASHED ? "crashed" : "failed", pid);
return;
}
}
}
int
queue_status(void)
{
uint32_t i, j;
struct job *job;
struct pathed_tire *p;
j = 1;
for(i = 0; i < stb_sb_count(state.jobs); i += 1){
job = &state.jobs[i];
p = &state.ctx->trees[job->index];
if(job->state != QUEUE_SUCCESS){
if(job->state == QUEUE_CRASHED)
ERROR("ICE: Job for '%s' crashed!", p->path);
j = 0;
}
}
return j;
}
int
queue_wait(void)
{
uint32_t i;
struct timespec s;
s.tv_sec = 0;
s.tv_nsec = 10000;
i = queue_get_next();
while(i != -1 || queue_active() > 0){
if(state.ctx->single_process)
queue_execute(&state.jobs[i]);
else{
if(queue_active() < 4 && i != -1)
queue_process(&state.jobs[i]);
queue_handle_death();
queue_handle_input_all();
nanosleep(&s, NULL);
}
i = queue_get_next();
}
return queue_status();
}
// Release the job list. The pointer is cleared afterwards so the freed list
// cannot be reached again through `state` and a repeated call is harmless.
void
queue_free(void)
{
	stb_sb_free(state.jobs);
	state.jobs = NULL;
}