~charles/logquery

77340b2c12da6538aabab08026551b8f0374ddc9 — Charles Daniels 3 years ago 7bdfd62 go-rewrite
Revert "try adding async reading support"

This reverts commit 7bdfd627cf94173a7b30357ecfb6262b75489acc.
4 files changed, 23 insertions(+), 76 deletions(-)

M logquery.go
M logquery_test.go
M parser/parser.go
M parser/parser_test.go
M logquery.go => logquery.go +1 -1
@@ 129,7 129,7 @@ func formatRecordLogFmt(record []interface{}, keys []string, header bool) string
// returns. This is used when the caller has encountered an error and needs to
// exit early. If the parser is asked to quit, the out channel will be closed.
func backgroundParser(stream io.Reader, out chan interface{}, quit chan int) {
	p, err := parser.NewParser(stream, true)
	p, err := parser.NewParser(stream)
	if err != nil {
		out <- err
		close(out)

M logquery_test.go => logquery_test.go +2 -2
@@ 16,9 16,9 @@ import (
)

func assertLogfmtEqual(t *testing.T, lf1, lf2 string) {
	p1, err := parser.NewParserFromString(lf1, false)
	p1, err := parser.NewParserFromString(lf1)
	assert.Nil(t, err)
	p2, err := parser.NewParserFromString(lf2, false)
	p2, err := parser.NewParserFromString(lf2)
	assert.Nil(t, err)

	for {

M parser/parser.go => parser/parser.go +10 -63
@@ 57,10 57,6 @@ import (
	"strings"
)

// BackgroundReaderChannelSize specify how large the buffered channel should be
// for the asynchrous reader option.
var BackgroundReaderChannelSize int = 1024 * 128

// SyntaxError indicates a syntax error has been detected. To check if an error
// is a syntax error, use the IsSyntaxError() method.
type SyntaxError struct {


@@ 86,52 82,27 @@ func isHexDigit(r rune) bool {

// Parser is used to store state while processing input data.
type Parser struct {
	reader   *bufio.Reader
	current  rune
	peek     rune
	col      int
	line     int
	iseof    bool
	async    bool
	runeChan chan interface{}
}

// backgroundReader reads runes from the given buffered reader and writes them
// into the channel. When io.EOF is received, the channel is closed. If any
// errors occur, they are passed instead of rune. io.EOF is propogated.
func backgroundReader(reader *bufio.Reader, runeChan chan interface{}) {
	for {
		r, _, err := reader.ReadRune()
		if err == io.EOF {
			runeChan <- err
			close(runeChan)
			return
		} else if err != nil {
			runeChan <- err
			return
		}

		runeChan <- r
	}
	reader  *bufio.Reader
	current rune
	peek    rune
	col     int
	line    int
	iseof   bool
}

// NewParserFromString instantiates a new logfmt parser on the specified text,
// given as a string.
func NewParserFromString(text string, asyncReader bool) (*Parser, error) {
	return NewParser(strings.NewReader(text), asyncReader)
func NewParserFromString(text string) (*Parser, error) {
	return NewParser(strings.NewReader(text))
}

// NewParser instantiates a new logfmt parser on the specified stream.
//
// If asyncReader is True, then the I/O will be performed using a bakground
// goroutine.
func NewParser(stream io.Reader, asyncReader bool) (*Parser, error) {
func NewParser(stream io.Reader) (*Parser, error) {
	p := &Parser{
		reader: bufio.NewReader(stream),
		col:    1,
		line:   1,
		iseof:  false,
		async:  false,
	}

	var err error


@@ 140,15 111,6 @@ func NewParser(stream io.Reader, asyncReader bool) (*Parser, error) {
		return nil, err
	}

	if asyncReader {
		p.async = true
		p.runeChan = make(chan interface{}, BackgroundReaderChannelSize)
		go backgroundReader(p.reader, p.runeChan)

		// Make sure we never attempt to use the reader again.
		p.reader = nil
	}

	return p, nil
}



@@ 182,22 144,7 @@ func (p *Parser) consumeWhitespace() error {
func (p *Parser) next() (rune, error) {
	c := p.current

	var adv rune
	var err error

	if p.async {
		runeOrErr := <-p.runeChan
		if r, ok := runeOrErr.(rune); ok {
			adv = r
		} else if r, ok := runeOrErr.(error); ok {
			err = r
		} else {
			panic(fmt.Sprintf("If you see this, you have found a bug. Async reader gave us an unhandled type: %#v", runeOrErr))
		}
	} else {
		adv, _, err = p.reader.ReadRune()
	}

	adv, _, err := p.reader.ReadRune()
	if err == io.EOF {
		p.iseof = true
		err = nil

M parser/parser_test.go => parser/parser_test.go +10 -10
@@ 7,7 7,7 @@ import (
)

func Test_Simple1(t *testing.T) {
	p, err := NewParserFromString("foo=bar", false)
	p, err := NewParserFromString("foo=bar")

	assert.Nil(t, err)
	rec, err := p.NextRecord()


@@ 20,7 20,7 @@ func Test_Simple1(t *testing.T) {
}

func Test_Simple2(t *testing.T) {
	p, err := NewParserFromString("foo=bar spam=\"green eggs and ham\" baz quux=42 pi=3.14159 quoted_int=\"123\"", false)
	p, err := NewParserFromString("foo=bar spam=\"green eggs and ham\" baz quux=42 pi=3.14159 quoted_int=\"123\"")

	assert.Nil(t, err)
	rec, err := p.NextRecord()


@@ 39,7 39,7 @@ func Test_Simple2(t *testing.T) {

func Test_MultiLineString(t *testing.T) {
	// A string with an un-escaped newline in it should be a syntax error.
	p, err := NewParserFromString("key=\"line1\nline2\"", false)
	p, err := NewParserFromString("key=\"line1\nline2\"")
	assert.Nil(t, err)

	_, err = p.NextRecord()


@@ 49,7 49,7 @@ func Test_MultiLineString(t *testing.T) {

func Test_IllegalKeys(t *testing.T) {
	// A string with an un-escaped newline in it should be a syntax error.
	p, err := NewParserFromString("'=foo'\n';foo'\n'\"foo'\n\"'\"foo\nfoo", false)
	p, err := NewParserFromString("'=foo'\n';foo'\n'\"foo'\n\"'\"foo\nfoo")
	assert.Nil(t, err)

	_, err = p.NextRecord()


@@ 77,7 77,7 @@ func Test_IllegalKeys(t *testing.T) {
}

func Test_EscapeChars(t *testing.T) {
	p, err := NewParserFromString("key=\"\\a\\b\\\\\\t\\n\\f\\r\\v\\'\\\"\\x61\"", false)
	p, err := NewParserFromString("key=\"\\a\\b\\\\\\t\\n\\f\\r\\v\\'\\\"\\x61\"")

	assert.Nil(t, err)
	rec, err := p.NextRecord()


@@ 90,7 90,7 @@ func Test_EscapeChars(t *testing.T) {
}

func Test_MultiRecords(t *testing.T) {
	p, err := NewParserFromString("key1=val1\nkey2=val2", false)
	p, err := NewParserFromString("key1=val1\nkey2=val2")
	assert.Nil(t, err)

	rec, err := p.NextRecord()


@@ 110,7 110,7 @@ func Test_MultiRecords(t *testing.T) {
}

func Test_MultiRecordsWithBlankLine(t *testing.T) {
	p, err := NewParserFromString("key1=val1\n   \t\nkey2=val2", false)
	p, err := NewParserFromString("key1=val1\n   \t\nkey2=val2")
	assert.Nil(t, err)

	rec, err := p.NextRecord()


@@ 132,7 132,7 @@ func Test_MultiRecordsWithBlankLine(t *testing.T) {
func Test_MissingSpace(t *testing.T) {
	// Test that if a space is missing between generated records, the
	// parser should throw a syntax error.
	p, err := NewParserFromString("key1=val1key2=val2", false)
	p, err := NewParserFromString("key1=val1key2=val2")
	assert.Nil(t, err)

	_, err = p.NextRecord()


@@ 143,7 143,7 @@ func Test_MissingSpace(t *testing.T) {
func Test_SyntaxError(t *testing.T) {
	// Test that if a space is missing between generated records, the
	// parser should throw a syntax error.
	p, err := NewParserFromString("key1=val1key2=val2", false)
	p, err := NewParserFromString("key1=val1key2=val2")
	assert.Nil(t, err)

	_, err = p.NextRecord()


@@ 154,7 154,7 @@ func Test_SyntaxError(t *testing.T) {
func Test_SkipSyntaxError(t *testing.T) {
	// Test that if a space is missing between generated records, the
	// parser should throw a syntax error.
	p, err := NewParserFromString("key1='abc xyz\nkey1='123 456'\n", false)
	p, err := NewParserFromString("key1='abc xyz\nkey1='123 456'\n")
	assert.Nil(t, err)

	// The first record should throw an error due to the newline before