~sircmpwn/mrsh

2d5d78be5e59e97015b97c30ea8322e24526fafe — Simon Ser 6 months ago 2f3acaf
shell: fix pipelines
4 files changed, 72 insertions(+), 16 deletions(-)

M include/mrsh/shell.h
M shell/job.c
M shell/shell.c
M shell/task/pipeline.c
M include/mrsh/shell.h => include/mrsh/shell.h +1 -0
@@ 110,6 110,7 @@ struct mrsh_state {
	// TODO: move this to context
	enum mrsh_branch_control branch_control;
	int nloops;
	bool child; // true if we're not the main shell process
};

void mrsh_function_destroy(struct mrsh_function *fn);

M shell/job.c => shell/job.c +32 -12
@@ 193,19 193,26 @@ int job_poll(struct mrsh_job *job) {
	return proc_status;
}

static bool wait_any(struct mrsh_state *state) {
static bool _job_wait(struct mrsh_state *state, pid_t pid) {
	while (true) {
		// We only want to be notified about stopped processes in the main
		// shell. Child processes want to block until their own children have
		// terminated.
		//
		// Here it's important to wait for a specific process: we don't want to
		// steal one of our grandchildren's status for one of our children.
		int stat;
		pid_t pid = waitpid(-1, &stat, WUNTRACED);
		if (pid == -1) {
		pid_t ret = waitpid(pid, &stat, state->child ? 0 : WUNTRACED);
		if (ret < 0) {
			if (errno == EINTR) {
				continue;
			}
			fprintf(stderr, "failed to waitpid(): %s\n", strerror(errno));
			fprintf(stderr, "waitpid failed: %s\n", strerror(errno));
			return false;
		}
		assert(ret > 0);

		update_job(state, pid, stat);
		update_job(state, ret, stat);
		return true;
	}
}


@@ 217,7 224,16 @@ int job_wait(struct mrsh_job *job) {
			return status;
		}

		if (!wait_any(job->state)) {
		struct process *wait_proc = NULL;
		for (size_t j = 0; j < job->processes.len; ++j) {
			struct process *proc = job->processes.data[j];
			if (process_poll(proc) == TASK_STATUS_WAIT) {
				wait_proc = proc;
				break;
			}
		}
		assert(wait_proc != NULL);
		if (!_job_wait(job->state, wait_proc->pid)) {
			return TASK_STATUS_ERROR;
		}
	}


@@ 230,7 246,7 @@ int job_wait_process(struct process *proc) {
			return status;
		}

		if (!wait_any(proc->state)) {
		if (!_job_wait(proc->state, proc->pid)) {
			return TASK_STATUS_ERROR;
		}
	}


@@ 256,11 272,15 @@ bool init_job_child_process(struct mrsh_state *state) {
void update_job(struct mrsh_state *state, pid_t pid, int stat) {
	update_process(state, pid, stat);

	// Put stopped and terminated jobs in the background
	for (size_t i = 0; i < state->jobs.len; ++i) {
		struct mrsh_job *job = state->jobs.data[i];
		if (job_poll(job) != TASK_STATUS_WAIT) {
			job_set_foreground(job, false, false);
	// Put stopped and terminated jobs in the background. We don't want to do so
	// if we're not the main shell, because we only have a partial view of the
	// jobs (we only know about our own child processes).
	if (!state->child) {
		for (size_t i = 0; i < state->jobs.len; ++i) {
			struct mrsh_job *job = state->jobs.data[i];
			if (job_poll(job) != TASK_STATUS_WAIT) {
				job_set_foreground(job, false, false);
			}
		}
	}
}

M shell/shell.c => shell/shell.c +2 -0
@@ 158,6 158,8 @@ pid_t subshell_fork(struct context *ctx, struct process **process_ptr) {
			*process_ptr = NULL;
		}

		ctx->state->child = true;

		if (ctx->state->options & MRSH_OPT_MONITOR) {
			// Create a job for all children processes
			pid_t pgid = create_process_group(getpid());

M shell/task/pipeline.c => shell/task/pipeline.c +37 -4
@@ 5,10 5,25 @@
#include <unistd.h>
#include "shell/task.h"

/**
 * Put the process into its job's process group. This has to be done both in the
 * parent and the child because of potential race conditions.
 */
static struct mrsh_job *put_into_process_group(struct context *ctx, pid_t pid) {
	if (ctx->job == NULL) {
		ctx->job = job_create(ctx->state, pid);
	}
	setpgid(pid, ctx->job->pgid);
	return ctx->job;
}

int run_pipeline(struct context *ctx, struct mrsh_pipeline *pl) {
	// Create a new sub-context, because we want one job per pipeline.
	struct context child_ctx = *ctx;

	assert(pl->commands.len > 0);
	if (pl->commands.len == 1) {
		return run_command(ctx, pl->commands.data[0]);
		return run_command(&child_ctx, pl->commands.data[0]);
	}

	struct mrsh_array procs = {0};


@@ 31,11 46,20 @@ int run_pipeline(struct context *ctx, struct mrsh_pipeline *pl) {
			cur_stdout = fds[1];
		}

		struct process *proc;
		pid_t pid = subshell_fork(ctx, &proc);
		pid_t pid = fork();
		if (pid < 0) {
			return TASK_STATUS_ERROR;
		} else if (pid == 0) {
			child_ctx.state->child = true;

			if (ctx->state->options & MRSH_OPT_MONITOR) {
				struct mrsh_job *job = put_into_process_group(&child_ctx, getpid());
				if (ctx->state->interactive && !ctx->background) {
					job_set_foreground(job, true, false);
				}
				init_job_child_process(ctx->state);
			}

			if (i > 0) {
				if (dup2(cur_stdin, STDIN_FILENO) < 0) {
					fprintf(stderr, "failed to duplicate stdin: %s\n",


@@ 54,7 78,7 @@ int run_pipeline(struct context *ctx, struct mrsh_pipeline *pl) {
				close(cur_stdout);
			}

			int ret = run_command(ctx, cmd);
			int ret = run_command(&child_ctx, cmd);
			if (ret < 0) {
				exit(127);
			}


@@ 62,6 86,15 @@ int run_pipeline(struct context *ctx, struct mrsh_pipeline *pl) {
			exit(ret);
		}

		struct process *proc = process_create(ctx->state, pid);
		if (ctx->state->options & MRSH_OPT_MONITOR) {
			struct mrsh_job *job = put_into_process_group(&child_ctx, pid);
			if (ctx->state->interactive && !ctx->background) {
				job_set_foreground(job, true, false);
			}
			job_add_process(job, proc);
		}

		mrsh_array_add(&procs, proc);

		if (cur_stdin >= 0) {