From f54d5fbdfabf1ecf63357b635fadc9c45fd6adde Mon Sep 17 00:00:00 2001 From: Jens Zeilund Date: Thu, 26 Jul 2018 21:27:18 +0200 Subject: [PATCH] Flush log more often --- aproxy/tracker.go | 60 +++++++++++++++++++++++++++++------------- aproxy/tracker_test.go | 2 ++ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/aproxy/tracker.go b/aproxy/tracker.go index 95ab715..353e9bd 100644 --- a/aproxy/tracker.go +++ b/aproxy/tracker.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "os" "path" @@ -14,6 +15,8 @@ import ( "time" ) +const logPath = "/var/log/aproxy" + // Event represents a tracking event type Event struct { TS time.Time @@ -31,6 +34,7 @@ type Tracker struct { c chan Event wg sync.WaitGroup buffer bytes.Buffer + mux sync.Mutex } func NewTracker() *Tracker { @@ -38,23 +42,36 @@ func NewTracker() *Tracker { t.c = make(chan Event, 100000) //t.done = make(chan bool, 1) t.wg = sync.WaitGroup{} - os.MkdirAll("/var/log/aproxy", 0700) + os.MkdirAll(logPath, 0700) // Create func that processes events. // start tracking... t.wg.Add(1) go func() { - for evt := range t.c { - // Handle event. - data, err := json.Marshal(evt) - if err != nil { - fmt.Println("Boom") - continue - } - t.buffer.Write(data) - t.buffer.Write([]byte("\n")) + tick := time.Tick(60 * time.Second) + process := true + for process { + select { + case evt, ok := <-t.c: + if !ok { // Channel closed. Break out. + process = false + break + } + // Handle event. + data, err := json.Marshal(evt) + // If not if else/ then implement a function that does the stuff.. + if err != nil { + log.Printf("Failed marshalling tracking json: %v", err) + } else { + t.buffer.Write(data) + t.buffer.Write([]byte("\n")) - // Roll buffer if too big... (2MB at most) - if t.buffer.Len() > 1024*1024*2 { + // Roll buffer if too big... (2MB at most) + if t.buffer.Len() > 1024*1024*2 { + t.flush() + } + } + case <-tick: + // Flush buffer. t.flush() } } @@ -65,8 +82,15 @@ func NewTracker() *Tracker { // flush flushes the buffer to disk. func (t *Tracker) flush() { - dirPath := "/var/log/aproxy" - dir, err := os.Open(dirPath) + t.mux.Lock() + defer t.mux.Unlock() + + data := t.buffer.Bytes() + if len(data) == 0 { + return + } + + dir, err := os.Open(logPath) if err != nil { return } @@ -91,7 +115,7 @@ func (t *Tracker) flush() { var file *os.File if len(tfiles) == 0 { // Create a new file. - file, err = os.Create(path.Join(dirPath, today+"-1.log")) + file, err = os.Create(path.Join(logPath, today+"-1.log")) if err != nil { panic(err) return @@ -101,13 +125,13 @@ func (t *Tracker) flush() { return tfiles[i].Name() > tfiles[j].Name() }) if tfiles[0].Size() > 1024*1024*50 { - file, err = os.Create(path.Join(dirPath, fmt.Sprintf("%v-%v.log", today, len(tfiles)+1))) + file, err = os.Create(path.Join(logPath, fmt.Sprintf("%v-%v.log", today, len(tfiles)+1))) if err != nil { panic(err) return } } else { - file, err = os.OpenFile(path.Join(dirPath, tfiles[0].Name()), os.O_APPEND|os.O_WRONLY, 0666) + file, err = os.OpenFile(path.Join(logPath, tfiles[0].Name()), os.O_APPEND|os.O_WRONLY, 0666) if err != nil { panic(err) return @@ -116,7 +140,7 @@ func (t *Tracker) flush() { } defer file.Close() - file.Write(t.buffer.Bytes()) + file.Write(data) t.buffer.Reset() } diff --git a/aproxy/tracker_test.go b/aproxy/tracker_test.go index 9d024ae..559dc8a 100644 --- a/aproxy/tracker_test.go +++ b/aproxy/tracker_test.go @@ -2,6 +2,7 @@ package aproxy_test import ( "context" + "fmt" "keybase/sketchground/aproxygo/aproxy" "net/http" "testing" @@ -20,5 +21,6 @@ func BenchmarkTracker(b *testing.B) { t.Track(ctx, req) } + fmt.Println("Shutting down") t.Shutdown() } -- 2.30.1