~nesv/govern

a11fb6b7e583db7bf96e3a3db68cc5375ec88be6 — Nick Saika 1 year, 2 months ago 18e992f v0.0.2
Remove a ton of dead code, and fix linting errors
40 files changed, 16 insertions(+), 3047 deletions(-)

D cmd/govern/main.go
D internal/agent/agent.go
M internal/exec/exec.go
M internal/exec/pkg.go
M internal/facts/util.go
D internal/hosts/hosts.go
M internal/resource/parser.go
M internal/runner/gather.go
D internal/server/call.go
D internal/server/message.go
D internal/server/server.go
M lua.go
M main.go
D pkg/agent/agent.go
D pkg/exec/doc.go
D pkg/exec/echo.go
D pkg/exec/exec.go
D pkg/exec/pkg.go
D pkg/exec/utils.go
D pkg/facts/doc.go
D pkg/facts/fact.go
D pkg/facts/facts.go
D pkg/facts/gather.go
D pkg/facts/util.go
D pkg/hosts/hosts.go
D pkg/resource/attributes.go
D pkg/resource/doc.go
D pkg/resource/parse.go
D pkg/resource/parser.go
D pkg/resource/ref.go
D pkg/resource/resource.go
D pkg/resource/signal.go
D pkg/runner/gather.go
D pkg/runner/runner.go
D pkg/runner/runners.go
D pkg/server/call.go
D pkg/server/message.go
D pkg/server/server.go
D pkg/state/state.go
D runners/pkg/arch_linux.go
D cmd/govern/main.go => cmd/govern/main.go +0 -309
@@ 1,309 0,0 @@
package main

import (
	"bytes"
	"fmt"
	"os"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"git.sr.ht/~nesv/govern/pkg/facts"
	"git.sr.ht/~nesv/govern/pkg/resource"
	"git.sr.ht/~nesv/govern/pkg/runner"
	"git.sr.ht/~nesv/govern/pkg/state"

	"github.com/nesv/cmndr"
	"github.com/pkg/errors"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Set by command-line flags.
var (
	//configPath     string
	factsDirs      []string
	factsIgnore    []string
	stateDirs      []string
	statesIgnore   []string
	pretend        bool
	withAttributes bool
	runnerDirs     []string
	withSignals    bool
)

var (
	defaultFactsDirs = []string{
		"/usr/local/lib/govern/facts.d",
		"/usr/local/etc/govern/facts.d",
	}

	defaultStateDirs = []string{
		"/usr/local/lib/govern/states.d",
		"/usr/local/etc/govern/states.d",
	}

	defaultRunnerDirs = []string{
		"/usr/local/lib/govern/bin",
		"/usr/lib/govern/bin",
	}
)

func main() {
	name, err := os.Executable()
	if err != nil {
		panic(err)
	}

	root := cmndr.New(name, nil)
	root.Description = "Configure your host"
	//root.Flags.StringVarP(&configPath, "config", "f", "/usr/local/etc/govern/config.hcl", "Path to the configuration `file`")
	root.Flags.StringSliceVarP(&factsDirs, "facts-directory", "F", defaultFactsDirs, "Directories to look for facts")
	root.Flags.StringSliceVar(&factsIgnore, "ignore-facts", nil, "Globbing `pattern`s of facts  to ignore")
	root.Flags.StringSliceVarP(&stateDirs, "states-directory", "S", defaultStateDirs, "Directories to gather state files from")
	root.Flags.StringSliceVar(&statesIgnore, "ignore-states", nil, "Globbing `pattern`s of states to ignore")
	root.Flags.BoolVarP(&pretend, "pretend", "n", false, "Do not apply any states")
	root.Flags.BoolVar(&withAttributes, "with-attributes", false, "Show resource attributes with --pretend, -n")
	root.Flags.StringSliceVarP(&runnerDirs, "runners-directory", "R", defaultRunnerDirs, "Directories to look for runners in")
	root.Flags.BoolVar(&withSignals, "with-signals", false, "resources: Show the signals a resource can send")

	facts := cmndr.New("facts", gatherFacts)
	facts.Description = "Collect facts for the current host"
	root.AddCmd(facts)

	states := cmndr.New("states", showStates)
	states.Description = "Collect and list defined states"
	root.AddCmd(states)

	apply := cmndr.New("apply", applyStates)
	apply.Description = "Apply all defined states"
	root.AddCmd(apply)

	resources := cmndr.New("resources", showResources)
	resources.Description = "List all defined resources"
	root.AddCmd(resources)

	runners := cmndr.New("runners", showRunners)
	runners.Description = "List all discovered runners"
	root.AddCmd(runners)

	root.Exec()
}

func gatherFacts(cmd *cmndr.Cmd, args []string) error {
	f, err := collectFacts()
	if err != nil {
		return errors.Wrap(err, "collect facts")
	}
	if _, err := f.WriteTo(os.Stdout); err != nil {
		return errors.Wrap(err, "write to stdout")
	}
	return nil
}

func collectFacts() (*facts.Facts, error) {
	if len(factsDirs) == 0 {
		factsDirs = make([]string, 0, len(defaultFactsDirs))
		copy(factsDirs, defaultFactsDirs)
	}

	var options []facts.GatherOption
	for _, dir := range factsDirs {
		options = append(options, facts.Dir(dir))
	}

	for _, pattern := range factsIgnore {
		options = append(options, facts.Ignore(pattern))
	}

	return facts.Gather(options...)
}

func showStates(cmd *cmndr.Cmd, args []string) error {
	states, err := collectStates()
	if err != nil {
		return errors.Wrap(err, "collect states")
	}

	if _, err := states.WriteTo(os.Stdout); err != nil {
		return errors.Wrap(err, "write to stdout")
	}

	return nil
}

func collectStates() (*state.States, error) {
	for len(stateDirs) == 0 {
		stateDirs = make([]string, 0, len(defaultStateDirs))
		copy(stateDirs, defaultStateDirs)
	}

	var options []state.GatherOption
	for _, dir := range stateDirs {
		options = append(options, state.Dir(dir))
	}

	for _, pattern := range statesIgnore {
		options = append(options, state.Ignore(pattern))
	}

	return state.Gather(options...)
}

func applyStates(cmd *cmndr.Cmd, args []string) error {
	// TODO(nesv): Allow the user to supply the name of the state they would
	// like to apply, as positional parameters.

	states, err := collectStates()
	if err != nil {
		return errors.Wrap(err, "collect states")
	}

	runners, err := collectRunners()
	if err != nil {
		return errors.Wrap(err, "collect runners")
	}

	var (
		m         = make(map[string]string) // rref -> src
		resources = make([]resource.Resource, 0)
	)
	for _, st := range states.States() {
		// The user can supply the name of states they would like to
		// selectively apply.
		// If the state we are current on doesn't match the name of
		// any of the states specified by the user, skip it.
		if len(args) > 0 {
			var apply bool
			for _, arg := range args {
				if st.String() == arg {
					apply = true
					break
				}
			}
			if !apply {
				continue
			}
		}

		res, err := st.Resources(resource.WithRunners(runners.Runners()...))
		if err != nil {
			return errors.Wrap(err, "resources")
		}

		for _, r := range res {
			if src, exists := m[r.Name.String()]; exists {
				return errors.Errorf("resource %s already defined at %s", r.Name, src)
			}
			m[r.Name.String()] = r.Source()
			resources = append(resources, r)
		}
	}

	// TODO(nesv): Sort resources to become a DAG.

	// Execute our resources.
	//
	// NOTE(nesv): Normally, we would be super careful and check to make
	// sure that we have a runner for the resource.
	// However, we cannot parse the state files unless we have a runner
	// that can already handle that resource, so for now, we are going to
	// elide the check.
	tw := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)

	var errs []error
	for _, res := range resources {
		fmt.Fprint(tw, res)

		var (
			start = time.Now()
			rn, _ = runners.Get(res.Name.Runner)
			cfg   = resource.ExecuteConfig{
				Runner:     rn,
				GovernPath: os.Args[0],
				FactsDirs:  factsDirs,
				Pretend:    pretend,
			}
		)
		stdout, stderr, err := res.Execute(cfg)
		fmt.Fprintf(tw, "\t%s", time.Since(start))
		if err != nil {
			errs = append(errs, fmt.Errorf("apply %s: %w", res, err))
			fmt.Fprintf(tw, "\t%s\n", stderr)
			continue
		}
		if pretend {
			fmt.Fprintf(tw, "\t%s\n", bytes.TrimSpace(stdout))
		} else {
			fmt.Fprintln(tw, "\tok")
		}
	}

	if err := tw.Flush(); err != nil {
		errs = append(errs, fmt.Errorf("flush tabwriter: %w", err))
	}

	return kerrors.NewAggregate(errs)
}

func showResources(cmd *cmndr.Cmd, args []string) error {
	runners, err := collectRunners()
	if err != nil {
		return errors.Wrap(err, "collect runners")
	}

	states, err := collectStates()
	if err != nil {
		return errors.Wrap(err, "collect states")
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
	for _, s := range states.States() {
		resources, err := s.Resources(resource.WithRunners(runners.Runners()...))
		if err != nil {
			return errors.Wrapf(err, "parse resources for state %s", s)
		}

		for _, r := range resources {
			fmt.Fprintf(tw, "%s\t%s", s, r)
			if withSignals {
				signals := make([]string, 0, len(r.Notify))
				for _, s := range r.Notify {
					signals = append(signals, s.String())
				}
				sort.Strings(signals)
				fmt.Fprintf(tw, "\t%s", strings.Join(signals, ","))
			}
			fmt.Fprintln(tw)
		}
	}

	return errors.Wrap(tw.Flush(), "flush tabwriter")
}

func showRunners(cmd *cmndr.Cmd, args []string) error {
	runners, err := collectRunners()
	if err != nil {
		return errors.Wrap(err, "collect runners")
	}

	if _, err := runners.WriteTo(os.Stdout); err != nil {
		return errors.Wrap(err, "write to stdout")
	}

	return nil
}

func collectRunners() (runner.Runners, error) {
	for len(runnerDirs) == 0 {
		runnerDirs = make([]string, 0, len(defaultRunnerDirs))
		copy(runnerDirs, defaultRunnerDirs)
	}

	var options []runner.GatherOption
	for _, dir := range runnerDirs {
		options = append(options, runner.Dir(dir))
	}

	return runner.Gather(options...)
}

D internal/agent/agent.go => internal/agent/agent.go +0 -140
@@ 1,140 0,0 @@
package agent

import (
	"context"
	"time"

	"github.com/go-mangos/mangos"
	"github.com/go-mangos/mangos/protocol/req"
	"github.com/go-mangos/mangos/protocol/sub"
	"github.com/go-mangos/mangos/transport/tcp"
	"github.com/pkg/errors"
)

type Option func(*Agent) error

func SubscribeTo(addr string) Option {
	return func(a *Agent) error {
		if addr != "" {
			a.subRemoteAddr = addr
		}
		return nil
	}
}

func ReplyTo(addr string) Option {
	return func(a *Agent) error {
		if addr != "" {
			a.reqRemoteAddr = addr
		}
		return nil
	}
}

type Agent struct {
	subSock mangos.Socket
	reqSock mangos.Socket

	subRemoteAddr string
	reqRemoteAddr string
}

func New(options ...Option) (*Agent, error) {
	agent := &Agent{
		subRemoteAddr: "127.0.0.1:4510",
		reqRemoteAddr: "127.0.0.1:4511",
	}
	for i, option := range options {
		if err := option(agent); err != nil {
			return nil, errors.Wrapf(err, "apply option %d", i)
		}
	}

	return agent, nil
}

// Connect establishes a connection to an upstream server, and returns a
// channel that can be used for receiving errors that the agent is not able
// to recover from.
//
// If the provided context ctx is cancelled, it will result in a graceful
// shutdown of the agent, and the returned channel will be closed without any
// value sent on it.
func (a *Agent) Connect(ctx context.Context) <-chan error {
	errs := make(chan error)
	go a.connect(ctx, errs)
	return errs
}

func (a *Agent) connect(ctx context.Context, errs chan<- error) {
	defer close(errs)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := a.dial(); err != nil {
		errs <- errors.Wrap(err, "dial server")
		return
	}
	defer a.reqSock.Close()

	if err := a.subscribe(); err != nil {
		errs <- errors.Wrap(err, "subscribe")
		return
	}
	defer a.subSock.Close()

	for {
		a.subSock.SetOption(mangos.OptionRecvDeadline, 50*time.Millisecond)
		p, err := a.subSock.Recv()
		if err != nil && err != mangos.ErrRecvTimeout {
			if err == mangos.ErrRecvTimeout {
				select {
				case <-ctx.Done():
					return
				default:
					continue
				}
			}
			select {
			case <-ctx.Done():
			case errs <- errors.Wrap(err, "sub recv"):
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

func (a *Agent) dial() error {
	sock, err := req.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new req socket")
	}
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Dial("tcp://" + a.reqRemoteAddr); err != nil {
		return errors.Wrap(err, "dial remote addr")
	}
	a.reqSock = sock
	return nil
}

func (a *Agent) subscribe() error {
	sock, err := sub.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new sub socket")
	}
	sock.AddTransport(tcp.NewTransport())
	options := map[string]interface{}{
		mangos.OptionSubscribe: []byte{},
	}
	if err := sock.DialOptions("tcp://"+a.subRemoteAddr, options); err != nil {
		return errors.Wrap(err, "sub socket: dial options")
	}
	a.subSock = sock
	return nil
}

M internal/exec/exec.go => internal/exec/exec.go +9 -9
@@ 28,15 28,15 @@ type Runner interface {
type Resource string

func (r Resource) Type() string {
	if i := strings.Index(":"); i != -1 {
		return r[:i]
	if i := strings.Index(string(r), ":"); i != -1 {
		return string(r[:i])
	}
	return ""
}

func (r Resource) Name() string {
	if i := strings.Index(":"); i != -1 {
		return r[i+1:]
	if i := strings.Index(string(r), ":"); i != -1 {
		return string(r[i+1:])
	}
	return ""
}


@@ 51,13 51,13 @@ type State struct {
func (s *State) Run() *Result {
	if err := s.prepare(); err != nil {
		return &Result{
			Status:  http.InternalServerError,
			Status:  http.StatusInternalServerError,
			Message: err.Error(),
		}
	}

	return &Result{
		Status:  http.NotImplemented,
		Status:  http.StatusNotImplemented,
		Message: "not implemented",
	}
}


@@ 80,7 80,7 @@ type File struct {

func (f *File) Run() *Result {
	return &Result{
		Status:  http.NotImplemented,
		Status:  http.StatusNotImplemented,
		Message: "not implemented",
	}
}


@@ 96,7 96,7 @@ type Service struct {

func (s *Service) Run() *Result {
	return &Result{
		Status:  http.NotImplemented,
		Status:  http.StatusNotImplemented,
		Message: "not implemented",
	}
}


@@ 105,5 105,5 @@ type ServiceState string

const (
	ServiceRunning ServiceState = "running"
	ServiceStopped              = "stopped"
	ServiceStopped ServiceState = "stopped"
)

M internal/exec/pkg.go => internal/exec/pkg.go +1 -1
@@ 9,7 9,7 @@ type PackageState string

const (
	PackageInstalled PackageState = "installed"
	PackageLatest                 = "latest"
	PackageLatest    PackageState = "latest"
)

type Package struct {

M internal/facts/util.go => internal/facts/util.go +2 -0
@@ 4,10 4,12 @@ import (
	"io/fs"
)

//lint:ignore U1000 Ignore for now
func isExecutable(m fs.FileMode) bool {
	return m&fs.ModePerm&0111 != 0
}

//lint:ignore U1000 Ignore for now
func isReadable(m fs.FileMode) bool {
	return m&fs.ModePerm&0444 != 0
}

D internal/hosts/hosts.go => internal/hosts/hosts.go +0 -11
@@ 1,11 0,0 @@
package hosts

type Groups map[string]Group

type Group struct {
	Hosts []string
}

type Host struct {
	Port string
}

M internal/resource/parser.go => internal/resource/parser.go +0 -2
@@ 305,6 305,4 @@ func (p *hclparser) parseAttrValue(v cty.Value) (string, error) {
	default:
		return "", errors.Errorf("unhandled attribute value type: %v", v.Type().GoString())
	}

	return "", nil
}

M internal/runner/gather.go => internal/runner/gather.go +2 -2
@@ 11,10 11,10 @@ func Dir(name string) GatherOption {
}

// Gather ...
func Gather(options ...GatherOption) (Runners, error) {
func Gather(options ...GatherOption) (*Runners, error) {
	var r Runners
	for _, option := range options {
		option(&r)
	}
	return r, r.gather()
	return &r, r.gather()
}

D internal/server/call.go => internal/server/call.go +0 -47
@@ 1,47 0,0 @@
package server

import (
	"encoding/json"
	"log"

	"github.com/go-mangos/mangos/protocol/req"
	"github.com/go-mangos/mangos/transport/ipc"
	"github.com/pkg/errors"
)

// Call connects to a server's local control socket, and calls the RPC method
// with the specified args.
func Call(method string, args ...string) error {
	sock, err := req.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new req socket")
	}
	sock.AddTransport(ipc.NewTransport())
	if err := sock.Dial("ipc://" + "/tmp/govern.sock"); err != nil {
		return errors.Wrap(err, "dial control socket")
	}
	defer sock.Close()

	p, err := json.Marshal(&ControlMessage{
		Method: method,
		Args:   args,
	})
	if err != nil {
		return errors.Wrap(err, "marshal json")
	}

	if err := sock.Send(p); err != nil {
		return errors.Wrapf(err, "send %q", method)
	}

	rp, err := sock.Recv()
	if err != nil {
		return errors.Wrap(err, "recv")
	}
	var resp ControlMessageResponse
	if err := json.Unmarshal(rp, &resp); err != nil {
		return errors.Wrap(err, "unmarshal json response")
	}
	log.Printf("%s:%d: %s", resp.ID, resp.Status, resp.Message)
	return nil
}

D internal/server/message.go => internal/server/message.go +0 -13
@@ 1,13 0,0 @@
package server

type ControlMessage struct {
	Method string
	Args   []string
}

type ControlMessageResponse struct {
	ID      string
	Request ControlMessage
	Status  int
	Message string
}

D internal/server/server.go => internal/server/server.go +0 -277
@@ 1,277 0,0 @@
package server

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/go-mangos/mangos"
	"github.com/go-mangos/mangos/protocol/pub"
	"github.com/go-mangos/mangos/protocol/rep"
	"github.com/go-mangos/mangos/transport/ipc"
	"github.com/go-mangos/mangos/transport/tcp"
	"github.com/pkg/errors"
	"github.com/rs/xid"
)

const (
	// The default listen address of the PUB socket that downstream
	// agents will connect to, with SUB sockets. The PUB socket will
	// be used for sending control messages to agents.
	DefaultPubListenAddr = "0.0.0.0:4510"

	// Default listen address of the REP socket, that downstream agents
	// will connect to, with REP sockets. The REP socket will be used
	// for receiving data from agents.
	DefaultRepListenAddr = "0.0.0.0:4511"

	// Default path to a UNIX socket, which is used by command-line clients
	// for sending commands to the server.
	DefaultSocketPath = "/var/run/govern.sock"
)

type Option func(*Server) error

func PubListenAddr(urlStr string) Option {
	return func(s *Server) error {
		if urlStr != "" {
			s.pubListenAddr = urlStr
		}
		return nil
	}
}

func RepListenAddr(urlStr string) Option {
	return func(s *Server) error {
		if urlStr != "" {
			s.repListenAddr = urlStr
		}
		return nil
	}
}

func ControlSocketPath(name string) Option {
	return func(s *Server) error {
		if name != "" {
			s.ctlSockPath = name
		}
		return nil
	}
}

type Server struct {
	pubSock       mangos.Socket
	pubListenAddr string
	repSock       mangos.Socket
	repListenAddr string
	ctlSock       mangos.Socket
	ctlSockPath   string
}

func New(options ...Option) (*Server, error) {
	srv := &Server{
		pubListenAddr: DefaultPubListenAddr,
		repListenAddr: DefaultRepListenAddr,
		ctlSockPath:   DefaultSocketPath,
	}

	for i, option := range options {
		if err := option(srv); err != nil {
			return nil, errors.Wrapf(err, "apply option %d", i)
		}
	}

	return srv, nil
}

func (s *Server) Listen(ctx context.Context) error {
	if s.pubSock == nil {
		sock, err := pub.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create pub socket")
		}
		sock.AddTransport(tcp.NewTransport())
		if err := sock.Listen("tcp://" + s.pubListenAddr); err != nil {
			return errors.Wrapf(err, "pub socket: listen on %q", s.pubListenAddr)
		}
		s.pubSock = sock
	}

	if s.repSock == nil {
		sock, err := rep.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create rep socket")
		}
		sock.AddTransport(tcp.NewTransport())
		if err := sock.Listen("tcp://" + s.repListenAddr); err != nil {
			return errors.Wrapf(err, "rep socket: listen on %q", s.repListenAddr)
		}
		s.repSock = sock
	}

	if s.ctlSock == nil {
		sock, err := rep.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create control socket")
		}
		sock.AddTransport(ipc.NewTransport())
		if err := sock.Listen("ipc://" + s.ctlSockPath); err != nil {
			return errors.Wrapf(err, "control socket: listen on %q", s.ctlSockPath)
		}
		s.ctlSock = sock
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error)
	defer close(errs)

	var wg sync.WaitGroup
	wg.Add(3)

	go func() {
		defer wg.Done()
		if err, ok := <-s.startPubSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	go func() {
		defer wg.Done()
		if err, ok := <-s.startRepSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	go func() {
		defer wg.Done()
		if err, ok := <-s.startControlSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	select {
	case err, ok := <-errs:
		cancel()
		wg.Wait()
		if ok {
			return err
		}

	case <-ctx.Done():
		wg.Wait()
		return ctx.Err()
	}

	return nil
}

func (s *Server) startPubSocket(ctx context.Context) <-chan error {
	log.Println("Starting PUB socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("PUB socket stopped")
		defer s.pubSock.Close()

		for {
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			}
		}
	}()
	return ch
}

func (s *Server) startRepSocket(ctx context.Context) <-chan error {
	log.Println("Starting REP socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("REP socket stopped")
		defer s.repSock.Close()

		for {
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			}
		}
	}()
	return ch
}

func (s *Server) startControlSocket(ctx context.Context) <-chan error {
	log.Println("Starting control socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("Control socket stopped")
		defer s.ctlSock.Close()

		for {
			s.ctlSock.SetOption(mangos.OptionRecvDeadline, 50*time.Millisecond)
			p, err := s.ctlSock.Recv()
			if err != nil && err == mangos.ErrRecvTimeout {
				select {
				case <-ctx.Done():
					ch <- ctx.Err()
					return
				default:
					continue
				}
			} else if err != nil {
				ch <- errors.Wrap(err, "control socket recv")
				return
			}
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			default:
			}

			var msg ControlMessage
			if err := json.Unmarshal(p, &msg); err != nil {
				log.Println("Failed to parse control message:", err)
				continue
			}

			log.Printf("Received control message: %q args=%v", msg.Method, msg.Args)

			resp := &ControlMessageResponse{
				ID:      xid.New().String(),
				Request: msg,
				Status:  http.StatusNotImplemented,
				Message: "not implemented",
			}
			rp, err := json.Marshal(resp)
			if err != nil {
				log.Println("Failed to marshal control message response:", err)
				continue
			}

			if err := s.ctlSock.Send(rp); err != nil {
				ch <- errors.Wrap(err, "control socket: send response")
				return
			}
		}
	}()
	return ch
}

M lua.go => lua.go +1 -0
@@ 87,6 87,7 @@ func (c *command) runLuaShell(cmd *cmndr.Cmd, args []string) error {
	}
}

//lint:ignore U1000 Ignore for now
func stdio() io.ReadWriter {
	return &readwriter{r: os.Stdin, w: os.Stdout}
}

M main.go => main.go +1 -1
@@ 352,7 352,7 @@ func (c *command) showRunners(cmd *cmndr.Cmd, args []string) error {
	return nil
}

func (c *command) collectRunners() (runner.Runners, error) {
func (c *command) collectRunners() (*runner.Runners, error) {
	var options []runner.GatherOption
	for _, dir := range c.runnerDirs {
		options = append(options, runner.Dir(dir))

D pkg/agent/agent.go => pkg/agent/agent.go +0 -140
@@ 1,140 0,0 @@
package agent

import (
	"context"
	"time"

	"github.com/go-mangos/mangos"
	"github.com/go-mangos/mangos/protocol/req"
	"github.com/go-mangos/mangos/protocol/sub"
	"github.com/go-mangos/mangos/transport/tcp"
	"github.com/pkg/errors"
)

type Option func(*Agent) error

func SubscribeTo(addr string) Option {
	return func(a *Agent) error {
		if addr != "" {
			a.subRemoteAddr = addr
		}
		return nil
	}
}

func ReplyTo(addr string) Option {
	return func(a *Agent) error {
		if addr != "" {
			a.reqRemoteAddr = addr
		}
		return nil
	}
}

type Agent struct {
	subSock mangos.Socket
	reqSock mangos.Socket

	subRemoteAddr string
	reqRemoteAddr string
}

func New(options ...Option) (*Agent, error) {
	agent := &Agent{
		subRemoteAddr: "127.0.0.1:4510",
		reqRemoteAddr: "127.0.0.1:4511",
	}
	for i, option := range options {
		if err := option(agent); err != nil {
			return nil, errors.Wrapf(err, "apply option %d", i)
		}
	}

	return agent, nil
}

// Connect establishes a connection to an upstream server, and returns a
// channel that can be used for receiving errors that the agent is not able
// to recover from.
//
// If the provided context ctx is cancelled, it will result in a graceful
// shutdown of the agent, and the returned channel will be closed without any
// value sent on it.
func (a *Agent) Connect(ctx context.Context) <-chan error {
	errs := make(chan error)
	go a.connect(ctx, errs)
	return errs
}

func (a *Agent) connect(ctx context.Context, errs chan<- error) {
	defer close(errs)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := a.dial(); err != nil {
		errs <- errors.Wrap(err, "dial server")
		return
	}
	defer a.reqSock.Close()

	if err := a.subscribe(); err != nil {
		errs <- errors.Wrap(err, "subscribe")
		return
	}
	defer a.subSock.Close()

	for {
		a.subSock.SetOption(mangos.OptionRecvDeadline, 50*time.Millisecond)
		p, err := a.subSock.Recv()
		if err != nil && err != mangos.ErrRecvTimeout {
			if err == mangos.ErrRecvTimeout {
				select {
				case <-ctx.Done():
					return
				default:
					continue
				}
			}
			select {
			case <-ctx.Done():
			case errs <- errors.Wrap(err, "sub recv"):
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

func (a *Agent) dial() error {
	sock, err := req.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new req socket")
	}
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Dial("tcp://" + a.reqRemoteAddr); err != nil {
		return errors.Wrap(err, "dial remote addr")
	}
	a.reqSock = sock
	return nil
}

func (a *Agent) subscribe() error {
	sock, err := sub.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new sub socket")
	}
	sock.AddTransport(tcp.NewTransport())
	options := map[string]interface{}{
		mangos.OptionSubscribe: []byte{},
	}
	if err := sock.DialOptions("tcp://"+a.subRemoteAddr, options); err != nil {
		return errors.Wrap(err, "sub socket: dial options")
	}
	a.subSock = sock
	return nil
}

D pkg/exec/doc.go => pkg/exec/doc.go +0 -2
@@ 1,2 0,0 @@
// Package exec provides functions run by govern agents on local machines.
package exec

D pkg/exec/echo.go => pkg/exec/echo.go +0 -27
@@ 1,27 0,0 @@
//+build !windows

package exec

import (
	"os/exec"

	"github.com/pkg/errors"
)

func Echo(args map[string]string) ([]byte, error) {
	p, err := findInPath("echo")
	if err != nil {
		return nil, errors.Wrap(err, "find in path")
	}

	msg, ok := args["message"]
	if !ok {
		return nil, nil
	}

	output, err := runCmd(exec.Command(p, msg))
	if err != nil {
		return nil, errors.Wrapf(err, "run %q", p)
	}
	return output, nil
}

D pkg/exec/exec.go => pkg/exec/exec.go +0 -109
@@ 1,109 0,0 @@
package exec

import (
	"errors"
	"net/http"
	"os"
	"strings"
)

type Result struct {
	ID      string
	Status  int
	Message string
	Output  string
}

func OK(message string) *Result {
	return &Result{
		Status:  http.StatusOK,
		Message: message,
	}
}

type Runner interface {
	Run() *Result
}

type Resource string

func (r Resource) Type() string {
	if i := strings.Index(":"); i != -1 {
		return r[:i]
	}
	return ""
}

func (r Resource) Name() string {
	if i := strings.Index(":"); i != -1 {
		return r[i+1:]
	}
	return ""
}

// State represents an entire state manifest.
type State struct {
	Packages map[string]Package `hcl:"pkg"`
	Files    map[string]File    `hcl:"file"`
	Services map[string]Service `hcl:"service"`
}

func (s *State) Run() *Result {
	if err := s.prepare(); err != nil {
		return &Result{
			Status:  http.InternalServerError,
			Message: err.Error(),
		}
	}

	return &Result{
		Status:  http.NotImplemented,
		Message: "not implemented",
	}
}

func (s *State) prepare() error {
	return errors.New("not implemented")
}

type File struct {
	Source      string      `hcl:"src"`
	Destination string      `hcl:"dest"`
	Template    bool        `hcl:"template"`
	User        string      `hcl:"user"`
	Group       string      `hcl:"group"`
	Mode        os.FileMode `hcl:"mode"`

	Require []Resource `hcl:"require"`
	Notify  []Resource `hcl:"notify"`
}

func (f *File) Run() *Result {
	return &Result{
		Status:  http.NotImplemented,
		Message: "not implemented",
	}
}

type Service struct {
	Name    string       `hcl:"name"`
	Enabled bool         `hcl:"enabled"`
	State   ServiceState `hcl:"state"`

	Require []Resource `hcl:"require"`
	Notify  []Resource `hcl:"notify"`
}

func (s *Service) Run() *Result {
	return &Result{
		Status:  http.NotImplemented,
		Message: "not implemented",
	}
}

type ServiceState string

const (
	ServiceRunning ServiceState = "running"
	ServiceStopped              = "stopped"
)

D pkg/exec/pkg.go => pkg/exec/pkg.go +0 -32
@@ 1,32 0,0 @@
package exec

import (
	"fmt"
	"net/http"
)

type PackageState string

const (
	PackageInstalled PackageState = "installed"
	PackageLatest                 = "latest"
)

type Package struct {
	Name    string       `hcl:"name"`
	State   PackageState `hcl:"state"`
	Version string       `hcl:"version"`

	Notify  []Resource `hcl:"notify"`
	Require []Resource `hcl:"require"`
}

func (p Package) Run() *Result {
	if p.Version != "" && p.State == PackageLatest {
		return &Result{
			Status:  http.StatusConflict,
			Message: "version cannot be specified if state=latest",
		}
	}
	return OK(fmt.Sprintf("package %q installed", p.Name))
}

D pkg/exec/utils.go => pkg/exec/utils.go +0 -37
@@ 1,37 0,0 @@
//+build !windows

package exec

import (
	"os"
	"os/exec"
	"path/filepath"
	"strings"

	"github.com/pkg/errors"
)

// findInPath searches the govern agent's $PATH environment variable looking
// for the executable name, returning the absolute path to the executable.
// If the executable cannot be found, the absolute path will be an empty
// string, and a non-nil error will be returned.
func findInPath(name string) (absPath string, err error) {
	path, ok := os.LookupEnv("PATH")
	if !ok {
		return "", errors.New("$PATH is not set")
	}
	if path == "" {
		return "", errors.New("$PATH is empty")
	}

	for _, pdir := range strings.Split(path, ":") {
		if info, err := os.Stat(filepath.Join(pdir, name)); err == nil {
			return info.Name(), nil
		}
	}
	return "", errors.Errorf("cannot find %q in $PATH", name)
}

func runCmd(cmd *exec.Cmd) (output []byte, err error) {
	return cmd.CombinedOutput()
}

D pkg/facts/doc.go => pkg/facts/doc.go +0 -69
@@ 1,69 0,0 @@
// Package facts provides the means of gathering information about hosts where
// govern is installed.
//
// The main entrypoint for this package is the Gather function.
//
// Facts are either plain-text, or executable files found within a directory in
// Gather's search path.
// If a fact is executable, Gather will execute the fact and collect whatever is
// written to STDOUT.
// If a fact is a plain-text file,
// Gather will simply read its contents.
//
// Regardless of how a fact's value is populated,
// all leading and trailing whitespace will be trimmed.
//
// Fact naming
//
// Facts are named after their location in the directory hierarchy,
// minus the leading path elements of the directory.
// For example, if "/etc/govern/facts.d" was in the search path,
// a fact collected from:
//
//     /etc/govern/facts/os/kernel/version
//
// would be known as:
//
//     os/kernel/version
//
// Precedence
//
// As facts are collected from the directories in Gather's search path,
// facts that have the same name as previously-collected fact will "win".
//
// If the following facts are collected in the order given:
//
//     /usr/local/lib/govern/facts/os/name
//     /etc/govern/facts/os/name
//
// the value collected from "/etc/govern/facts/os/name" would be the winning
// value.
//
// Disabling facts
//
// To disable a plain-text fact from being read,
// simply changes the file's mode by dropping the read bits:
//
//     chmod 0200 /etc/govern/facts/my-custom-fact
//
// To disable an executable fact, drop the read and execute bits.
// Note that simply removing the execute bits from an executable fact's mode
// will still leave the read bits set.
// In this case,
// the contents of the executable fact will be read,
// which is likely not desirable.
package facts

// DefaultDirs returns a list of the default directories that will be searched
// for facts.
func DefaultDirs() []string {
	s := make([]string, 0, len(defaultDirs))
	copy(s, defaultDirs)
	return s
}

var defaultDirs = []string{
	"/usr/local/lib/govern/facts",
	"/usr/local/etc/govern/facts",
	"/etc/govern/facts",
}

D pkg/facts/fact.go => pkg/facts/fact.go +0 -158
@@ 1,158 0,0 @@
package facts

import (
	"bytes"
	"fmt"
	"io"
	"io/fs"
	"os"
	"os/exec"
	"path/filepath"
	"sync"

	"github.com/pkg/errors"
)

// Remote returns a new fact that will retrieve the fact name from a remote host.
// Refer to the documentation for net.Dial to see how network and addr are used.
func Remote(network, addr, name string) (*Fact, error) {
	return nil, errors.New("not implemented")
}

func newFact(baseDir, name string) (*Fact, error) {
	if !filepath.IsAbs(baseDir) {
		var err error
		if baseDir, err = filepath.Abs(baseDir); err != nil {
			return nil, fmt.Errorf("make basedir absolute: %w", err)
		}
	}

	return &Fact{
		dir:  baseDir,
		name: name,
	}, nil
}

type Fact struct {
	// dir is the "fact directory".
	// name is the name of the fact.
	// dir + "/" + name must give you the absolute path to the fact.
	dir, name string

	// When network and addr are set, will indicate that this is a remote fact.
	network, addr string

	// value is the cached value of the fact.
	// It will be populated on the first call to Read.
	value []byte
	mu    sync.RWMutex
}

func (f *Fact) Read(p []byte) (int, error) {
	// If the destination buffer has a zero length, do nothing.
	if len(p) == 0 {
		return 0, nil
	}

	// Has f.value already been populated?
	f.mu.RLock()
	if len(f.value) > 0 {
		n := copy(p, f.value)
		f.mu.RUnlock()
		if n < len(p) {
			return n, io.EOF
		}
		return n, nil
	}
	f.mu.RUnlock()

	// Populate f.value.

	// Is this a remote fact?
	if f.Remote() {
		return 0, errors.New("remote facts are not yet implemented")
	}

	// This is a local fact.
	src := filepath.Join(f.dir, f.name)
	info, err := os.Stat(src)
	if err != nil {
		return 0, err
	}

	var buf []byte
	if mode := info.Mode(); f.isReadable(mode) && f.isExecutable(mode) {
		// Execute the file, and return its STDOUT as the fact value.
		var err error
		if buf, err = f.exec(src); err != nil {
			return 0, fmt.Errorf("exec fact: %w", err)
		}
	} else if f.isReadable(mode) {
		// Read the file, using its contents as the fact value.
		var err error
		if buf, err = f.read(src); err != nil {
			return 0, fmt.Errorf("read fact: %w", err)
		}
	}
	buf = bytes.TrimSpace(buf)

	f.mu.Lock()
	f.value = make([]byte, len(buf))
	copy(f.value, buf)
	f.mu.Unlock()

	n := copy(p, buf)
	if n < len(p) {
		return n, io.EOF
	}
	return n, nil
}

func (f *Fact) exec(pathname string) ([]byte, error) {
	cmd := exec.Command(pathname)
	return cmd.Output()
}

func (f *Fact) read(pathname string) ([]byte, error) {
	ff, err := os.Open(pathname)
	if err != nil {
		return nil, fmt.Errorf("open: %w", err)
	}
	defer ff.Close()
	return io.ReadAll(io.LimitReader(ff, 1<<20))
}

func (f *Fact) isReadable(m fs.FileMode) bool {
	return m&fs.ModePerm&0444 != 0
}

func (f *Fact) isExecutable(m fs.FileMode) bool {
	return m&fs.ModePerm&0111 != 0
}

// String returns the fact's value.
func (f *Fact) String() string {
	f.mu.RLock()
	if len(f.value) > 0 {
		f.mu.RUnlock()
		return string(f.value)
	}
	f.mu.RUnlock()

	p, err := io.ReadAll(f)
	if err != nil {
		return ""
	}
	return string(p)
}

// Name returns the name of the fact.
func (f *Fact) Name() string {
	return f.name
}

// Remote indicates whether or not this is a fact that has to be retrieved from
// a remote host.
func (f *Fact) Remote() bool {
	return f.network != "" && f.addr != ""
}

D pkg/facts/facts.go => pkg/facts/facts.go +0 -162
@@ 1,162 0,0 @@
package facts

import (
	"context"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"

	"github.com/pkg/errors"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Facts holds facts discovered about a host.
type Facts struct {
	mu    sync.RWMutex
	facts map[string]*Fact

	dirs      []string
	ignore    []string
	net, addr string
}

func (f *Facts) gather(ctx context.Context) error {
	var facts map[string]*Fact
	if f.Local() {
		f, err := f.gatherLocal(ctx)
		if err != nil {
			return errors.Wrap(err, "local")
		}
		facts = f
	} else {
		f, err := f.gatherRemote(ctx)
		if err != nil {
			return errors.Wrap(err, "remote")
		}
		facts = f
	}

	f.mu.Lock()
	defer f.mu.Unlock()
	f.facts = facts
	return nil
}

// gatherLocal gathers facts from the local host.
func (f *Facts) gatherLocal(ctx context.Context) (map[string]*Fact, error) {
	var errs []error
	facts := make(map[string]*Fact)
	for i, d := range f.dirs {
		if !filepath.IsAbs(d) {
			var err error
			d, err = filepath.Abs(d)
			if err != nil {
				errs = append(errs, fmt.Errorf("make %q an absolute path: %w", f.dirs[i], err))
				continue
			}
		}

		info, err := os.Stat(d)
		if err != nil {
			errs = append(errs, fmt.Errorf("fact directory: %w", err))
			continue
		}
		if !info.Mode().IsDir() {
			errs = append(errs, fmt.Errorf("%s: not a directory", d))
			continue
		}

		if err := filepath.Walk(d, f.localFact(d, facts)); err != nil {
			errs = append(errs, fmt.Errorf("walk: %w", err))
			continue
		}
	}

	return facts, kerrors.NewAggregate(errs)
}

// localFact returns a filepath.WalkFunc that captures the value of a fact,
// and stores the value in the facts map.
//
// When the path provided to the WalkFunc is executable,
// it will be executed and its standard output will be
// used as the fact value.
// If the path is not executable,
// its contents will be used as the fact value.
func (f *Facts) localFact(basedir string, facts map[string]*Fact) filepath.WalkFunc {
	return func(pathname string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// The name of the fact is its path within the directory.
		name := strings.TrimPrefix(pathname, basedir+string(filepath.Separator))

		// Should we ignore this fact?
		for _, pattern := range f.ignore {
			if matched, err := path.Match(pattern, name); err != nil {
				return errors.Wrap(err, "ignore fact")
			} else if matched {
				return nil
			}
		}

		if info.IsDir() {
			// Descend.
			return nil
		}

		f, err := newFact(basedir, name)
		if err != nil {
			return fmt.Errorf("fact %q: %w", name, err)
		}
		facts[name] = f
		return nil
	}
}

// gatherRemote gathers facts about a remote host.
func (f *Facts) gatherRemote(ctx context.Context) (map[string]*Fact, error) {
	return nil, errors.New("not implemented")
}

// Local indicates whether or not f holds the collected facts for the local
// host.
func (f *Facts) Local() bool {
	return f.addr == ""
}

// WriteTo writes the facts in f to w, using a "text/tabwriter".Writer.
// Facts will be written sorted by name.
func (f *Facts) WriteTo(w io.Writer) (int64, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if len(f.facts) == 0 {
		return 0, nil
	}

	keys := make([]string, 0, len(f.facts))
	for k := range f.facts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var written int64
	tw := tabwriter.NewWriter(w, 0, 8, 1, '\t', 0)
	for _, k := range keys {
		n, err := fmt.Fprintf(tw, "%s\t%s\n", k, f.facts[k])
		if err != nil {
			return written, err
		}
		written += int64(n)
	}
	return written, errors.Wrap(tw.Flush(), "flush tabwriter")
}

D pkg/facts/gather.go => pkg/facts/gather.go +0 -43
@@ 1,43 0,0 @@
package facts

import (
	"context"
)

// GatherOption is a functional type that can be passed to Gather,
// to control how it collects facts about a system.
type GatherOption func(*Facts)

// Dir adds the directory name to the facts search path.
// This option is ignored when the Host option is provided.
func Dir(name string) GatherOption {
	return func(f *Facts) {
		f.dirs = append(f.dirs, name)
	}
}

// Ignore allows the caller to skip collecting facts with the given pattern.
func Ignore(pattern string) GatherOption {
	return func(f *Facts) {
		f.ignore = append(f.ignore, pattern)
	}
}

// Host instructs Gather to collect facts from a remote host,
// at the specified network and address.
//
// See net.Dial for more information.
func Host(network, address string) GatherOption {
	return func(f *Facts) {
		f.net, f.addr = network, address
	}
}

// Gather collects facts.
func Gather(options ...GatherOption) (*Facts, error) {
	var f Facts
	for _, option := range options {
		option(&f)
	}
	return &f, f.gather(context.Background())
}

D pkg/facts/util.go => pkg/facts/util.go +0 -13
@@ 1,13 0,0 @@
package facts

import (
	"io/fs"
)

func isExecutable(m fs.FileMode) bool {
	return m&fs.ModePerm&0111 != 0
}

func isReadable(m fs.FileMode) bool {
	return m&fs.ModePerm&0444 != 0
}

D pkg/hosts/hosts.go => pkg/hosts/hosts.go +0 -11
@@ 1,11 0,0 @@
package hosts

type Groups map[string]Group

type Group struct {
	Hosts []string
}

type Host struct {
	Port string
}

D pkg/resource/attributes.go => pkg/resource/attributes.go +0 -21
@@ 1,21 0,0 @@
package resource

const (
	// BeforeAttr is the reserved name of the attribute that is used to
	// order a resource before any others.
	BeforeAttr = "before"

	// AfterAttr is the reserved name of the attribute used to order a
	// resource after another.
	AfterAttr = "after"

	// NotifyAttr is the reserved name of the attribute used to send a
	// signal to another resource.
	NotifyAttr = "notify"
)

type attributes struct {
	before, after []Ref
	notify        []Signal
	other         map[string]string
}

D pkg/resource/doc.go => pkg/resource/doc.go +0 -2
@@ 1,2 0,0 @@
// Package resource provides functions for parsing a resource manifest.
package resource

D pkg/resource/parse.go => pkg/resource/parse.go +0 -44
@@ 1,44 0,0 @@
package resource

import (
	"path/filepath"

	"git.sr.ht/~nesv/govern/pkg/runner"
	"github.com/hashicorp/hcl/v2/hclparse"
	"github.com/pkg/errors"
)

// ParseOption is a functional configuration type that configures a Parser.
type ParseOption func(Parser)

// WithRunners registers the given runners to the parser,
// so that only resources with local runners can be parsed.
func WithRunners(r ...runner.Runner) ParseOption {
	return func(p Parser) {
		p.WithRunners(r...)
	}
}

// Parse parses the resource definitions from the state file name.
// The keys in the returned mapping are the string representations of the Ref
// for the resource.
func Parse(name string, options ...ParseOption) ([]Resource, error) {
	if !filepath.IsAbs(name) {
		return nil, errors.New("path must be absolute")
	}

	var p Parser
	switch ext := filepath.Ext(name); ext {
	case ".hcl":
		p = &hclparser{
			parser: hclparse.NewParser(),
		}

	default:
		return nil, errors.Errorf("unsupported state file extension: %s", ext)
	}
	for _, option := range options {
		option(p)
	}
	return p.Parse(name)
}

D pkg/resource/parser.go => pkg/resource/parser.go +0 -310
@@ 1,310 0,0 @@
package resource

import (
	"strconv"
	"strings"

	"git.sr.ht/~nesv/govern/pkg/runner"

	"github.com/hashicorp/hcl/v2"
	"github.com/hashicorp/hcl/v2/hclparse"
	"github.com/pkg/errors"
	"github.com/zclconf/go-cty/cty"
	"github.com/zclconf/go-cty/cty/gocty"
)

// Parser defines the interface of a type that parses resources from a
// state file.
type Parser interface {
	Parse(filename string) ([]Resource, error)
	WithRunners(r ...runner.Runner)
}

type hclparser struct {
	runners []runner.Runner
	parser  *hclparse.Parser
}

func (p *hclparser) WithRunners(r ...runner.Runner) {
	p.runners = append(p.runners, r...)
}

func (p *hclparser) Parse(name string) ([]Resource, error) {
	f, diagnostics := p.parser.ParseHCLFile(name)
	if diagnostics.HasErrors() {
		return nil, errors.Wrap(diagnostics, "parse file")
	}
	return p.parseResources(f)
}

func (p *hclparser) parseResources(f *hcl.File) ([]Resource, error) {
	// This BodySchema has to be built dynamically,
	// based on the provided list of available runners.
	blocks := make([]hcl.BlockHeaderSchema, 0, len(p.runners))
	for _, r := range p.runners {
		blocks = append(blocks, hcl.BlockHeaderSchema{
			Type:       r.String(),
			LabelNames: []string{"*"},
		})
	}

	content, diagnostics := f.Body.Content(&hcl.BodySchema{Blocks: blocks})
	if diagnostics.HasErrors() {
		return nil, diagnostics
	}

	var (
		m         = make(map[string]struct{}) // used to find duplicate resource definitions
		resources = make([]Resource, 0, len(content.Blocks))
	)
	for _, block := range content.Blocks {
		var (
			runner = block.Type
			name   string
		)
		if labels := block.Labels; len(labels) == 1 {
			name = labels[0]
		} else if len(labels) == 0 {
			return nil, errors.Errorf("missing name for resource at %s", block.DefRange)
		} else if len(labels) > 1 {
			return nil, errors.Errorf("too many labels for resource at %s", block.DefRange)
		}

		if name == "" {
			return nil, errors.Errorf("empty resource name at %s", block.DefRange)
		}

		ref := Ref{
			Runner: runner,
			Name:   name,
		}
		if _, exists := m[ref.String()]; exists {
			return nil, errors.Errorf("redefinition of resource %s at %s", ref, block.DefRange)
		}

		// Parse out the attributes.
		attrs, err := p.parseAttrs(block)
		if err != nil {
			return nil, errors.Wrapf(err, "%q", ref)
		}

		resources = append(resources, Resource{
			Name:       ref,
			Before:     attrs.before,
			After:      attrs.after,
			Notify:     attrs.notify,
			Attributes: attrs.other,
			src:        block.DefRange.String(),
		})
		m[ref.String()] = struct{}{}
	}

	return resources, nil
}

// parseAttrs parses all of the attributes in block.
//
// The reserved attributes "before", "after", and "notify" are parsed first,
// then all remaining attributes are collected.
func (p *hclparser) parseAttrs(block *hcl.Block) (*attributes, error) {
	content, body, diagnostics := block.Body.PartialContent(&hcl.BodySchema{
		Attributes: []hcl.AttributeSchema{
			{Name: BeforeAttr},
			{Name: AfterAttr},
			{Name: NotifyAttr},
		},
	})
	if diagnostics.HasErrors() {
		return nil, errors.Wrap(diagnostics, "parse resource attributes")
	}

	// Our "reserved" attributes are going to be in content.Attributes.
	// All of the other key=value attributes are going to be collected via
	// body.JustAttributes.
	var (
		before []Ref
		after  []Ref
		notify []Signal
	)
	for _, attr := range content.Attributes {
		if !p.isStringTuple(attr) {
			return nil, errors.Errorf("expected a list of strings: %s", attr.Range)
		}

		// Covert values to Refs.
		switch attr.Name {
		case BeforeAttr:
			refs, err := p.toRefs(attr)
			if err != nil {
				return nil, errors.Wrap(err, "parse references")
			}
			before = refs

		case AfterAttr:
			refs, err := p.toRefs(attr)
			if err != nil {
				return nil, errors.Wrap(err, "parse references")
			}
			after = refs

		case NotifyAttr:
			strs, err := p.tostrs(attr)
			if err != nil {
				return nil, errors.Wrap(err, "to strings")
			}

			var signals []Signal
			for _, v := range strs {
				signal, err := ParseSignal(v)
				if err != nil {
					return nil, errors.Wrap(err, "parse signal")
				}
				signals = append(signals, signal)
			}
			notify = signals

		default:
			return nil, errors.Errorf("caught unexpected attribute: %s", attr.Name)
		}
	}

	// Now we can collect all of the other attributes that were specified in
	// the state block.
	// These will be passed to the runner.
	attrs, diagnostics := body.JustAttributes()
	if diagnostics.HasErrors() {
		return nil, errors.Wrap(diagnostics, "collect additional resource attributes")
	}
	other := make(map[string]string)
	for _, attr := range attrs {
		v, err := p.parseAttr(attr)
		if err != nil {
			return nil, errors.Wrapf(err, "parse attribute %q", attr.Name)
		}
		other[attr.Name] = v
	}

	return &attributes{
		before: before,
		after:  after,
		notify: notify,
		other:  other,
	}, nil
}

// toRefs is used to convert the values of an attribute from a go-cty string tuple,
// to a slice of Refs.
func (p *hclparser) toRefs(attr *hcl.Attribute) ([]Ref, error) {
	values, err := p.tostrs(attr)
	if err != nil {
		return nil, errors.Wrap(err, "convert to strings")
	}

	refs := make([]Ref, 0, len(values))
	for _, v := range values {
		r, err := ParseRef(v)
		if err != nil {
			return nil, errors.Wrap(err, "parse reference")
		}
		refs = append(refs, r)
	}

	return refs, nil
}

// tostrs converts attr's value to a slice of strings.
// The value of attr is expected to be a go-cty tuple containing strings.
//
// The caller should check to make sure attr is a go-cty string tuple,
// by calling isStringTuple beforehand.
func (p *hclparser) tostrs(attr *hcl.Attribute) ([]string, error) {
	values, diag := attr.Expr.Value(nil)
	if diag.HasErrors() {
		return nil, diag
	}
	if !values.Type().IsTupleType() {
		return nil, errors.New("not a list/tuple")
	}
	var s []string
	for i := 0; i < values.Type().Length(); i++ {
		s = append(s, values.Index(cty.NumberIntVal(int64(i))).AsString())
	}
	return s, nil
}

// isStringTuple indicates whether attr is a go-cty tuple,
// where each element is a string.
func (p *hclparser) isStringTuple(attr *hcl.Attribute) bool {
	values, diag := attr.Expr.Value(nil)
	if diag.HasErrors() {
		return false
	}
	if !values.Type().IsTupleType() {
		return false
	}
	for i := 0; i < values.Type().Length(); i++ {
		if !values.Type().TupleElementType(i).Equals(cty.String) {
			return false
		}
	}
	return true
}

// parseAttr is used for parsing non-reserved resource attributes.
func (p *hclparser) parseAttr(attr *hcl.Attribute) (string, error) {
	v, diagnostics := attr.Expr.Value(nil)
	if diagnostics.HasErrors() {
		return "", errors.Wrapf(diagnostics, "get attribute value: %s", attr.Name)
	}

	return p.parseAttrValue(v)
}

// parseAttrValue parses the attribute value v,
// and returns the string representation of that value.
//
// Tuple values will be returned as a single string,
// where each element is separated by a comma.
// Commas within a string element will be escaped to "\x2c"
// (the ASCII code for a "," character).
func (p *hclparser) parseAttrValue(v cty.Value) (string, error) {
	switch {
	case v.Type().Equals(cty.Bool):
		if v.True() {
			return "true", nil
		}
		return "false", nil

	case v.Type().Equals(cty.Number):
		var (
			i int
			f float64
		)
		if err := gocty.FromCtyValue(v, &i); err == nil {
			return strconv.FormatInt(int64(i), 10), nil
		}
		if err := gocty.FromCtyValue(v, &f); err == nil {
			return strconv.FormatFloat(f, 'g', -1, 64), nil
		}
		return "", errors.New("cannot convert numeric value")

	case v.Type().Equals(cty.String):
		return strings.ReplaceAll(v.AsString(), ",", "\\x2c"), nil

	case v.Type().IsTupleType():
		var s []string
		for i := 0; i < v.Type().Length(); i++ {
			tv, err := p.parseAttrValue(v.Index(cty.NumberIntVal(int64(i))))
			if err != nil {
				return "", errors.Wrap(err, "parse attribute value")
			}
			s = append(s, tv)
		}
		return strings.Join(s, ","), nil

	default:
		return "", errors.Errorf("unhandled attribute value type: %v", v.Type().GoString())
	}

	return "", nil
}

D pkg/resource/ref.go => pkg/resource/ref.go +0 -39
@@ 1,39 0,0 @@
package resource

import (
	"fmt"
	"strings"

	"github.com/pkg/errors"
)

const refFieldSep = ":"

// ParseRef parses a resource reference from s.
// See the documentation for Ref.String, for the expected format of a
// resource reference string.
func ParseRef(s string) (Ref, error) {
	sep := strings.Index(s, refFieldSep)
	if sep == -1 {
		return Ref{}, errors.New("missing separator")
	}

	return Ref{
		Runner: s[:sep],
		Name:   s[sep+1:],
	}, nil
}

// Ref represents a resource reference.
type Ref struct {
	Runner, Name string
}

// String returns a string representation of r.
// The format of a resource reference is
//
//   runner-name + ":" + resource-name
//
func (r Ref) String() string {
	return fmt.Sprintf("%s%s%s", r.Runner, refFieldSep, r.Name)
}

D pkg/resource/resource.go => pkg/resource/resource.go +0 -110
@@ 1,110 0,0 @@
package resource

import (
	"bytes"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"sort"
	"strings"

	"git.sr.ht/~nesv/govern/pkg/runner"
)

// Resource represents a resource within a state file.
type Resource struct {
	Name       Ref
	Before     []Ref
	After      []Ref
	Notify     []Signal
	Attributes Attributes

	src string
}

// Source returns the name and position of the resource definition from its
// source file.
func (r Resource) Source() string {
	return r.src
}

// String returns the name of the resource so it can be referenced by other
// Resources.
func (r Resource) String() string {
	return r.Name.String()
}

// Execute formats the attributes in r,
// and passes them as positional arguments to r.
//
// If the given runner's name isn't the same as r.Name.Runner,
// Execute will refuse to execute the runner.
func (r Resource) Execute(cfg ExecuteConfig) (stdout, stderr []byte, err error) {
	if cfg.Runner.String() != r.Name.Runner {
		return nil, nil, fmt.Errorf("runner name mismatch: want=%q got=%q", r.Name.Runner, cfg.Runner)
	}

	// The first argument to the runner is the resource's name;
	// not the reference (which includes the runner's name),
	// but the name of the resource that will be managed by the runner.
	args := []string{r.Name.Name}

	// Add in all of the attributes as positional arguments.
	args = append(args, r.Attributes.AsSlice()...)

	var (
		cmd        = exec.Command(cfg.Runner.Path(), args...)
		bout, berr bytes.Buffer
	)

	// Set environment variables.
	cmd.Env = append(cmd.Env,
		fmt.Sprintf("GOVERN=%s", cfg.GovernPath),
		fmt.Sprintf("GOVERN_FACTS_PATH=%s", strings.Join(cfg.FactsDirs, ":")),
		fmt.Sprintf("PATH=%s", os.Getenv("PATH")),
	)
	if cfg.Pretend {
		cmd.Env = append(cmd.Env, "PRETEND=yes")
	}

	cmd.Stdout, cmd.Stderr = &bout, &berr
	var exerr *exec.ExitError
	if err := cmd.Run(); errors.As(err, &exerr) {
		return bytes.TrimSpace(bout.Bytes()), bytes.TrimSpace(berr.Bytes()), fmt.Errorf("execute runner %s: %w", cfg.Runner, exerr)
	} else if err != nil {
		return bytes.TrimSpace(bout.Bytes()), bytes.TrimSpace(berr.Bytes()), fmt.Errorf("run %s: %w", cfg.Runner, err)
	}

	// The runner finished successfully.
	return bout.Bytes(), berr.Bytes(), nil
}

type ExecuteConfig struct {
	Runner     runner.Runner
	GovernPath string
	FactsDirs  []string
	Pretend    bool
}

// Attributes ...
type Attributes map[string]string

// String returns a string representation of all of the attribute pairs.
func (a Attributes) String() string {
	return strings.Join(a.AsSlice(), " ")
}

// AsSlice returns the attributes in a,
// as a sorted slice of "key=value" strings.
func (a Attributes) AsSlice() []string {
	keys := make([]string, 0, len(a))
	for k := range a {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for i, k := range keys {
		keys[i] = fmt.Sprintf("%s=%s", k, a[k])
	}
	return keys
}

D pkg/resource/signal.go => pkg/resource/signal.go +0 -50
@@ 1,50 0,0 @@
package resource

import (
	"fmt"
	"strings"

	"github.com/pkg/errors"
)

// ParseSignal parses a signal notification string.
// The format of a signal string is:
//
//     RREF:NAME
//
// where RREF is a resource reference, parseable with Parseref,
// and NAME is the name of the signal to send to the resource specified by
// RREF.
//
// Note that signal names have no special meaning within this code.
// Signal names are passed to a runner,
// and it is the runner that decides what to do with the signal name.
func ParseSignal(s string) (Signal, error) {
	if n := strings.Count(s, refFieldSep); n != 2 {
		return Signal{}, errors.Errorf("wrong number of field separators: want=2, got=%d", n)
	}

	sep := strings.LastIndex(s, refFieldSep)
	ref, err := ParseRef(s[:sep])
	if err != nil {
		return Signal{}, errors.Wrap(err, "parse resource reference")
	}

	return Signal{
		Resource: ref,
		Name:     s[sep+1:],
	}, nil
}

// Signal represents a notification signal that is sent from one resource,
// to another.
type Signal struct {
	Resource Ref
	Name     string
}

// String returns s as a string.
// The returned string is parseable with ParseSignal.
func (s Signal) String() string {
	return fmt.Sprintf("%s%s%s", s.Resource, refFieldSep, s.Name)
}

D pkg/runner/gather.go => pkg/runner/gather.go +0 -20
@@ 1,20 0,0 @@
package runner

// GatherOption ...
type GatherOption func(*Runners)

// Dir ...
func Dir(name string) GatherOption {
	return func(r *Runners) {
		r.dirs = append(r.dirs, name)
	}
}

// Gather ...
func Gather(options ...GatherOption) (Runners, error) {
	var r Runners
	for _, option := range options {
		option(&r)
	}
	return r, r.gather()
}

D pkg/runner/runner.go => pkg/runner/runner.go +0 -18
@@ 1,18 0,0 @@
package runner

import "path/filepath"

// Runner ...
type Runner struct {
	src string // absolute path to the runner executable
}

// Path returns the absolute path name to the runner executable.
func (r Runner) Path() string {
	return r.src
}

// String returns the name of the runner.
func (r Runner) String() string {
	return filepath.Base(r.src)
}

D pkg/runner/runners.go => pkg/runner/runners.go +0 -157
@@ 1,157 0,0 @@
// Package runner provides types and functions for discovering runners on the
// local host.
package runner

import (
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"

	"github.com/pkg/errors"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Runners ...
type Runners struct {
	mu      sync.RWMutex
	runners map[string]Runner

	dirs []string
}

func (r *Runners) gather() error {
	var (
		runners = make(map[string]Runner)
		errs    []error
	)
	for i, d := range r.dirs {
		if !filepath.IsAbs(d) {
			var err error
			d, err = filepath.Abs(d)
			if err != nil {
				errs = append(errs, errors.Wrapf(err, "make %q an absolute path", r.dirs[i]))
				continue
			}
		}

		info, err := os.Stat(d)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		if !info.IsDir() {
			errs = append(errs, errors.Errorf("%s: not a directory", d))
			continue
		}

		if err := filepath.Walk(d, func(pathname string, info fs.FileInfo, err error) error {
			if err != nil {
				return err
			}

			// Skip directories.
			if info.IsDir() {
				return nil
			}

			// Skip any files that are not immediately within the
			// directory we are walking.
			// (In other words, do not descend into child directories.)
			dir, name := filepath.Split(pathname)
			if strings.TrimPrefix(dir, d+string(filepath.Separator)) != "" {
				println("skipping non-first-level file", pathname)
				return nil
			}

			// Is the file executable?
			if info.Mode()&fs.ModePerm&0111 != 0111 {
				println("skipping non-executable file", pathname)
				return nil
			}

			// Does this runner already exist in the mapping?
			// Similarly to states, we do not want to get into the
			// business of guessing precedence of runners.
			if other, exists := runners[name]; exists {
				println("skipping duplicate runner", pathname)
				errs = append(errs, errors.Errorf("runner %q already exists at %s", name, other.Path()))
				return nil
			}

			runners[name] = Runner{src: pathname}
			return nil
		}); err != nil {
			errs = append(errs, err)
		}
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	r.runners = runners

	return kerrors.NewAggregate(errs)
}

// WriteTo ...
func (r *Runners) WriteTo(w io.Writer) (int64, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if len(r.runners) == 0 {
		return 0, nil
	}

	keys := make([]string, 0, len(r.runners))
	for k := range r.runners {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var written int64
	tw := tabwriter.NewWriter(w, 0, 8, 1, '\t', 0)
	for _, k := range keys {
		n, err := fmt.Fprintf(tw, "%s\t%s\n", k, r.runners[k].Path())
		if err != nil {
			return written, err
		}
		written += int64(n)
	}

	return written, errors.Wrap(tw.Flush(), "flush tabwriter")
}

// Runners returns the list of runners in r.
func (r *Runners) Runners() []Runner {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if len(r.runners) == 0 {
		return nil
	}

	runners := make([]Runner, 0, len(r.runners))
	for _, r := range r.runners {
		runners = append(runners, r)
	}
	sort.SliceStable(runners, func(i, j int) bool {
		return runners[i].String() < runners[j].String()
	})
	return runners
}

// Get returns the runner with the given name,
// and whether or not that runner exists.
// The caller should always check the value of exists.
func (r *Runners) Get(name string) (runner Runner, exists bool) {
	rn, exists := r.runners[name]
	if !exists {
		return Runner{}, false
	}
	return rn, true
}

D pkg/server/call.go => pkg/server/call.go +0 -47
@@ 1,47 0,0 @@
package server

import (
	"encoding/json"
	"log"

	"github.com/go-mangos/mangos/protocol/req"
	"github.com/go-mangos/mangos/transport/ipc"
	"github.com/pkg/errors"
)

// Call connects to a server's local control socket, and calls the RPC method
// with the specified args.
func Call(method string, args ...string) error {
	sock, err := req.NewSocket()
	if err != nil {
		return errors.Wrap(err, "new req socket")
	}
	sock.AddTransport(ipc.NewTransport())
	if err := sock.Dial("ipc://" + "/tmp/govern.sock"); err != nil {
		return errors.Wrap(err, "dial control socket")
	}
	defer sock.Close()

	p, err := json.Marshal(&ControlMessage{
		Method: method,
		Args:   args,
	})
	if err != nil {
		return errors.Wrap(err, "marshal json")
	}

	if err := sock.Send(p); err != nil {
		return errors.Wrapf(err, "send %q", method)
	}

	rp, err := sock.Recv()
	if err != nil {
		return errors.Wrap(err, "recv")
	}
	var resp ControlMessageResponse
	if err := json.Unmarshal(rp, &resp); err != nil {
		return errors.Wrap(err, "unmarshal json response")
	}
	log.Printf("%s:%d: %s", resp.ID, resp.Status, resp.Message)
	return nil
}

D pkg/server/message.go => pkg/server/message.go +0 -13
@@ 1,13 0,0 @@
package server

type ControlMessage struct {
	Method string
	Args   []string
}

type ControlMessageResponse struct {
	ID      string
	Request ControlMessage
	Status  int
	Message string
}

D pkg/server/server.go => pkg/server/server.go +0 -277
@@ 1,277 0,0 @@
package server

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/go-mangos/mangos"
	"github.com/go-mangos/mangos/protocol/pub"
	"github.com/go-mangos/mangos/protocol/rep"
	"github.com/go-mangos/mangos/transport/ipc"
	"github.com/go-mangos/mangos/transport/tcp"
	"github.com/pkg/errors"
	"github.com/rs/xid"
)

const (
	// The default listen address of the PUB socket that downstream
	// agents will connect to, with SUB sockets. The PUB socket will
	// be used for sending control messages to agents.
	DefaultPubListenAddr = "0.0.0.0:4510"

	// Default listen address of the REP socket, that downstream agents
	// will connect to, with REP sockets. The REP socket will be used
	// for receiving data from agents.
	DefaultRepListenAddr = "0.0.0.0:4511"

	// Default path to a UNIX socket, which is used by command-line clients
	// for sending commands to the server.
	DefaultSocketPath = "/var/run/govern.sock"
)

type Option func(*Server) error

func PubListenAddr(urlStr string) Option {
	return func(s *Server) error {
		if urlStr != "" {
			s.pubListenAddr = urlStr
		}
		return nil
	}
}

func RepListenAddr(urlStr string) Option {
	return func(s *Server) error {
		if urlStr != "" {
			s.repListenAddr = urlStr
		}
		return nil
	}
}

func ControlSocketPath(name string) Option {
	return func(s *Server) error {
		if name != "" {
			s.ctlSockPath = name
		}
		return nil
	}
}

type Server struct {
	pubSock       mangos.Socket
	pubListenAddr string
	repSock       mangos.Socket
	repListenAddr string
	ctlSock       mangos.Socket
	ctlSockPath   string
}

func New(options ...Option) (*Server, error) {
	srv := &Server{
		pubListenAddr: DefaultPubListenAddr,
		repListenAddr: DefaultRepListenAddr,
		ctlSockPath:   DefaultSocketPath,
	}

	for i, option := range options {
		if err := option(srv); err != nil {
			return nil, errors.Wrapf(err, "apply option %d", i)
		}
	}

	return srv, nil
}

func (s *Server) Listen(ctx context.Context) error {
	if s.pubSock == nil {
		sock, err := pub.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create pub socket")
		}
		sock.AddTransport(tcp.NewTransport())
		if err := sock.Listen("tcp://" + s.pubListenAddr); err != nil {
			return errors.Wrapf(err, "pub socket: listen on %q", s.pubListenAddr)
		}
		s.pubSock = sock
	}

	if s.repSock == nil {
		sock, err := rep.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create rep socket")
		}
		sock.AddTransport(tcp.NewTransport())
		if err := sock.Listen("tcp://" + s.repListenAddr); err != nil {
			return errors.Wrapf(err, "rep socket: listen on %q", s.repListenAddr)
		}
		s.repSock = sock
	}

	if s.ctlSock == nil {
		sock, err := rep.NewSocket()
		if err != nil {
			return errors.Wrap(err, "create control socket")
		}
		sock.AddTransport(ipc.NewTransport())
		if err := sock.Listen("ipc://" + s.ctlSockPath); err != nil {
			return errors.Wrapf(err, "control socket: listen on %q", s.ctlSockPath)
		}
		s.ctlSock = sock
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error)
	defer close(errs)

	var wg sync.WaitGroup
	wg.Add(3)

	go func() {
		defer wg.Done()
		if err, ok := <-s.startPubSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	go func() {
		defer wg.Done()
		if err, ok := <-s.startRepSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	go func() {
		defer wg.Done()
		if err, ok := <-s.startControlSocket(ctx); ok && err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}()

	select {
	case err, ok := <-errs:
		cancel()
		wg.Wait()
		if ok {
			return err
		}

	case <-ctx.Done():
		wg.Wait()
		return ctx.Err()
	}

	return nil
}

func (s *Server) startPubSocket(ctx context.Context) <-chan error {
	log.Println("Starting PUB socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("PUB socket stopped")
		defer s.pubSock.Close()

		for {
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			}
		}
	}()
	return ch
}

func (s *Server) startRepSocket(ctx context.Context) <-chan error {
	log.Println("Starting REP socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("REP socket stopped")
		defer s.repSock.Close()

		for {
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			}
		}
	}()
	return ch
}

func (s *Server) startControlSocket(ctx context.Context) <-chan error {
	log.Println("Starting control socket")
	ch := make(chan error)
	go func() {
		defer close(ch)
		defer log.Println("Control socket stopped")
		defer s.ctlSock.Close()

		for {
			s.ctlSock.SetOption(mangos.OptionRecvDeadline, 50*time.Millisecond)
			p, err := s.ctlSock.Recv()
			if err != nil && err == mangos.ErrRecvTimeout {
				select {
				case <-ctx.Done():
					ch <- ctx.Err()
					return
				default:
					continue
				}
			} else if err != nil {
				ch <- errors.Wrap(err, "control socket recv")
				return
			}
			select {
			case <-ctx.Done():
				ch <- ctx.Err()
				return
			default:
			}

			var msg ControlMessage
			if err := json.Unmarshal(p, &msg); err != nil {
				log.Println("Failed to parse control message:", err)
				continue
			}

			log.Printf("Received control message: %q args=%v", msg.Method, msg.Args)

			resp := &ControlMessageResponse{
				ID:      xid.New().String(),
				Request: msg,
				Status:  http.StatusNotImplemented,
				Message: "not implemented",
			}
			rp, err := json.Marshal(resp)
			if err != nil {
				log.Println("Failed to marshal control message response:", err)
				continue
			}

			if err := s.ctlSock.Send(rp); err != nil {
				ch <- errors.Wrap(err, "control socket: send response")
				return
			}
		}
	}()
	return ch
}

D pkg/state/state.go => pkg/state/state.go +0 -205
@@ 1,205 0,0 @@
package state

import (
	"fmt"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"

	"git.sr.ht/~nesv/govern/pkg/resource"

	"github.com/pkg/errors"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// fileExt is the filename extension of state files.
const fileExt = ".hcl"

// Gather walks any directories passed in with the Dir option,
// and recursively collects all of the state files in those directories.
func Gather(options ...GatherOption) (*States, error) {
	var s States
	for _, option := range options {
		option(&s)
	}
	return &s, s.gather()
}

// GatherOption is a functional type that can be passed to Gather,
// to control how it collects state files.
type GatherOption func(*States)

// Dir adds the directory name to the list of directories Gather will
// look at when collecting state files.
func Dir(name string) GatherOption {
	return func(s *States) {
		s.dirs = append(s.dirs, name)
	}
}

// Ignore allows the caller to skip collecting state file names with the
// given pattern.
func Ignore(pattern string) GatherOption {
	return func(s *States) {
		s.ignore = append(s.ignore, pattern)
	}
}

// States represents a collection of state files gathered from the local
// filesystem.
type States struct {
	mu sync.RWMutex

	states map[string]State
	dirs   []string
	ignore []string
}

func (s *States) gather() error {
	var (
		states = make(map[string]State)
		errs   []error
	)
	for i, d := range s.dirs {
		if !filepath.IsAbs(d) {
			var err error
			d, err = filepath.Abs(d)
			if err != nil {
				errs = append(errs, errors.Wrapf(err, "make %q an absolute path", s.dirs[i]))
				continue
			}
		}

		info, err := os.Stat(d)
		if err != nil {
			errs = append(errs, errors.Wrap(err, "state directory"))
			continue
		}
		if !info.IsDir() {
			errs = append(errs, fmt.Errorf("not a directory: %s", d))
			continue
		}

		if err := filepath.Walk(d, func(pathname string, info fs.FileInfo, err error) error {
			if err != nil {
				return err
			}

			// Skip directories.
			if info.IsDir() {
				return nil
			}

			// Skip non-fact files.
			if filepath.Ext(pathname) != fileExt {
				return nil
			}

			// The state's name is its relative path within the
			// directory we are walking,
			// with the filename extension trimmed off.
			//
			// Example:
			//
			//   /usr/local/etc/govern/states.d/foo/bar/baz.hcl => foo/bar/baz
			//
			name := strings.TrimSuffix(strings.TrimPrefix(pathname, d+string(filepath.Separator)), fileExt)

			// Should we ignore this state?
			for _, pattern := range s.ignore {
				if matched, err := path.Match(pattern, name); err != nil {
					return errors.Wrap(err, "ignore fact")
				} else if matched {
					return nil
				}
			}

			// Does this state already exist in the mapping?
			// If so, error out.
			// There should be no "precedence" with states,
			// since having multiple states with the same name is
			// super dicey.
			if other, exists := states[name]; exists {
				return fmt.Errorf("state %q already defined at %s", name, other.Path())
			}

			states[name] = State{name: name, dir: d}
			return nil
		}); err != nil {
			errs = append(errs, fmt.Errorf("walk %s: %w", d, err))
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.states = states
	return kerrors.NewAggregate(errs)
}

// WriteTo writes the sorted names of discovered states to w.
func (s *States) WriteTo(w io.Writer) (int64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if len(s.states) == 0 {
		return 0, nil
	}

	keys := make([]string, 0, len(s.states))
	for k := range s.states {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var written int64
	tw := tabwriter.NewWriter(w, 0, 8, 1, '\t', 0)
	for _, k := range keys {
		n, err := fmt.Fprintf(tw, "%s\t%s\n", k, s.states[k].Path())
		if err != nil {
			return written, err
		}
		written += int64(n)
	}
	return written, errors.Wrap(tw.Flush(), "flush tabwriter")
}

// States returns the states contained by s.
func (s *States) States() []State {
	states := make([]State, 0, len(s.states))
	for _, state := range s.states {
		states = append(states, state)
	}
	return states
}

// State represents an on-disk state file.
// The resources defined within a state file are only loaded when the
// Resources method is called.
type State struct {
	name, dir string
}

// Resources returns the resources defined in the state file backing s.
func (s State) Resources(options ...resource.ParseOption) ([]resource.Resource, error) {
	m, err := resource.Parse(s.Path(), options...)
	if err != nil {
		return nil, err
	}
	return m, nil
}

// Path returns the absolute path to the state file backing s.
func (s State) Path() string {
	return filepath.Clean(filepath.Join(s.dir, s.name) + fileExt)
}

// String implements the fmt.Stringer interface.
func (s State) String() string {
	return s.name
}

D runners/pkg/arch_linux.go => runners/pkg/arch_linux.go +0 -119
@@ 1,119 0,0 @@
package main

import (
	"fmt"
	"strings"

	"git.sr.ht/~nesv/govern/runner"
)

func archInstalled(r *runner.Runner, name string, attrs map[string]string) error {
	packageName := name
	if v := attrs["version"]; v != "" {
		packageName = fmt.Sprintf("%s%s%s", packageName, "=", v)
	}

	// Are we running in "pretend" (a.k.a. "no-op mode")?
	if r.Pretend() {
		installed, err := archPackageInstalled(packageName)
		if err != nil {
			return fmt.Errorf("get installed version: %w", err)
		}

		if installed {
			return runner.OK()
		}

		fmt.Println("would be installed")
		return nil
	}

	// Run the command!
	if _, err := runner.Command("sudo", "pacman", "-Sq", "--needed", "--noconfirm", packageName); err != nil {
		return err
	}

	return runner.OK()
}

func archPackageInstalled(pkgname string) (bool, error) {
	version, err := archInstalledVersion(pkgname)
	if err != nil {
		return false, err
	}

	return version != "", nil
}

func archInstalledVersion(pkgname string) (string, error) {
	output, err := runner.Command("pacman", "-Q", pkgname)
	if err != nil && strings.Contains(output, "was not found") {
		return "", nil
	} else if err != nil {
		return "", fmt.Errorf("query pacman: %w", err)
	}

	fields := strings.Fields(output)
	if n := len(fields); n < 2 {
		return "", fmt.Errorf("expected 2 fields, got %d", n)
	}

	return fields[1], nil
}

func archLatest(r *runner.Runner, name string, attrs map[string]string) error {
	version := attrs["version"]
	if r.Pretend() {
		installedVersion, err := archInstalledVersion(name)
		if err != nil {
			return err
		}

		if installedVersion == "" {
			fmt.Println("will be installed")
			return nil
		}

		if installedVersion != version {
			fmt.Println("will be upgraded")
			return nil
		}

		return runner.OK()
	}

	if version != "" {
		name = fmt.Sprintf("%s=%s", name, version)
	}

	if _, err := runner.Command("sudo", "pacman", "-Syq", "--needed", "--noconfirm", name); err != nil {
		return err
	}

	return runner.OK()
}

func archRemoved(r *runner.Runner, name string, attrs map[string]string) error {
	if r.Pretend() {
		version, err := archInstalledVersion(name)
		if err != nil {
			return err
		}

		if version == "" {
			return runner.OK()
		}

		fmt.Println("will be removed")
		return nil
	}

	output, err := runner.Command("sudo", "pacman", "-R", "--noconfirm", name)
	if err != nil && strings.Contains(output, "target not found") {
		return runner.OK()
	} else if err != nil {
		return err
	}

	return runner.OK()
}