~whereswaldon/sprout-go

a4188845a9a8c297521f84be40656c3b5cc464fa — Chris Waldon 2 months ago c8287a8 master
feat: remove redundant SubscriberStore

Forest now provides the Archive type and the ExtendedStore type
directly, which eliminates the need for this extra store
interface and implementation.
4 files changed, 16 insertions(+), 259 deletions(-)

M cmd/relay/main.go
D subscriber_store.go
M supervisor.go
M worker.go
M cmd/relay/main.go => cmd/relay/main.go +2 -1
@@ 12,6 12,7 @@ import (

	"git.sr.ht/~whereswaldon/forest-go"
	"git.sr.ht/~whereswaldon/forest-go/grove"
	"git.sr.ht/~whereswaldon/forest-go/store"
	sprout "git.sr.ht/~whereswaldon/sprout-go"
	"git.sr.ht/~whereswaldon/sprout-go/watch"
)


@@ 71,7 72,7 @@ and will establish Sprout connections to all addresses provided as arguments.
	if err != nil {
		log.Fatalf("Failed to create grove at %s: %v", *grovePath, err)
	}
	messages := sprout.NewSubscriberStore(grove)
	messages := store.NewArchive(grove)
	defer messages.Destroy()

	// track node ids of nodes that we've recently inserted into the grove so that

D subscriber_store.go => subscriber_store.go +0 -239
@@ 1,239 0,0 @@
package sprout

import (
	"git.sr.ht/~whereswaldon/forest-go"
	"git.sr.ht/~whereswaldon/forest-go/fields"
)

// Subscription is an identifier for a particular handler function within
// a SubscriberStore. It can be provided to delete a handler function or to
// suppress notifications to the corresponding handler.
type Subscription uint

// the zero subscription is never used
const neverAssigned = 0
const firstSubscription = 1

// SubscriberStore is a wrapper type that extends the forest.Store interface
// with the observer pattern. Code can subscribe for updates each time a
// node is inserted into the store using Add or AddAs.
type SubscriberStore struct {
	store                                 forest.Store
	requests                              chan func()
	nextSubscriberKey                     Subscription
	postAddSubscribers, preAddSubscribers map[Subscription]func(forest.Node)
}

var _ forest.Store = &SubscriberStore{}

// NewMessageStore creates a thread-safe storage structure for
// forest nodes by wrapping an existing store implementation
func NewSubscriberStore(store forest.Store) *SubscriberStore {
	m := &SubscriberStore{
		store:              store,
		requests:           make(chan func()),
		nextSubscriberKey:  firstSubscription,
		postAddSubscribers: make(map[Subscription]func(forest.Node)),
		preAddSubscribers:  make(map[Subscription]func(forest.Node)),
	}
	go func() {
		for function := range m.requests {
			function()
		}
	}()
	return m
}

// SubscribeToNewMessages establishes the given function as a handler to be
// invoked on each node added to the store. The returned subscription ID
// can be used to unsubscribe later, as well as to supress notifications
// with AddAs().
//
// Handler functions are invoked synchronously on the same goroutine that invokes
// Add() or AddAs(), and should not block. If long-running code is needed in a
// handler, launch a new goroutine.
func (m *SubscriberStore) SubscribeToNewMessages(handler func(n forest.Node)) (subscriptionID Subscription) {
	return m.subscribeInMap(m.postAddSubscribers, handler)
}

// PresubscribeToNewMessages establishes the given function as a handler to be
// invoked on each node added to the store. The returned subscription ID
// can be used to unsubscribe later, as well as to supress notifications
// with AddAs(). The handler function will be invoked *before* nodes are
// inserted into the store instead of after (like a normal Subscribe).
//
// Handler functions are invoked synchronously on the same goroutine that invokes
// Add() or AddAs(), and should not block. If long-running code is needed in a
// handler, launch a new goroutine.
func (m *SubscriberStore) PresubscribeToNewMessages(handler func(n forest.Node)) (subscriptionID Subscription) {
	return m.subscribeInMap(m.preAddSubscribers, handler)
}

func (m *SubscriberStore) subscribeInMap(targetMap map[Subscription]func(forest.Node), handler func(n forest.Node)) (subscriptionID Subscription) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		subscriptionID = m.nextSubscriberKey
		m.nextSubscriberKey++
		// handler unsigned overflow
		// TODO: ensure subscription reuse can't occur
		if m.nextSubscriberKey == neverAssigned {
			m.nextSubscriberKey = firstSubscription
		}
		targetMap[subscriptionID] = handler
	}
	<-done
	return
}

// UnsubscribeToNewMessages removes the handler for a given subscription from
// the store.
func (m *SubscriberStore) UnsubscribeToNewMessages(subscriptionID Subscription) {
	m.unsubscribeInMap(m.postAddSubscribers, subscriptionID)
}

// UnpresubscribeToNewMessages removes the handler for a given subscription from
// the store.
func (m *SubscriberStore) UnpresubscribeToNewMessages(subscriptionID Subscription) {
	m.unsubscribeInMap(m.preAddSubscribers, subscriptionID)
}

func (m *SubscriberStore) unsubscribeInMap(targetMap map[Subscription]func(forest.Node), subscriptionID Subscription) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		if _, subscribed := targetMap[subscriptionID]; subscribed {
			delete(targetMap, subscriptionID)
		}
	}
	<-done
	return
}

func (m *SubscriberStore) CopyInto(s forest.Store) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		err = m.store.CopyInto(s)
	}
	<-done
	return
}

func (m *SubscriberStore) Get(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		node, present, err = m.store.Get(id)
	}
	<-done
	return
}

func (m *SubscriberStore) GetIdentity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		node, present, err = m.store.GetIdentity(id)
	}
	<-done
	return
}

func (m *SubscriberStore) GetCommunity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		node, present, err = m.store.GetCommunity(id)
	}
	<-done
	return
}

func (m *SubscriberStore) GetConversation(communityID, conversationID *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		node, present, err = m.store.GetConversation(communityID, conversationID)
	}
	<-done
	return
}

func (m *SubscriberStore) GetReply(communityID, conversationID, replyID *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		node, present, err = m.store.GetReply(communityID, conversationID, replyID)
	}
	<-done
	return
}

func (m *SubscriberStore) Children(id *fields.QualifiedHash) (ids []*fields.QualifiedHash, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		ids, err = m.store.Children(id)
	}
	<-done
	return
}

func (m *SubscriberStore) Recent(nodeType fields.NodeType, quantity int) (nodes []forest.Node, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		nodes, err = m.store.Recent(nodeType, quantity)
	}
	<-done
	return
}

// Add inserts a node into the underlying store. Importantly, this will send a notification
// of a new node to *all* subscribers. If the calling code is a subscriber, it will still
// be notified of the new node. To supress this, use AddAs() instead.
func (m *SubscriberStore) Add(node forest.Node) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		m.notifySubscribed(m.preAddSubscribers, node, neverAssigned)
		if err = m.store.Add(node); err == nil {
			m.notifySubscribed(m.postAddSubscribers, node, neverAssigned)
		}
	}
	<-done
	return
}

// AddAs allows adding a node to the underlying store without being notified
// of it as a new node. The addedByID (subscription id returned from SubscribeToNewMessages)
// will not be notified of the new nodes, but all other subscribers will be.
func (m *SubscriberStore) AddAs(node forest.Node, addedByID Subscription) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		m.notifySubscribed(m.preAddSubscribers, node, addedByID)
		if err = m.store.Add(node); err == nil {
			m.notifySubscribed(m.postAddSubscribers, node, addedByID)
		}
	}
	<-done
	return
}

// notifySubscribed runs all of the subscription handlers in new goroutines with
// the provided node as input to each handler.
func (m *SubscriberStore) notifySubscribed(targetMap map[Subscription]func(forest.Node), node forest.Node, ignore Subscription) {
	for subscriptionID, handler := range targetMap {
		if subscriptionID != ignore {
			handler(node)
		}
	}
}

// Shut down the worker gorountine that powers this store. Subsequent
// calls to methods on this MessageStore have undefined behavior
func (m *SubscriberStore) Destroy() {
	close(m.requests)
}

M supervisor.go => supervisor.go +4 -2
@@ 5,6 5,8 @@ import (
	"fmt"
	"log"
	"time"

	"git.sr.ht/~whereswaldon/forest-go/store"
)

// LaunchSupervisedWorker launches a worker in a new goroutine that will


@@ 13,7 15,7 @@ import (
// `logger`.
//
// BUG(whereswaldon): this interface is experimental and likely to change.
func LaunchSupervisedWorker(done <-chan struct{}, addr string, store SubscribableStore, tlsConfig *tls.Config, logger *log.Logger) {
func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, tlsConfig *tls.Config, logger *log.Logger) {
	go func() {
		firstAttempt := true
		for {


@@ 27,7 29,7 @@ func LaunchSupervisedWorker(done <-chan struct{}, addr string, store Subscribabl
				logger.Printf("Failed to connect to %s: %v", addr, err)
				continue
			}
			worker, err := NewWorker(done, conn, store)
			worker, err := NewWorker(done, conn, s)
			if err != nil {
				logger.Printf("Failed launching worker to connect to address %s: %v", addr, err)
				continue

M worker.go => worker.go +10 -17
@@ 13,27 13,20 @@ import (
	"git.sr.ht/~whereswaldon/forest-go/store"
)

type SubscribableStore interface {
	forest.Store
	SubscribeToNewMessages(handler func(n forest.Node)) Subscription
	UnsubscribeToNewMessages(Subscription)
	AddAs(forest.Node, Subscription) (err error)
}

type Worker struct {
	Done           <-chan struct{}
	DefaultTimeout time.Duration
	*Conn
	*log.Logger
	*Session
	SubscribableStore
	subscriptionID Subscription
	SubscribableStore store.ExtendedStore
	subscriptionID    store.Subscription
}

func NewWorker(done <-chan struct{}, conn net.Conn, store SubscribableStore) (*Worker, error) {
func NewWorker(done <-chan struct{}, conn net.Conn, s store.ExtendedStore) (*Worker, error) {
	w := &Worker{
		Done:              done,
		SubscribableStore: store,
		SubscribableStore: s,
		Logger:            log.New(log.Writer(), "", log.LstdFlags|log.Lshortfile),
		DefaultTimeout:    time.Minute,
	}


@@ 280,7 273,7 @@ func (c *Worker) IngestNode(node forest.Node) error {
			if err := ancestor.ValidateDeep(c.SubscribableStore); err != nil {
				return fmt.Errorf("validation failed for ancestor %s: %w", ancestor.ID(), err)
			}
			if err := c.AddAs(ancestor, c.subscriptionID); err != nil {
			if err := c.SubscribableStore.AddAs(ancestor, c.subscriptionID); err != nil {
				return fmt.Errorf("failed inserting ancestory %s into store: %w", ancestor.ID(), err)
			}
		}


@@ 362,7 355,7 @@ func (c *Worker) BootstrapLocalStore(maxCommunities int) {
			c.Printf("Couldn't fetch author information for node %s: %v", community.ID().String(), err)
			continue
		}
		if err := c.AddAs(community, c.subscriptionID); err != nil {
		if err := c.SubscribableStore.AddAs(community, c.subscriptionID); err != nil {
			c.Printf("Couldn't add community %s to store: %v", community.ID().String(), err)
			continue
		}


@@ 404,7 397,7 @@ func (c *Worker) synchronizeFullTree(root forest.Node, maxNodes int, perRequestT
	}
	c.Printf("Announced local leaves of %s to peer", root.ID())
	for _, leaf := range leafList.Nodes {
		if _, alreadyInStore, err := c.Get(leaf.ID()); err != nil {
		if _, alreadyInStore, err := c.SubscribableStore.Get(leaf.ID()); err != nil {
			return fmt.Errorf("failed checking if we already have leaf node %s: %w", leaf.ID().String(), err)
		} else if alreadyInStore {
			continue


@@ 424,7 417,7 @@ func (c *Worker) synchronizeFullTree(root forest.Node, maxNodes int, perRequestT
			if err := ancestor.ValidateDeep(c.SubscribableStore); err != nil {
				return fmt.Errorf("couldn't validate node %s: %w", ancestor.ID().String(), err)
			}
			if err := c.AddAs(ancestor, c.subscriptionID); err != nil {
			if err := c.SubscribableStore.AddAs(ancestor, c.subscriptionID); err != nil {
				return fmt.Errorf("couldn't add node %s to store: %w", ancestor.ID().String(), err)
			}
		}


@@ 450,7 443,7 @@ func (c *Worker) ensureAuthorAvailable(node forest.Node, perRequestTimeout time.
	default:
		return fmt.Errorf("unsupported type in ensureAuthorAvailable: %T", n)
	}
	_, inStore, err := c.GetIdentity(authorID)
	_, inStore, err := c.SubscribableStore.GetIdentity(authorID)
	if err != nil {
		return fmt.Errorf("failed looking for author id %s in store: %w", authorID.String(), err)
	}


@@ 468,7 461,7 @@ func (c *Worker) ensureAuthorAvailable(node forest.Node, perRequestTimeout time.
	if err := author.ValidateDeep(c.SubscribableStore); err != nil {
		return fmt.Errorf("unable to validate author %s: %w", author.ID().String(), err)
	}
	if err := c.AddAs(author, c.subscriptionID); err != nil {
	if err := c.SubscribableStore.AddAs(author, c.subscriptionID); err != nil {
		return fmt.Errorf("failed inserting new valid author %s into store: %w", author.ID().String(), err)
	}
	return nil