~emersion/soju

92fece5cd4b29da4944263ffa258dc76dfe2a1f5 — Simon Ser 3 months ago b6f15c3
Nuke in-memory ring buffer

Instead, always read chat history from logs. Unify the implicit chat
history (pushing history to clients) and explicit chat history
(via the CHATHISTORY command).

Instead of keeping track of ring buffer cursors for each client, use
message IDs.

If necessary, the ring buffer could be re-introduced behind a
common MessageStore interface (could be useful when on-disk logs are
disabled).

References: https://todo.sr.ht/~emersion/soju/80
5 files changed, 41 insertions(+), 122 deletions(-)

M downstream.go
D ring.go
M server.go
M upstream.go
M user.go
M downstream.go => downstream.go +23 -12
@@ 850,13 850,27 @@ func (dc *downstreamConn) welcome() error {
			dc.sendNetworkHistory(net)
			delete(net.offlineClients, dc.clientName)
		}

		// Fast-forward history to last message
		for target, history := range net.history {
			if ch, ok := net.channels[target]; ok && ch.Detached {
				continue
			}

			lastID, err := lastMsgID(net, target, time.Now())
			if err != nil {
				dc.logger.Printf("failed to get last message ID: %v", err)
				continue
			}
			history.clients[dc.clientName] = lastID
		}
	})

	return nil
}

func (dc *downstreamConn) sendNetworkHistory(net *network) {
	if dc.caps["draft/chathistory"] {
	if dc.caps["draft/chathistory"] || dc.srv.LogPath == "" {
		return
	}
	for target, history := range net.history {


@@ 864,13 878,17 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) {
			continue
		}

		seq, ok := history.clients[dc.clientName]
		lastDelivered, ok := history.clients[dc.clientName]
		if !ok {
			continue
		}
		history.clients[dc.clientName] = history.ring.Cur()

		consumer := history.ring.NewConsumer(seq)
		limit := 4000
		history, err := loadHistoryLatestID(dc.network, target, lastDelivered, limit)
		if err != nil {
			dc.logger.Printf("failed to send implicit history for %q: %v", target, err)
			continue
		}

		batchRef := "history"
		if dc.caps["batch"] {


@@ 881,12 899,7 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) {
			})
		}

		for {
			msg := consumer.Consume()
			if msg == nil {
				break
			}

		for _, msg := range history {
			// Don't replay all messages, because that would mess up client
			// state. For instance we just sent the list of users, sending
			// PART messages for one of these users would be incorrect.


@@ 900,10 913,8 @@ func (dc *downstreamConn) sendNetworkHistory(net *network) {
			}

			if dc.caps["batch"] {
				msg = msg.Copy()
				msg.Tags["batch"] = irc.TagValue(batchRef)
			}

			dc.SendMessage(dc.marshalMessage(msg, net))
		}


D ring.go => ring.go +0 -86
@@ 1,86 0,0 @@
package soju

import (
	"fmt"

	"gopkg.in/irc.v3"
)

// Ring implements a single producer, multiple consumer ring buffer. The ring
// buffer size is fixed. The ring buffer is stored in memory.
type Ring struct {
	buffer []*irc.Message
	cap    uint64
	cur    uint64
}

// NewRing creates a new ring buffer.
func NewRing(capacity int) *Ring {
	return &Ring{
		buffer: make([]*irc.Message, capacity),
		cap:    uint64(capacity),
	}
}

// Produce appends a new message to the ring buffer.
func (r *Ring) Produce(msg *irc.Message) {
	i := int(r.cur % r.cap)
	r.buffer[i] = msg
	r.cur++
}

// Cur returns the current history sequence number.
func (r *Ring) Cur() uint64 {
	return r.cur
}

// NewConsumer creates a new ring buffer consumer.
//
// The consumer will get messages starting from the specified history sequence
// number (see Ring.Cur).
func (r *Ring) NewConsumer(seq uint64) *RingConsumer {
	return &RingConsumer{ring: r, cur: seq}
}

// RingConsumer is a ring buffer consumer.
type RingConsumer struct {
	ring *Ring
	cur  uint64
}

// diff returns the number of pending messages. It assumes the Ring is locked.
func (rc *RingConsumer) diff() uint64 {
	if rc.cur > rc.ring.cur {
		panic(fmt.Sprintf("soju: consumer cursor (%v) greater than producer cursor (%v)", rc.cur, rc.ring.cur))
	}
	return rc.ring.cur - rc.cur
}

// Peek returns the next pending message if any without consuming it. A nil
// message is returned if no message is available.
func (rc *RingConsumer) Peek() *irc.Message {
	diff := rc.diff()
	if diff == 0 {
		return nil
	}
	if diff > rc.ring.cap {
		// Consumer drops diff - cap entries
		rc.cur = rc.ring.cur - rc.ring.cap
	}
	i := int(rc.cur % rc.ring.cap)
	msg := rc.ring.buffer[i]
	if msg == nil {
		panic(fmt.Sprintf("soju: unexpected nil ring buffer entry at index %v", i))
	}
	return msg
}

// Consume consumes and returns the next pending message. A nil message is
// returned if no message is available.
func (rc *RingConsumer) Consume() *irc.Message {
	msg := rc.Peek()
	if msg != nil {
		rc.cur++
	}
	return msg
}

M server.go => server.go +0 -2
@@ 47,7 47,6 @@ func (l *prefixLogger) Printf(format string, v ...interface{}) {
type Server struct {
	Hostname       string
	Logger         Logger
	RingCap        int
	HistoryLimit   int
	LogPath        string
	Debug          bool


@@ 64,7 63,6 @@ type Server struct {
func NewServer(db *DB) *Server {
	return &Server{
		Logger:       log.New(log.Writer(), "", log.LstdFlags),
		RingCap:      4096,
		HistoryLimit: 1000,
		users:        make(map[string]*user),
		db:           db,

M upstream.go => upstream.go +17 -20
@@ 708,7 708,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
				delete(ch.Members, msg.Prefix.Name)
				ch.Members[newNick] = memberships
				uc.appendLog(ch.Name, msg)
				uc.appendHistory(ch.Name, msg)
			}
		}



@@ 818,7 817,6 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
				delete(ch.Members, msg.Prefix.Name)

				uc.appendLog(ch.Name, msg)
				uc.appendHistory(ch.Name, msg)
			}
		}



@@ 1611,7 1609,6 @@ func (uc *upstreamConn) SendMessageLabeled(downstreamID uint64, msg *irc.Message
	uc.SendMessage(msg)
}

// TODO: handle moving logs when a network name changes, when support for this is added
func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
	if uc.srv.LogPath == "" {
		return


@@ 1623,13 1620,6 @@ func (uc *upstreamConn) appendLog(entity string, msg *irc.Message) {
		uc.messageLoggers[entity] = ml
	}

	if _, err := ml.Append(msg); err != nil {
		uc.logger.Printf("failed to log message: %v", err)
	}
}

// appendHistory appends a message to the history. entity can be empty.
func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {
	detached := false
	if ch, ok := uc.network.channels[entity]; ok {
		detached = ch.Detached


@@ 1637,36 1627,45 @@ func (uc *upstreamConn) appendHistory(entity string, msg *irc.Message) {

	history, ok := uc.network.history[entity]
	if !ok {
		lastID, err := lastMsgID(uc.network, entity, time.Now())
		if err != nil {
			uc.logger.Printf("failed to log message: failed to get last message ID: %v", err)
			return
		}

		history = &networkHistory{
			clients: make(map[string]uint64),
			ring:    NewRing(uc.srv.RingCap),
			clients: make(map[string]string),
		}
		uc.network.history[entity] = history

		for clientName, _ := range uc.network.offlineClients {
			history.clients[clientName] = 0
			history.clients[clientName] = lastID
		}

		if detached {
			// If the channel is detached, online clients act as offline
			// clients too
			uc.forEachDownstream(func(dc *downstreamConn) {
				history.clients[dc.clientName] = 0
				history.clients[dc.clientName] = lastID
			})
		}
	}

	history.ring.Produce(msg)
	msgID, err := ml.Append(msg)
	if err != nil {
		uc.logger.Printf("failed to log message: %v", err)
		return
	}

	if !detached {
		uc.forEachDownstream(func(dc *downstreamConn) {
			history.clients[dc.clientName] = history.ring.Cur()
			history.clients[dc.clientName] = msgID
		})
	}
}

// produce appends a message to the logs, adds it to the history and forwards
// it to connected downstream connections.
// produce appends a message to the logs and forwards it to connected downstream
// connections.
//
// If origin is not nil and origin doesn't support echo-message, the message is
// forwarded to all connections except origin.


@@ 1675,8 1674,6 @@ func (uc *upstreamConn) produce(target string, msg *irc.Message, origin *downstr
		uc.appendLog(target, msg)
	}

	uc.appendHistory(target, msg)

	// Don't forward messages if it's a detached channel
	if ch, ok := uc.network.channels[target]; ok && ch.Detached {
		return

M user.go => user.go +1 -2
@@ 51,8 51,7 @@ type eventDownstreamDisconnected struct {
type eventStop struct{}

type networkHistory struct {
	clients map[string]uint64 // indexed by client name
	ring    *Ring             // can be nil if there are no offline clients
	clients map[string]string // indexed by client name
}

type network struct {