~whereswaldon/forest-go

4c1deda29dc2235bc870896dd1496e821d964ddd — Chris Waldon 1 year, 6 months ago 4089ee7
feat: simplify Archive implementation
1 files changed, 34 insertions(+), 58 deletions(-)

M store/archive.go
M store/archive.go => store/archive.go +34 -58
@@ 16,9 16,8 @@ type Subscription uint
const neverAssigned = 0
const firstSubscription = 1

// Archive is a wrapper type that extends the forest.Store interface
// with the observer pattern. Code can subscribe for updates each time a
// node is inserted into the store using Add or AddAs.
// Archive is a wrapper type that extends the store.ExtendedStore interface
// on top of an existing forest.Store. It is safe for concurrent use.
type Archive struct {
	store                                 forest.Store
	requests                              chan func()


@@ 100,95 99,78 @@ func (m *Archive) UnpresubscribeToNewMessages(subscriptionID Subscription) {
	m.unsubscribeInMap(m.preAddSubscribers, subscriptionID)
}

func (m *Archive) unsubscribeInMap(targetMap map[Subscription]func(forest.Node), subscriptionID Subscription) {
// executeAsync runs the provided closure in a thread-safe way and blocks until it
// completes
func (m *Archive) executeAsync(f func()) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
		f()
	}
	<-done
}

func (m *Archive) unsubscribeInMap(targetMap map[Subscription]func(forest.Node), subscriptionID Subscription) {
	m.executeAsync(func() {
		if _, subscribed := targetMap[subscriptionID]; subscribed {
			delete(targetMap, subscriptionID)
		}
	}
	<-done
	return
	})
}

func (m *Archive) CopyInto(s forest.Store) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		err = m.store.CopyInto(s)
	}
	<-done
	})
	return
}

func (m *Archive) Get(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		node, present, err = m.store.Get(id)
	}
	<-done
	})
	return
}

func (m *Archive) GetIdentity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		node, present, err = m.store.GetIdentity(id)
	}
	<-done
	})
	return
}

func (m *Archive) GetCommunity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		node, present, err = m.store.GetCommunity(id)
	}
	<-done
	})
	return
}

func (m *Archive) GetConversation(communityID, conversationID *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		node, present, err = m.store.GetConversation(communityID, conversationID)
	}
	<-done
	})
	return
}

func (m *Archive) GetReply(communityID, conversationID, replyID *fields.QualifiedHash) (node forest.Node, present bool, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		node, present, err = m.store.GetReply(communityID, conversationID, replyID)
	}
	<-done
	})
	return
}

func (m *Archive) Children(id *fields.QualifiedHash) (ids []*fields.QualifiedHash, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		ids, err = m.store.Children(id)
	}
	<-done
	})
	return
}

func (m *Archive) Recent(nodeType fields.NodeType, quantity int) (nodes []forest.Node, err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		nodes, err = m.store.Recent(nodeType, quantity)
	}
	<-done
	})
	return
}



@@ 196,15 178,12 @@ func (m *Archive) Recent(nodeType fields.NodeType, quantity int) (nodes []forest
// of a new node to *all* subscribers. If the calling code is a subscriber, it will still
// be notified of the new node. To supress this, use AddAs() instead.
func (m *Archive) Add(node forest.Node) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		m.notifySubscribed(m.preAddSubscribers, node, neverAssigned)
		if err = m.store.Add(node); err == nil {
			m.notifySubscribed(m.postAddSubscribers, node, neverAssigned)
		}
	}
	<-done
	})
	return
}



@@ 212,19 191,16 @@ func (m *Archive) Add(node forest.Node) (err error) {
// of it as a new node. The addedByID (subscription id returned from SubscribeToNewMessages)
// will not be notified of the new nodes, but all other subscribers will be.
func (m *Archive) AddAs(node forest.Node, addedByID Subscription) (err error) {
	done := make(chan struct{})
	m.requests <- func() {
		defer close(done)
	m.executeAsync(func() {
		m.notifySubscribed(m.preAddSubscribers, node, addedByID)
		if err = m.store.Add(node); err == nil {
			m.notifySubscribed(m.postAddSubscribers, node, addedByID)
		}
	}
	<-done
	})
	return
}

// notifySubscribed runs all of the subscription handlers in new goroutines with
// notifySubscribed runs all of the subscription handlers with
// the provided node as input to each handler.
func (m *Archive) notifySubscribed(targetMap map[Subscription]func(forest.Node), node forest.Node, ignore Subscription) {
	for subscriptionID, handler := range targetMap {