~emersion/mrsh

ref: 07e65a3a88e03c6d9e6f1f6ac4a1cfa9a74a9b09 mrsh/shell/task/pipeline.c -rw-r--r-- 3.0 KiB
07e65a3aSimon Ser parser/arithm: fix && and || 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "shell/task.h"

/**
 * Put the process into its job's process group. This has to be done both in the
 * parent and the child because of potential race conditions.
 */
static struct process *init_child(struct context *ctx, pid_t pid) {
	struct process *proc = process_create(ctx->state, pid);
	if (ctx->state->options & MRSH_OPT_MONITOR) {
		job_add_process(ctx->job, proc);

		if (ctx->state->interactive && !ctx->background) {
			job_set_foreground(ctx->job, true, false);
		}
	}
	return proc;
}

int run_pipeline(struct context *ctx, struct mrsh_pipeline *pl) {
	// Create a new sub-context, because we want one job per pipeline.
	struct context child_ctx = *ctx;
	if (child_ctx.job == NULL) {
		child_ctx.job = job_create(ctx->state, &pl->and_or_list.node);
	}

	assert(pl->commands.len > 0);
	if (pl->commands.len == 1) {
		int ret = run_command(&child_ctx, pl->commands.data[0]);
		if (pl->bang && ret >= 0) {
			ret = !ret;
		}
		return ret;
	}

	struct mrsh_array procs = {0};
	mrsh_array_reserve(&procs, pl->commands.len);
	int next_stdin = -1, cur_stdin = -1, cur_stdout = -1;
	for (size_t i = 0; i < pl->commands.len; ++i) {
		struct mrsh_command *cmd = pl->commands.data[i];

		if (i < pl->commands.len - 1) {
			int fds[2];
			if (pipe(fds) != 0) {
				perror("pipe");
				return TASK_STATUS_ERROR;
			}

			// We'll use the write end of the pipe as stdout, the read end will
			// be used as stdin by the next command
			assert(next_stdin == -1 && cur_stdout == -1);
			next_stdin = fds[0];
			cur_stdout = fds[1];
		}

		pid_t pid = fork();
		if (pid < 0) {
			return TASK_STATUS_ERROR;
		} else if (pid == 0) {
			child_ctx.state->child = true;

			init_child(&child_ctx, getpid());
			if (ctx->state->options & MRSH_OPT_MONITOR) {
				init_job_child_process(ctx->state);
			}

			if (next_stdin >= 0) {
				close(next_stdin);
			}

			if (i > 0) {
				if (dup2(cur_stdin, STDIN_FILENO) < 0) {
					fprintf(stderr, "failed to duplicate stdin: %s\n",
						strerror(errno));
					return false;
				}
				close(cur_stdin);
			}

			if (i < pl->commands.len - 1) {
				if (dup2(cur_stdout, STDOUT_FILENO) < 0) {
					fprintf(stderr, "failed to duplicate stdout: %s\n",
						strerror(errno));
					return false;
				}
				close(cur_stdout);
			}

			int ret = run_command(&child_ctx, cmd);
			if (ret < 0) {
				exit(127);
			}

			exit(ret);
		}

		struct process *proc = init_child(&child_ctx, pid);
		mrsh_array_add(&procs, proc);

		if (cur_stdin >= 0) {
			close(cur_stdin);
			cur_stdin = -1;
		}
		if (cur_stdout >= 0) {
			close(cur_stdout);
			cur_stdout = -1;
		}

		cur_stdin = next_stdin;
		next_stdin = -1;
	}

	assert(next_stdin == -1 && cur_stdout == -1 && cur_stdin == -1);

	int ret = 0;
	for (size_t i = 0; i < procs.len; ++i) {
		struct process *proc = procs.data[i];
		ret = job_wait_process(proc);
		if (ret < 0) {
			break;
		}
	}
	mrsh_array_finish(&procs);
	if (pl->bang && ret >= 0) {
		ret = !ret;
	}
	return ret;
}