~emersion/mrsh

mrsh/shell/task/pipeline.c -rw-r--r-- 3.1 KiB View raw
742831faSimon Ser Replace assert(0) with abort() 23 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "shell/task.h"

/**
 * Create the process record for `pid` and, when job control
 * (MRSH_OPT_MONITOR) is enabled, attach it to the context's job. This puts the
 * process into its job's process group; run_pipeline() calls this from both
 * sides of fork() because either side may run first.
 *
 * For an interactive shell running the job in the foreground, the job is also
 * made the foreground job.
 *
 * Returns the process record obtained from process_create().
 */
static struct mrsh_process *init_child(struct mrsh_context *ctx, pid_t pid) {
	struct mrsh_process *child = process_create(ctx->state, pid);

	// Without job control there is no job bookkeeping to perform.
	if (!(ctx->state->options & MRSH_OPT_MONITOR)) {
		return child;
	}

	job_add_process(ctx->job, child);

	bool wants_foreground = ctx->state->interactive && !ctx->background;
	if (wants_foreground) {
		job_set_foreground(ctx->job, true, false);
	}

	return child;
}

/**
 * Run every command of a pipeline and return the pipeline's status.
 *
 * A pipeline holding a single command is handed straight to run_command()
 * without forking here. Otherwise one child is forked per command, each
 * command's stdout is connected to the next command's stdin through a pipe,
 * and the children are then waited for in order.
 *
 * Returns the status of the last process waited for (or of run_command() for
 * a single-command pipeline), logically negated when pl->bang is set. Negative
 * statuses are returned unchanged. TASK_STATUS_ERROR is returned when a pipe
 * or a child process cannot be created; in that case every pipe end still
 * held by this process is closed first.
 */
int run_pipeline(struct mrsh_context *ctx, struct mrsh_pipeline *pl) {
	struct mrsh_state_priv *priv = state_get_priv(ctx->state);

	// Create a new sub-context, because we want one job per pipeline.
	struct mrsh_context child_ctx = *ctx;
	if (child_ctx.job == NULL) {
		child_ctx.job = job_create(ctx->state, &pl->and_or_list.node);
	}

	assert(pl->commands.len > 0);
	if (pl->commands.len == 1) {
		int ret = run_command(&child_ctx, pl->commands.data[0]);
		if (pl->bang && ret >= 0) {
			ret = !ret;
		}
		return ret;
	}

	struct mrsh_array procs = {0};
	mrsh_array_reserve(&procs, pl->commands.len);
	int next_stdin = -1, cur_stdin = -1, cur_stdout = -1;
	for (size_t i = 0; i < pl->commands.len; ++i) {
		struct mrsh_command *cmd = pl->commands.data[i];

		if (i < pl->commands.len - 1) {
			int fds[2];
			if (pipe(fds) != 0) {
				perror("pipe");
				goto error;
			}

			// We'll use the write end of the pipe as stdout, the read end will
			// be used as stdin by the next command
			assert(next_stdin == -1 && cur_stdout == -1);
			next_stdin = fds[0];
			cur_stdout = fds[1];
		}

		pid_t pid = fork();
		if (pid < 0) {
			perror("fork");
			goto error;
		} else if (pid == 0) {
			priv->child = true;

			init_child(&child_ctx, getpid());
			if (ctx->state->options & MRSH_OPT_MONITOR) {
				init_job_child_process(ctx->state);
			}

			// The read end of the new pipe belongs to the next command only
			if (next_stdin >= 0) {
				close(next_stdin);
			}

			// From here on we are in the child: failures must terminate the
			// process, never return into the caller's code, otherwise a
			// second copy of the shell would keep running.
			if (i > 0) {
				if (dup2(cur_stdin, STDIN_FILENO) < 0) {
					fprintf(stderr, "failed to duplicate stdin: %s\n",
						strerror(errno));
					exit(EXIT_FAILURE);
				}
				// dup2() is a no-op when both descriptors are equal; closing
				// the descriptor then would close our new stdin.
				if (cur_stdin != STDIN_FILENO) {
					close(cur_stdin);
				}
			}

			if (i < pl->commands.len - 1) {
				if (dup2(cur_stdout, STDOUT_FILENO) < 0) {
					fprintf(stderr, "failed to duplicate stdout: %s\n",
						strerror(errno));
					exit(EXIT_FAILURE);
				}
				if (cur_stdout != STDOUT_FILENO) {
					close(cur_stdout);
				}
			}

			int ret = run_command(&child_ctx, cmd);
			if (ret < 0) {
				exit(127);
			}

			exit(ret);
		}

		struct mrsh_process *proc = init_child(&child_ctx, pid);
		mrsh_array_add(&procs, proc);

		// The child owns its copies of the pipe ends now; drop ours so that
		// readers see EOF once the writers exit.
		if (cur_stdin >= 0) {
			close(cur_stdin);
			cur_stdin = -1;
		}
		if (cur_stdout >= 0) {
			close(cur_stdout);
			cur_stdout = -1;
		}

		cur_stdin = next_stdin;
		next_stdin = -1;
	}

	assert(next_stdin == -1 && cur_stdout == -1 && cur_stdin == -1);

	int ret = 0;
	for (size_t i = 0; i < procs.len; ++i) {
		struct mrsh_process *proc = procs.data[i];
		ret = job_wait_process(proc);
		if (ret < 0) {
			break;
		}
	}
	mrsh_array_finish(&procs);
	if (pl->bang && ret >= 0) {
		ret = !ret;
	}
	return ret;

error:
	// Setting up the pipeline failed part-way through. Release every pipe end
	// this process still holds and the bookkeeping array. Children forked
	// before the failure are not waited for here.
	if (cur_stdin >= 0) {
		close(cur_stdin);
	}
	if (cur_stdout >= 0) {
		close(cur_stdout);
	}
	if (next_stdin >= 0) {
		close(next_stdin);
	}
	mrsh_array_finish(&procs);
	return TASK_STATUS_ERROR;
}