~mh/proc

d4ed0b2ce972edfd74535830563d0e7025a92ba6 — Max Hille 4 months ago ea1c9be
Add graceful shutdown
1 files changed, 99 insertions(+), 33 deletions(-)

M main.go
M main.go => main.go +99 -33
@@ 2,24 2,31 @@ package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"strings"
	"syscall"
)

type job struct {
	name string
	cmd  []string
	name    string
	cmd     []string
	stopped bool
}

type msg struct {
	job  job
	line string
type output struct {
	job   *job
	lines []string
	eof   bool
}

var pad = 0

func main() {
	jobs, err := load()
	if err != nil {


@@ 30,63 37,122 @@ func main() {
		log.Fatalf("No jobs defined.")
	}

	log.Print(jobs)
	stdouts := make(chan msg)
	stderrs := make(chan msg)
	// pad formatting
	for _, job := range jobs {
		if len(job.name) > pad {
			pad = len(job.name)
		}
	}

	stdouts := make(chan output)
	stderrs := make(chan output)

	for i := range jobs {
		job := &jobs[i]
		cmd := exec.Command(job.cmd[0], job.cmd[1:]...)

		stdout, err := cmd.StdoutPipe()
		if err != nil {
			log.Fatal(err)
			job.printError(fmt.Errorf("Could not bind STDOUT: %w", err))
			continue
		}
		stderr, err := cmd.StderrPipe()
		if err != nil {
			log.Fatal(err)
			job.printError(fmt.Errorf("Could not bind STDERR: %w", err))
			continue
		}

		err = cmd.Start()
		if err != nil {
			log.Fatal(err)

			job.printError(fmt.Errorf("Could not start: %w", err))
			continue
		}
		go bind(job, stdout, stdouts)
		go bind(job, stderr, stderrs)
	}

	// formatting
	max := 0
	for _, job := range jobs {
		if len(job.name) > max {
			max = len(job.name)
		}
		go job.bind(stdout, stdouts)
		go job.bind(stderr, stderrs)
	}
	paddedFmt := fmt.Sprintf("%%%ds | %%v\n", max)
	paddedFmtErr := fmt.Sprintf("%%%ds | \x1b[31m%%v\x1b[0m\n", max)

	sig := make(chan os.Signal)
	signal.Notify(sig, syscall.SIGINT)

	waiting := false
	for {
		select {
		case msg := <-stdouts:
			fmt.Printf(paddedFmt, msg.job.name, msg.line)
		case msg := <-stderrs:
			fmt.Printf(paddedFmtErr, msg.job.name, msg.line)
		case <-sig:
			if !waiting {
				log.Println("WAITING FOR JOBS TO SHUT DOWN. PRESS ^C AGAIN TO KILL IMMEDIATELY")
				waiting = true
			} else {
				log.Println("KILLING IMMEDIATELY. WATCH OUT FOR ZOMBIES!")
				os.Exit(1)
			}
		case output := <-stdouts:
			for _, line := range output.lines {
				output.job.print(line)
			}
			if output.eof {
				output.job.stopped = true
				output.job.print("=== STOPPED ===")
			}
			if allStopped(jobs) {
				os.Exit(0)
			}

		case output := <-stderrs:
			for _, line := range output.lines {
				output.job.print(line)
			}
		}
	}
}

func bind(job job, pipe io.ReadCloser, out chan msg) {
func (job *job) printError(err error) {
	formatErr := fmt.Sprintf("%%%ds | \x1b[31m%%v\x1b[0m\n", pad)
	fmt.Printf(formatErr, job.name, err)
}

func (job *job) print(str string) {
	format := fmt.Sprintf("%%%ds | %%v\n", pad)
	fmt.Printf(format, job.name, str)
}

func allStopped(jobs []job) bool {
	for _, job := range jobs {
		if !job.stopped {
			return false
		}
	}

	return true
}

func (job *job) bind(pipe io.ReadCloser, out chan output) {
	rd := bufio.NewReader(pipe)
	for {
		str, err := rd.ReadString('\n')
		if err != nil {
			log.Fatal(err)
		}

		// split CRs into lines
		str = strings.ReplaceAll(str, "\r", "\n")
		lines := strings.Split(strings.TrimSuffix(str, "\n"), "\n")

		for _, line := range lines {
			out <- (msg{job: job, line: line})
		stopped := false
		eof := false
		if errors.Is(err, io.EOF) {
			eof = true
			stopped = true
		} else if err != nil {
			job.printError(err)
			stopped = true
		}

		out <- output{
			lines: lines,
			eof:   eof,
			job:   job,
		}

		if stopped {
			break
		}
	}
}