~emersion/soju

f7894e612b13e851f0def074fc929ce5ad6121a8 — delthas 4 years ago 1fa5195
Add support for downstream CHATHISTORY

This adds support for the WIP (at the time of this commit)
draft/chathistory extension, based on the draft at [1] and the
additional comments at [2].

This gets the history by parsing the chat logs, and is therefore only
enabled when the logs are enabled and the log path is configured.

Getting the history only from the logs adds some restrictions:
- we cannot get history by msgid (those are not logged)
- we cannot get the users masks (maybe they could be inferred from the
  JOIN etc, but it is not worth the effort and would not work every
  time)

The regular soju network history is not sent to clients that support
draft/chathistory, so that they can fetch what they need by manually
calling CHATHISTORY.

The only supported command is BEFORE for now, because that is the only
required command for an app that offers an "infinite history scrollback"
feature.

Regarding implementation, rather than reading the file from the end in
reverse, we simply start from the beginning of each log file, store each
PRIVMSG into a ring, then add the last lines of that ring into the
history we'll return later. The message parsing implementation must be
kept somewhat fast because an app could potentially request thousands of
messages in several files. Here we are using simple sscanf and indexOf
rather than regexps.

In case some log files do not contain any message (for example because
the user had not joined a channel at that time), we try up to a 100 days
of empty log files before giving up.

[1]: https://github.com/prawnsalad/ircv3-specifications/pull/3/files
[2]: https://github.com/ircv3/ircv3-specifications/pull/393/files#r350210018
3 files changed, 197 insertions(+), 9 deletions(-)

M downstream.go
M logger.go
M server.go
M downstream.go => downstream.go +114 -0
@@ 45,6 45,13 @@ func newNeedMoreParamsError(cmd string) ircError {
	}}
}

func newChatHistoryError(subcommand string, target string) ircError {
	return ircError{&irc.Message{
		Command: "FAIL",
		Params:  []string{"CHATHISTORY", "MESSAGE_ERROR", subcommand, target, "Messages could not be retrieved"},
	}}
}

var errAuthFailed = ircError{&irc.Message{
	Command: irc.ERR_PASSWDMISMATCH,
	Params:  []string{"*", "Invalid username or password"},


@@ 107,6 114,9 @@ func newDownstreamConn(srv *Server, netConn net.Conn, id uint64) *downstreamConn
	for k, v := range permanentDownstreamCaps {
		dc.supportedCaps[k] = v
	}
	if srv.LogPath != "" {
		dc.supportedCaps["draft/chathistory"] = ""
	}
	return dc
}



@@ 785,6 795,7 @@ func (dc *downstreamConn) welcome() error {
		Params:  []string{dc.nick, dc.srv.Hostname, "soju", "aiwroO", "OovaimnqpsrtklbeI"},
	})
	// TODO: RPL_ISUPPORT
	// TODO: send CHATHISTORY in RPL_ISUPPORT when implemented
	dc.SendMessage(&irc.Message{
		Prefix:  dc.srv.prefix(),
		Command: irc.ERR_NOMOTD,


@@ 825,6 836,9 @@ func (dc *downstreamConn) welcome() error {
}

func (dc *downstreamConn) sendNetworkHistory(net *network) {
	if dc.caps["draft/chathistory"] {
		return
	}
	for target, history := range net.history {
		if ch, ok := net.channels[target]; ok && ch.Detached {
			continue


@@ 1510,6 1524,106 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
			Command: "INVITE",
			Params:  []string{upstreamUser, upstreamChannel},
		})
	case "CHATHISTORY":
		var subcommand string
		if err := parseMessageParams(msg, &subcommand); err != nil {
			return err
		}
		var target, criteria, limitStr string
		if err := parseMessageParams(msg, nil, &target, &criteria, &limitStr); err != nil {
			return ircError{&irc.Message{
				Command: "FAIL",
				Params:  []string{"CHATHISTORY", "NEED_MORE_PARAMS", subcommand, "Missing parameters"},
			}}
		}

		if dc.srv.LogPath == "" {
			return ircError{&irc.Message{
				Command: irc.ERR_UNKNOWNCOMMAND,
				Params:  []string{dc.nick, subcommand, "Unknown command"},
			}}
		}

		uc, entity, err := dc.unmarshalEntity(target)
		if err != nil {
			return err
		}

		// TODO: support msgid criteria
		criteriaParts := strings.SplitN(criteria, "=", 2)
		if len(criteriaParts) != 2 || criteriaParts[0] != "timestamp" {
			return ircError{&irc.Message{
				Command: "FAIL",
				Params:  []string{"CHATHISTORY", "UNKNOWN_CRITERIA", criteria, "Unknown criteria"},
			}}
		}

		timestamp, err := time.Parse(serverTimeLayout, criteriaParts[1])
		if err != nil {
			return ircError{&irc.Message{
				Command: "FAIL",
				Params:  []string{"CHATHISTORY", "INVALID_CRITERIA", criteria, "Invalid criteria"},
			}}
		}

		limit, err := strconv.Atoi(limitStr)
		if err != nil || limit < 0 || limit > dc.srv.HistoryLimit {
			return ircError{&irc.Message{
				Command: "FAIL",
				Params:  []string{"CHATHISTORY", "INVALID_LIMIT", limitStr, "Invalid limit"},
			}}
		}

		switch subcommand {
		case "BEFORE":
			batchRef := "history"
			dc.SendMessage(&irc.Message{
				Prefix:  dc.srv.prefix(),
				Command: "BATCH",
				Params:  []string{"+" + batchRef, "chathistory", target},
			})

			history := make([]*irc.Message, limit)
			remaining := limit

			tries := 0
			for remaining > 0 {
				buf, err := parseMessagesBefore(uc.network, entity, timestamp, remaining)
				if err != nil {
					dc.logger.Printf("failed parsing log messages for chathistory: %v", err)
					return newChatHistoryError(subcommand, target)
				}
				if len(buf) == 0 {
					tries++
					if tries >= 100 {
						break
					}
				} else {
					tries = 0
				}
				copy(history[remaining-len(buf):], buf)
				remaining -= len(buf)
				year, month, day := timestamp.Date()
				timestamp = time.Date(year, month, day, 0, 0, 0, 0, timestamp.Location()).Add(-1)
			}

			for _, m := range history[remaining:] {
				m.Tags["batch"] = irc.TagValue(batchRef)
				dc.SendMessage(dc.marshalMessage(m, uc.network))
			}

			dc.SendMessage(&irc.Message{
				Prefix:  dc.srv.prefix(),
				Command: "BATCH",
				Params:  []string{"-" + batchRef},
			})
		default:
			// TODO: support AFTER, LATEST, BETWEEN
			return ircError{&irc.Message{
				Command: "FAIL",
				Params:  []string{"CHATHISTORY", "UNKNOWN_COMMAND", subcommand, "Unknown command"},
			}}
		}
	default:
		dc.logger.Printf("unhandled message: %v", msg)
		return newUnknownCommandError(msg.Command)

M logger.go => logger.go +72 -0
@@ 1,6 1,7 @@
package soju

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"


@@ 132,3 133,74 @@ func formatMessage(msg *irc.Message) string {
		return ""
	}
}

func parseMessagesBefore(network *network, entity string, timestamp time.Time, limit int) ([]*irc.Message, error) {
	year, month, day := timestamp.Date()
	path := logPath(network, entity, timestamp)
	f, err := os.Open(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	defer f.Close()

	historyRing := make([]*irc.Message, limit)
	cur := 0

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := sc.Text()
		var hour, minute, second int
		_, err := fmt.Sscanf(line, "[%02d:%02d:%02d] ", &hour, &minute, &second)
		if err != nil {
			return nil, err
		}
		message := line[11:]
		// TODO: support NOTICE
		if !strings.HasPrefix(message, "<") {
			continue
		}
		i := strings.Index(message, "> ")
		if i == -1 {
			continue
		}
		t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
		if !t.Before(timestamp) {
			break
		}

		sender := message[1:i]
		text := message[i+2:]
		historyRing[cur%limit] = &irc.Message{
			Tags: map[string]irc.TagValue{
				"time": irc.TagValue(t.UTC().Format(serverTimeLayout)),
			},
			Prefix: &irc.Prefix{
				Name: sender,
			},
			Command: "PRIVMSG",
			Params:  []string{entity, text},
		}
		cur++
	}
	if sc.Err() != nil {
		return nil, sc.Err()
	}

	n := limit
	if cur < limit {
		n = cur
	}
	start := (cur - n + limit) % limit

	if start+n <= limit { // ring doesnt wrap
		return historyRing[start : start+n], nil
	} else { // ring wraps
		history := make([]*irc.Message, n)
		r := copy(history, historyRing[start:])
		copy(history[r:], historyRing[:n-r])
		return history, nil
	}
}

M server.go => server.go +11 -9
@@ 38,11 38,12 @@ func (l *prefixLogger) Printf(format string, v ...interface{}) {
}

type Server struct {
	Hostname string
	Logger   Logger
	RingCap  int
	LogPath  string
	Debug    bool
	Hostname     string
	Logger       Logger
	RingCap      int
	HistoryLimit int
	LogPath      string
	Debug        bool

	db *DB



@@ 52,10 53,11 @@ type Server struct {

func NewServer(db *DB) *Server {
	return &Server{
		Logger:  log.New(log.Writer(), "", log.LstdFlags),
		RingCap: 4096,
		users:   make(map[string]*user),
		db:      db,
		Logger:       log.New(log.Writer(), "", log.LstdFlags),
		RingCap:      4096,
		HistoryLimit: 1000,
		users:        make(map[string]*user),
		db:           db,
	}
}