@@ 5,6 5,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "log"
"net/http"
"os"
"path"
@@ 14,6 15,8 @@ import (
"time"
)
+const logPath = "/var/log/aproxy"
+
// Event represents a tracking event
type Event struct {
TS time.Time
@@ 31,6 34,7 @@ type Tracker struct {
c chan Event
wg sync.WaitGroup
buffer bytes.Buffer
+ mux sync.Mutex
}
func NewTracker() *Tracker {
@@ 38,23 42,36 @@ func NewTracker() *Tracker {
t.c = make(chan Event, 100000)
//t.done = make(chan bool, 1)
t.wg = sync.WaitGroup{}
- os.MkdirAll("/var/log/aproxy", 0700)
+ os.MkdirAll(logPath, 0700)
// Create func that processes events.
// start tracking...
t.wg.Add(1)
go func() {
- for evt := range t.c {
- // Handle event.
- data, err := json.Marshal(evt)
- if err != nil {
- fmt.Println("Boom")
- continue
- }
- t.buffer.Write(data)
- t.buffer.Write([]byte("\n"))
+ tick := time.Tick(60 * time.Second)
+ process := true
+ for process {
+ select {
+ case evt, ok := <-t.c:
+ if !ok { // Channel closed. Break out.
+ process = false
+ break
+ }
+ // Handle event.
+ data, err := json.Marshal(evt)
+ // If not if else/ then implement a function that does the stuff..
+ if err != nil {
+ log.Printf("Failed marshalling tracking json: %v", err)
+ } else {
+ t.buffer.Write(data)
+ t.buffer.Write([]byte("\n"))
- // Roll buffer if too big... (2MB at most)
- if t.buffer.Len() > 1024*1024*2 {
+ // Roll buffer if too big... (2MB at most)
+ if t.buffer.Len() > 1024*1024*2 {
+ t.flush()
+ }
+ }
+ case <-tick:
+ // Flush buffer.
t.flush()
}
}
@@ 65,8 82,15 @@ func NewTracker() *Tracker {
// flush flushes the buffer to disk.
func (t *Tracker) flush() {
- dirPath := "/var/log/aproxy"
- dir, err := os.Open(dirPath)
+ t.mux.Lock()
+ defer t.mux.Unlock()
+
+ data := t.buffer.Bytes()
+ if len(data) == 0 {
+ return
+ }
+
+ dir, err := os.Open(logPath)
if err != nil {
return
}
@@ 91,7 115,7 @@ func (t *Tracker) flush() {
var file *os.File
if len(tfiles) == 0 {
// Create a new file.
- file, err = os.Create(path.Join(dirPath, today+"-1.log"))
+ file, err = os.Create(path.Join(logPath, today+"-1.log"))
if err != nil {
panic(err)
return
@@ 101,13 125,13 @@ func (t *Tracker) flush() {
return tfiles[i].Name() > tfiles[j].Name()
})
if tfiles[0].Size() > 1024*1024*50 {
- file, err = os.Create(path.Join(dirPath, fmt.Sprintf("%v-%v.log", today, len(tfiles)+1)))
+ file, err = os.Create(path.Join(logPath, fmt.Sprintf("%v-%v.log", today, len(tfiles)+1)))
if err != nil {
panic(err)
return
}
} else {
- file, err = os.OpenFile(path.Join(dirPath, tfiles[0].Name()), os.O_APPEND|os.O_WRONLY, 0666)
+ file, err = os.OpenFile(path.Join(logPath, tfiles[0].Name()), os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
panic(err)
return
@@ 116,7 140,7 @@ func (t *Tracker) flush() {
}
defer file.Close()
- file.Write(t.buffer.Bytes())
+ file.Write(data)
t.buffer.Reset()
}
@@ 2,6 2,7 @@ package aproxy_test
import (
"context"
+ "fmt"
"keybase/sketchground/aproxygo/aproxy"
"net/http"
"testing"
@@ 20,5 21,6 @@ func BenchmarkTracker(b *testing.B) {
t.Track(ctx, req)
}
+ fmt.Println("Shutting down")
t.Shutdown()
}