~egtann/sls

ref: acd42d9b7da9 sls/client.go -rw-r--r-- 3.8 KiB
acd42d9b — Evan Tann add json payload stackdriver option 9 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package sls

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"
)

// Client is used to interact with the logging server.
type Client struct {
	client        HTTPClient
	url           string
	buf           []string
	mu            sync.Mutex
	errCh         chan error
	flushInterval time.Duration
}

// HTTPClient is satisfied by *http.Client but enables us to pass in
// alternative http clients as well with different features (such as automatic
// retries or allowing host-based communication over a LAN).
type HTTPClient interface {
	Do(*http.Request) (*http.Response, error)
}

// NewClient for interacting with sls.
func NewClient(url string) *Client {
	// These settings are lifted from Hashicorp's cleanhttp package.
	httpClient := &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second,
				KeepAlive: 30 * time.Second,
				DualStack: true,
			}).DialContext,
			MaxIdleConns:          100,
			IdleConnTimeout:       90 * time.Second,
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
			MaxIdleConnsPerHost:   -1,
			DisableKeepAlives:     true,
		},
		Timeout: 10 * time.Second,
	}
	httpClient.Timeout = 10 * time.Second
	c := &Client{
		client: httpClient,
		url:    url,
	}
	return c
}

func (c *Client) WithHTTPClient(client HTTPClient) *Client {
	c.client = client
	return c
}

// WithFlushInterval specifies how long to wait before flushing the buffer to
// the log server. This returns a function which flushes the client and should
// be called with defer before main exits.
func (c *Client) WithFlushInterval(dur time.Duration) (*Client, func()) {
	c.flushInterval = dur
	go func() {
		for range time.Tick(dur) {
			c.flush()
		}
	}()
	return c, c.flush
}

// marshalBuffer to JSON. If the buffer is empty, marshalBuffer reports nil.
// This is not thread-safe, so protect any call with a mutex.
func (c *Client) marshalBuffer() ([]byte, error) {
	if len(c.buf) == 0 {
		return nil, nil
	}
	byt, err := json.Marshal(c.buf)
	c.buf = []string{}
	if err != nil {
		return nil, fmt.Errorf("marshal log buffer: %w", err)
	}
	return byt, nil
}

// flush the log buffer to the server. This happens automatically over time if
// WithFlushInterval is called.
func (c *Client) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()

	byt, err := c.marshalBuffer()
	if err != nil {
		c.sendErr(fmt.Errorf("marshal buffer: %w", err))
		return
	}
	if len(byt) == 0 {
		return
	}
	uri := c.url + "/log"
	req, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(byt))
	if err != nil {
		c.sendErr(fmt.Errorf("new request: %w", err))
		return
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := c.client.Do(req)
	if err != nil {
		c.sendErr(fmt.Errorf("do: %w", err))
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		err = fmt.Errorf("expected 200, got %d: %s", resp.StatusCode, uri)
		c.sendErr(err)
		return
	}
}

// Err is a convenience function that wraps an error channel.
func (c *Client) Err() <-chan error {
	if c.errCh == nil {
		c.errCh = make(chan error)
	}
	return c.errCh
}

func (c *Client) sendErr(err error) {
	if c.errCh == nil {
		return
	}
	c.errCh <- err
}

// Log to sls. This is threadsafe.
func (c *Client) Log(s string) {
	c.mu.Lock()
	if c.flushInterval > 0 {
		c.buf = append(c.buf, s)
		c.mu.Unlock()
		return
	}

	// Don't buffer, flush immediately. Flush locks the mutex, so we unlock
	// before flush is called.
	c.buf = []string{s}
	c.mu.Unlock()
	c.flush()
}

// Write satisfies the io.Writer interface, so a client can be a drop-in
// replacement for stdout. Logs are written to the external service on a
// best-effort basis.
func (c *Client) Write(byt []byte) (int, error) {
	c.Log(string(byt))
	return len(byt), nil
}