~jzs/aproxy

d3c09221d188769e41bb132d4b62ffcb97ab2d29 — Jens Zeilund 2 years ago 787b2e9
Add tracker, add config support
3 files changed, 234 insertions(+), 14 deletions(-)

M aproxy.go
A aproxy/tracker.go
A aproxy/tracker_test.go
M aproxy.go => aproxy.go +64 -14
@@ 1,13 1,16 @@
package main

import (
	"crypto/tls"
	"flag"
	"fmt"
	"keybase/sketchground/aproxygo/aproxy"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"
	"sync"
	"time"

	"golang.org/x/crypto/acme/autocert"
)


@@ 62,6 65,28 @@ func main() {
			Server: "static:///var/www/ikurvendk",
		},
	}

	flag.Parse()
	if len(flag.Args()) > 0 {
		cfgs = []Config{}
	}
	for _, a := range flag.Args() {
		arg := strings.Split(a, "=")
		if len(arg) != 2 {
			return
		}
		if arg[0] != "host" {
			continue
		}
		data := strings.Split(arg[1], ";")
		if len(data) != 2 {
			continue
		}
		host := data[0]
		server := data[1]
		cfgs = append(cfgs, Config{Host: host, Server: server})
	}

	hosts := []string{}

	// Load services...


@@ 84,39 109,64 @@ func main() {
		HostPolicy: autocert.HostWhitelist(hosts...),
	}

	tracker := aproxy.NewTracker()

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		log.Println("Starting reverse proxy for ssl connections")
		s := &http.Server{
			Addr:      ":https",
			TLSConfig: &tls.Config{GetCertificate: mgr.GetCertificate},
			Handler:   &P{secure: true},
		}
		log.Fatal(s.ListenAndServeTLS("", ""))
		/*
			log.Println("Starting reverse proxy for ssl connections")
			s := &http.Server{
				Addr:      ":https",
				TLSConfig: &tls.Config{GetCertificate: mgr.GetCertificate},
				Handler:   NewP(true, tracker),
			}
			log.Fatal(s.ListenAndServeTLS("", ""))
		*/
		wg.Done()
	}()
	wg.Add(1)
	go func() {
		log.Println("Starting reverse proxy for http connections")
		log.Fatal(http.ListenAndServe(":http", mgr.HTTPHandler(&P{}))) // port 80
		log.Fatal(http.ListenAndServe(":8080", mgr.HTTPHandler(NewP(false, tracker)))) // port 80
		wg.Done()
	}()
	wg.Wait()

	// Shut down track channel...

	//TODO: Process left of logs for service. Then shut down.
	fmt.Println("SLeeping before shutdown")
	time.Sleep(10 * time.Second)
	fmt.Println("going down")
	tracker.Shutdown()
	fmt.Println("went down")
}

func NewP(secure bool, t *aproxy.Tracker) *P {
	p := &P{secure: secure, t: t}

	return p
}

// P struct
type P struct {
	secure bool
	t      *aproxy.Tracker
}

func (p *P) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	log.Printf("Receiving quest on path: %v", req.URL)
	if !p.secure { // Redirect always if not secure.
		u := fmt.Sprintf("https://%v%v", req.Host, req.URL.Path)
		http.Redirect(rw, req, u, http.StatusFound)
		return
	}
	/*
		if !p.secure { // Redirect always if not secure.
			u := fmt.Sprintf("https://%v%v", req.Host, req.URL.Path)
			http.Redirect(rw, req, u, http.StatusFound)
			return
		}
	*/

	// TODO: Investigate if this should be guarded for concurrent access?
	p.t.Track(req.Context(), req)

	if h, ok := proxies[req.Host]; ok { // Check if we have proxies
		h.ServeHTTP(rw, req)

A aproxy/tracker.go => aproxy/tracker.go +146 -0
@@ 0,0 1,146 @@
package aproxy

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path"
	"sort"
	"strings"
	"sync"
	"time"
)

// Event represents a tracking event
type Event struct {
	TS             time.Time
	URL            string
	RemoteAddr     string
	UserAgent      string
	AcceptLanguage string
	Referer        string
	Method         string
	Host           string
}

// Tracker represents a tracker that logs info about http requests and logs them
type Tracker struct {
	c      chan Event
	wg     sync.WaitGroup
	buffer bytes.Buffer
}

func NewTracker() *Tracker {
	t := &Tracker{}
	t.c = make(chan Event, 100000)
	//t.done = make(chan bool, 1)
	t.wg = sync.WaitGroup{}
	os.MkdirAll("/var/log/aproxy", 0700)
	// Create func that processes events.
	//  start tracking...
	t.wg.Add(1)
	go func() {
		for evt := range t.c {
			// Handle event.
			data, err := json.Marshal(evt)
			if err != nil {
				fmt.Println("Boom")
				continue
			}
			t.buffer.Write(data)
			t.buffer.Write([]byte("\n"))

			// Roll buffer if too big... (2MB at most)
			if t.buffer.Len() > 1024*1024*2 {
				t.flush()
			}
		}
		t.wg.Done()
	}()
	return t
}

// flush flushes the buffer to disk.
func (t *Tracker) flush() {
	dirPath := "/var/log/aproxy"
	dir, err := os.Open(dirPath)
	if err != nil {
		return
	}
	infos, err := dir.Readdir(0)
	if err != nil {
		return
	}

	// Compute today string.
	today := time.Now().Format("2006-01-02")

	tfiles := []os.FileInfo{}

	// parse names. Find correct file.
	for _, i := range infos {
		// Handle infos...
		if strings.HasPrefix(i.Name(), today) {
			tfiles = append(tfiles, i)
		}
	}

	var file *os.File
	if len(tfiles) == 0 {
		// Create a new file.
		file, err = os.Create(path.Join(dirPath, today+"-1.log"))
		if err != nil {
			panic(err)
			return
		}
	} else {
		sort.Slice(tfiles, func(i, j int) bool {
			return tfiles[i].Name() > tfiles[j].Name()
		})
		if tfiles[0].Size() > 1024*1024*50 {
			file, err = os.Create(path.Join(dirPath, fmt.Sprintf("%v-%v.log", today, len(tfiles)+1)))
			if err != nil {
				panic(err)
				return
			}
		} else {
			file, err = os.OpenFile(path.Join(dirPath, tfiles[0].Name()), os.O_APPEND|os.O_WRONLY, 0666)
			if err != nil {
				panic(err)
				return
			}
		}
	}
	defer file.Close()

	file.Write(t.buffer.Bytes())

	t.buffer.Reset()
}

func (p *Tracker) Shutdown() {
	close(p.c)
	p.wg.Wait()

	p.flush()
}

// Perform logging...
func (p *Tracker) Track(ctx context.Context, r *http.Request) {
	// Consider respecting DNT header?
	h := r.Header
	evt := Event{
		TS:             time.Now(),
		URL:            r.URL.String(),
		RemoteAddr:     r.RemoteAddr,
		UserAgent:      strings.Join(h["User-Agent"], "|"),
		AcceptLanguage: strings.Join(h["Accept-Language"], "|"),
		Referer:        strings.Join(h["Referer"], "|"),
		Method:         r.Method,
		Host:           r.Host,
	}
	p.c <- evt
}

A aproxy/tracker_test.go => aproxy/tracker_test.go +24 -0
@@ 0,0 1,24 @@
package aproxy_test

import (
	"context"
	"keybase/sketchground/aproxygo/aproxy"
	"net/http"
	"testing"
)

func BenchmarkTracker(b *testing.B) {
	ctx := context.Background()
	t := aproxy.NewTracker()

	req, err := http.NewRequest("GET", "http://localhost/", nil)
	if err != nil {
		b.Fatal(err)
	}

	for n := 0; n < 500000; n++ {
		t.Track(ctx, req)
	}

	t.Shutdown()
}