~mil/mepo

d0fbd028649567c707b0b6cdb34011ac92430999 — Miles Alan 5 months ago 0bfdd22
Working first pass on async shellpipe; process commands on semicolons
1 files changed, 77 insertions(+), 24 deletions(-)

M src/api/shellpipe.zig
M src/api/shellpipe.zig => src/api/shellpipe.zig +77 -24
@@ 56,36 56,89 @@ fn shellpipe_run(mepo: *Mepo, cmd: []const u8, as_async: bool) !void {
    var arena = std.heap.ArenaAllocator.init(mepo.allocator);
    defer arena.deinit();

    if (as_async) mepo.idle_mutex.lock();
    defer {
        if (as_async) mepo.idle_mutex.unlock();
    }

    mepo.idle_mutex.lock();
    try mepo.update_debug_message(
        try std.fmt.allocPrint(arena.allocator(), "Shellpipe ({s}) in progress - run!", .{cmd}),
    );
    try refresh_ui(mepo, as_async);
    const env_vars = try get_env_vars(mepo, arena.allocator());

    const args = [_][]const u8{ "sh", "-c", cmd };
    const process_result = try std.ChildProcess.exec(.{
        .allocator = arena.allocator(),
        .argv = args[0..],
        .env_map = &env_vars,
        .max_output_bytes = 10000 * 1024,
    });
    const exitcode = process_result.term.Exited;
    try mepo.update_debug_message(
        try std.fmt.allocPrint(arena.allocator(), "Shellpipe ({s}) completed - execute, {d}!", .{ cmd, process_result.stdout.len }),
    );
    try refresh_ui(mepo, as_async);
    if (exitcode == 0) {
        try mepo.mepolang_execute(process_result.stdout);
        try mepo.update_debug_message(null);
    mepo.idle_mutex.unlock();

    if (as_async) {
        // Async:
        // Continually looks at stdout, lock/unlock mutex and run mepolang
        // on every semicolon
        try async_mepolang_execute(mepo, arena.allocator(), cmd);
    } else {
        utildbg.log("shellpipe error: exited with code {d}\n{s}", .{ exitcode, process_result.stderr });
        try mepo.update_debug_message(process_result.stderr);
        // Non-async:
        // Basically single lock of the mutex, run result mepolang stdout unlock
        mepo.idle_mutex.lock();
        const env_vars = try get_env_vars(mepo, arena.allocator());
        const args = [_][]const u8{ "sh", "-c", cmd };
        const process_result = try std.ChildProcess.exec(.{
            .allocator = arena.allocator(),
            .argv = args[0..],
            .env_map = &env_vars,
            .max_output_bytes = 10000 * 1024,
        });
        const exitcode = process_result.term.Exited;
        try mepo.update_debug_message(
            try std.fmt.allocPrint(arena.allocator(), "Shellpipe ({s}) completed - execute, {d}!", .{ cmd, process_result.stdout.len }),
        );
        try refresh_ui(mepo, as_async);
        if (exitcode == 0) {
            try mepo.mepolang_execute(process_result.stdout);
            try mepo.update_debug_message(null);
        } else {
            utildbg.log("shellpipe error: exited with code {d}\n{s}", .{ exitcode, process_result.stderr });
            try mepo.update_debug_message(process_result.stderr);
        }
        mepo.idle_mutex.unlock();
    }
}

fn async_mepolang_execute(mepo: *Mepo, allocator: std.mem.Allocator, cmd: []const u8) !void {
    // More or less from zig 0.9 stdlib child process's spawnPosix
    const argv = [_][]const u8{ "sh", "-c", cmd };
    const max_output_bytes: usize = 50 * 1024;
    const child = try std.ChildProcess.init(argv[0..], allocator);
    defer child.deinit();
    child.stdin_behavior = .Ignore;
    child.stdout_behavior = .Pipe;
    try child.spawn();
    var stdout = std.ArrayList(u8).init(allocator);
    errdefer stdout.deinit();

    var poll_fds = [_]std.os.pollfd{
        .{ .fd = child.stdout.?.handle, .events = std.os.POLL.IN, .revents = undefined },
    };
    const bump_amt = 512;

    while (true) loop: {
        const events = try std.os.poll(&poll_fds, std.math.maxInt(i32));
        if (events == 0) continue;
        if (poll_fds[0].revents & std.os.POLL.IN != 0) {
            try stdout.ensureTotalCapacity(std.math.min(stdout.items.len + bump_amt, max_output_bytes));
            if (stdout.unusedCapacitySlice().len == 0) return error.StdoutStreamTooLong;
            const nread = try std.os.read(poll_fds[0].fd, stdout.unusedCapacitySlice());
            if (nread == 0) break :loop;
            stdout.items.len += nread;
            if (std.mem.indexOf(u8, stdout.items, ";")) |mepolang_statement_end_index| {
                const statement = stdout.items[0 .. mepolang_statement_end_index + 1];
                utildbg.log("Running mepolang statement from async shellipe: {s}\n", .{statement});

                mepo.idle_mutex.lock();
                try mepo.mepolang_execute(statement);
                mepo.idle_mutex.unlock();

                // TODO: is there a slice 'shift' like functionality in arraylist?
                var i: usize = mepolang_statement_end_index + 2;
                while (i > 0) : (i -= 1) _ = stdout.swapRemove(0);
            }
        } else if (poll_fds[0].revents & (std.os.POLL.ERR | std.os.POLL.NVAL | std.os.POLL.HUP) != 0) {
            break :loop;
        }
    }
    _ = try child.wait();
}

fn refresh_ui(mepo: *Mepo, as_async: bool) !void {