~egtann/srp

ac438ce80b2e493a34ee210a7ccaf17d94345935 — Evan Tann 3 years ago 747923b
fix bugs
2 files changed, 34 insertions(+), 11 deletions(-)

M cmd/srp/main.go
M proxy.go
M cmd/srp/main.go => cmd/srp/main.go +8 -3
@@ 80,7 80,9 @@ func main() {
			log.Fatal(err)
		}
	}()
	proxy.CheckHealth()
	if err = proxy.CheckHealth(); err != nil {
		log.Println("check health", err)
	}
	sighupCh := make(chan bool)
	go hotReloadConfig(*config, proxy, sighupCh)
	go checkHealth(proxy, sighupCh)


@@ 98,12 100,15 @@ func (l *Logger) Printf(format string, vals ...interface{}) {
// check when the reloaded channel receives a message, so a new health check
// with the new registry can be started.
func checkHealth(proxy *srp.ReverseProxy, sighupCh <-chan bool) {
	ticker := time.NewTicker(10 * time.Second)
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			proxy.CheckHealth()
			err := proxy.CheckHealth()
			if err != nil {
				log.Println("check health", err)
			}
		case <-sighupCh:
			return
		}

M proxy.go => proxy.go +26 -8
@@ 1,6 1,8 @@
package srp

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
	"io/ioutil"


@@ 113,19 115,30 @@ func (r Registry) Hosts() []string {
	return hosts
}

func (r Registry) clone() Registry {
func (r *ReverseProxy) cloneRegistry() (Registry, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var byt bytes.Buffer
	enc := gob.NewEncoder(&byt)
	if err := enc.Encode(r.reg); err != nil {
		return nil, errors.Wrap(err, "encode")
	}
	dec := gob.NewDecoder(&byt)
	clone := Registry{}
	for k, v := range r {
		clone[k] = v
	if err := dec.Decode(&clone); err != nil {
		return nil, errors.Wrap(err, "decode")
	}
	return clone
	return clone, nil
}

// CheckHealth of backend servers in the registry concurrently, and update the
// registry so requests are only routed to healthy servers.
func (r *ReverseProxy) CheckHealth() {
func (r *ReverseProxy) CheckHealth() error {
	checks := []*healthCheck{}
	regClone := r.reg.clone()
	regClone, err := r.cloneRegistry()
	if err != nil {
		return errors.Wrap(err, "clone registry")
	}
	for host, frontend := range regClone {
		if frontend.HealthPath == "" {
			frontend.liveBackends = frontend.Backends


@@ 141,7 154,7 @@ func (r *ReverseProxy) CheckHealth() {
		}
	}
	if len(checks) == 0 {
		return
		return nil
	}
	go func() {
		for _, check := range checks {


@@ 159,6 172,7 @@ func (r *ReverseProxy) CheckHealth() {
		log.Printf("check health: %s 200 OK\n", check.ip)
	}
	r.UpdateRegistry(regClone)
	return nil
}

// UpdateRegistry for the reverse proxy with new frontends, backends, and


@@ 195,7 209,11 @@ func newTransport(reg Registry) http.RoundTripper {
	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		Dial: func(network, addr string) (net.Conn, error) {
			endpoints := reg[addr].liveBackends
			host, ok := reg[addr]
			if !ok {
				return nil, fmt.Errorf("no host for %s", addr)
			}
			endpoints := host.liveBackends
			if len(endpoints) == 0 {
				return nil, fmt.Errorf("no live backend for %s", addr)
			}