~emersion/soju

7f0d57a0154e9951c8b1190531c039bb42e4e604 — Simon Ser 23 days ago be31825 refactor-new-packages
Move MessageStore logic into its own package
5 files changed, 54 insertions(+), 29 deletions(-)

M downstream.go
R msgstore.go => msgstore/disk.go
A msgstore/msgstore.go
M upstream.go
M user.go
M downstream.go => downstream.go +4 -4
@@ 829,7 829,7 @@ func (dc *downstreamConn) welcome() error {
				continue
			}

			lastID, err := dc.user.msgStore.LastMsgID(net, target, time.Now())
			lastID, err := dc.user.msgStore.LastMsgID(&net.Network, target, time.Now())
			if err != nil {
				dc.logger.Printf("failed to get last message ID: %v", err)
				continue


@@ 856,7 856,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) {
		}

		limit := 4000
		history, err := dc.user.msgStore.LoadLatestID(net, target, lastDelivered, limit)
		history, err := dc.user.msgStore.LoadLatestID(&net.Network, target, lastDelivered, limit)
		if err != nil {
			dc.logger.Printf("failed to send implicit history for %q: %v", target, err)
			continue


@@ 1607,9 1607,9 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
		var history []*irc.Message
		switch subcommand {
		case "BEFORE":
			history, err = dc.user.msgStore.LoadBeforeTime(uc.network, entity, timestamp, limit)
			history, err = dc.user.msgStore.LoadBeforeTime(&uc.network.Network, entity, timestamp, limit)
		case "AFTER":
			history, err = dc.user.msgStore.LoadAfterTime(uc.network, entity, timestamp, limit)
			history, err = dc.user.msgStore.LoadAfterTime(&uc.network.Network, entity, timestamp, limit)
		default:
			// TODO: support LATEST, BETWEEN
			return ircError{&irc.Message{

R msgstore.go => msgstore/disk.go +20 -20
@@ 1,4 1,4 @@
package soju
package msgstore

import (
	"bufio"


@@ 11,28 11,28 @@ import (

	"gopkg.in/irc.v3"

	"git.sr.ht/~emersion/soju/database"
	"git.sr.ht/~emersion/soju/ircutil"
)

const messageStoreMaxTries = 100
const diskMaxTries = 100

var escapeFilename = strings.NewReplacer("/", "-", "\\", "-")

// messageStore is a per-user store for IRC messages.
type messageStore struct {
type diskStore struct {
	root string

	files map[string]*os.File // indexed by entity
}

func newMessageStore(root, username string) *messageStore {
	return &messageStore{
		root:  filepath.Join(root, escapeFilename.Replace(username)),
func NewDisk(root string, user *database.User) Store {
	return &diskStore{
		root:  filepath.Join(root, escapeFilename.Replace(user.Username)),
		files: make(map[string]*os.File),
	}
}

func (ms *messageStore) logPath(network *network, entity string, t time.Time) string {
func (ms *diskStore) logPath(network *database.Network, entity string, t time.Time) string {
	year, month, day := t.Date()
	filename := fmt.Sprintf("%04d-%02d-%02d.log", year, month, day)
	return filepath.Join(ms.root, escapeFilename.Replace(network.GetName()), escapeFilename.Replace(entity), filename)


@@ 54,7 54,7 @@ func formatMsgID(network, entity string, t time.Time, offset int64) string {
}

// nextMsgID queries the message ID for the next message to be written to f.
func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string, error) {
func nextMsgID(network *database.Network, entity string, t time.Time, f *os.File) (string, error) {
	offset, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return "", err


@@ 65,7 65,7 @@ func nextMsgID(network *network, entity string, t time.Time, f *os.File) (string
// LastMsgID queries the last message ID for the given network, entity and
// date. The message ID returned may not refer to a valid message, but can be
// used in history queries.
func (ms *messageStore) LastMsgID(network *network, entity string, t time.Time) (string, error) {
func (ms *diskStore) LastMsgID(network *database.Network, entity string, t time.Time) (string, error) {
	p := ms.logPath(network, entity, t)
	fi, err := os.Stat(p)
	if os.IsNotExist(err) {


@@ 76,7 76,7 @@ func (ms *messageStore) LastMsgID(network *network, entity string, t time.Time) 
	return formatMsgID(network.GetName(), entity, t, fi.Size()-1), nil
}

func (ms *messageStore) Append(network *network, entity string, msg *irc.Message) (string, error) {
func (ms *diskStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
	s := formatMessage(msg)
	if s == "" {
		return "", nil


@@ 131,7 131,7 @@ func (ms *messageStore) Append(network *network, entity string, msg *irc.Message
	return msgID, nil
}

func (ms *messageStore) Close() error {
func (ms *diskStore) Close() error {
	var closeErr error
	for _, f := range ms.files {
		if err := f.Close(); err != nil {


@@ 237,7 237,7 @@ func parseMessage(line, entity string, ref time.Time) (*irc.Message, time.Time, 
	return msg, t, nil
}

func (ms *messageStore) parseMessagesBefore(network *network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
func (ms *diskStore) parseMessagesBefore(network *database.Network, entity string, ref time.Time, limit int, afterOffset int64) ([]*irc.Message, error) {
	path := ms.logPath(network, entity, ref)
	f, err := os.Open(path)
	if err != nil {


@@ 293,7 293,7 @@ func (ms *messageStore) parseMessagesBefore(network *network, entity string, ref
	}
}

func (ms *messageStore) parseMessagesAfter(network *network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
func (ms *diskStore) parseMessagesAfter(network *database.Network, entity string, ref time.Time, limit int) ([]*irc.Message, error) {
	path := ms.logPath(network, entity, ref)
	f, err := os.Open(path)
	if err != nil {


@@ 323,11 323,11 @@ func (ms *messageStore) parseMessagesAfter(network *network, entity string, ref 
	return history, nil
}

func (ms *messageStore) LoadBeforeTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
func (ms *diskStore) LoadBeforeTime(network *database.Network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
	history := make([]*irc.Message, limit)
	remaining := limit
	tries := 0
	for remaining > 0 && tries < messageStoreMaxTries {
	for remaining > 0 && tries < diskMaxTries {
		buf, err := ms.parseMessagesBefore(network, entity, t, remaining, -1)
		if err != nil {
			return nil, err


@@ 346,12 346,12 @@ func (ms *messageStore) LoadBeforeTime(network *network, entity string, t time.T
	return history[remaining:], nil
}

func (ms *messageStore) LoadAfterTime(network *network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
func (ms *diskStore) LoadAfterTime(network *database.Network, entity string, t time.Time, limit int) ([]*irc.Message, error) {
	var history []*irc.Message
	remaining := limit
	tries := 0
	now := time.Now()
	for remaining > 0 && tries < messageStoreMaxTries && t.Before(now) {
	for remaining > 0 && tries < diskMaxTries && t.Before(now) {
		buf, err := ms.parseMessagesAfter(network, entity, t, remaining)
		if err != nil {
			return nil, err


@@ 374,7 374,7 @@ func truncateDay(t time.Time) time.Time {
	return time.Date(year, month, day, 0, 0, 0, 0, t.Location())
}

func (ms *messageStore) LoadLatestID(network *network, entity, id string, limit int) ([]*irc.Message, error) {
func (ms *diskStore) LoadLatestID(network *database.Network, entity, id string, limit int) ([]*irc.Message, error) {
	var afterTime time.Time
	var afterOffset int64
	if id != "" {


@@ 393,7 393,7 @@ func (ms *messageStore) LoadLatestID(network *network, entity, id string, limit 
	t := time.Now()
	remaining := limit
	tries := 0
	for remaining > 0 && tries < messageStoreMaxTries && !truncateDay(t).Before(afterTime) {
	for remaining > 0 && tries < diskMaxTries && !truncateDay(t).Before(afterTime) {
		var offset int64 = -1
		if afterOffset >= 0 && truncateDay(t).Equal(afterTime) {
			offset = afterOffset

A msgstore/msgstore.go => msgstore/msgstore.go +24 -0
@@ 0,0 1,24 @@
package msgstore

import (
	"time"

	"gopkg.in/irc.v3"

	"git.sr.ht/~emersion/soju/database"
)

// Store is a per-user store for IRC messages.
//
// A Store is not safe for concurrent use from multiple goroutines.
type Store interface {
	Close() error
	// LastMsgID queries the last message ID for the given network, entity and
	// date. The message ID returned may not refer to a valid message, but can
	// be used in history queries.
	LastMsgID(network *database.Network, entity string, t time.Time) (string, error)
	LoadBeforeTime(network *database.Network, entity string, t time.Time, limit int) ([]*irc.Message, error)
	LoadAfterTime(network *database.Network, entity string, t time.Time, limit int) ([]*irc.Message, error)
	LoadLatestID(network *database.Network, entity, msgID string, limit int) ([]*irc.Message, error)
	Append(network *database.Network, entity string, msg *irc.Message) (msgID string, err error)
}

M upstream.go => upstream.go +2 -2
@@ 1628,7 1628,7 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {

	history, ok := uc.network.history[entity]
	if !ok {
		lastID, err := uc.user.msgStore.LastMsgID(uc.network, entity, time.Now())
		lastID, err := uc.user.msgStore.LastMsgID(&uc.network.Network, entity, time.Now())
		if err != nil {
			uc.logger.Printf("failed to log message: failed to get last message ID: %v", err)
			return


@@ 1652,7 1652,7 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
		}
	}

	msgID, err := uc.user.msgStore.Append(uc.network, entity, msg)
	msgID, err := uc.user.msgStore.Append(&uc.network.Network, entity, msg)
	if err != nil {
		uc.logger.Printf("failed to log message: %v", err)
		return

M user.go => user.go +4 -3
@@ 10,6 10,7 @@ import (
	"gopkg.in/irc.v3"

	"git.sr.ht/~emersion/soju/database"
	"git.sr.ht/~emersion/soju/msgstore"
)

type event interface{}


@@ 251,7 252,7 @@ type user struct {

	networks        []*network
	downstreamConns []*downstreamConn
	msgStore        *messageStore
	msgStore        msgstore.Store

	// LIST commands in progress
	pendingLISTs []pendingLIST


@@ 264,9 265,9 @@ type pendingLIST struct {
}

func newUser(srv *Server, record *database.User) *user {
	var msgStore *messageStore
	var msgStore msgstore.Store
	if srv.LogPath != "" {
		msgStore = newMessageStore(srv.LogPath, record.Username)
		msgStore = msgstore.NewDisk(srv.LogPath, record)
	}

	return &user{