~whereswaldon/forest-go

cd38454fdf9fbb4c51ccaadbaf43266f3044be1d — Chris Waldon 6 months ago 9cb49ff
feat: gracefully handle corrupt nodes in groves

The grove type now allows applications to provide a hook that will
be invoked when a corrupt node is discovered. Additionally, the grove
type now attempts to recover from corrupt nodes by deleting them
automatically.
3 files changed, 95 insertions(+), 14 deletions(-)

M grove/grove.go
M store.go
M store/archive.go
M grove/grove.go => grove/grove.go +90 -14
@@ 89,6 89,8 @@ type Grove struct {
	FS
	NodeCache *store.MemoryStore
	*ChildCache

	corruptNodeHandler func(id string)
}

// New constructs a Grove that stores nodes in a hierarchy rooted at


@@ 110,6 112,21 @@ func NewWithFS(fs FS) (*Grove, error) {
	}, nil
}

// SetCorruptNodeHandler registers the callback that the grove invokes, with
// the string ID of the affected node, when a node is detected to be corrupt
// on disk. Any previously registered callback is replaced; passing nil
// removes the callback. Keep the callback as simple and fast as possible,
// because a slow handler degrades the performance of the grove.
func (g *Grove) SetCorruptNodeHandler(handler func(string)) {
	g.corruptNodeHandler = handler
}

// handleCorrupt reports the node with the given string ID to the registered
// corrupt-node handler. It does nothing when no handler has been set.
func (g *Grove) handleCorrupt(id string) {
	if notify := g.corruptNodeHandler; notify != nil {
		notify(id)
	}
}

// Get searches the grove for a node with the given id. It returns the node if it was
// found, a boolean indicating whether it was found, and an error (if there was a
// problem searching for the node).


@@ 171,7 188,17 @@ func (g *Grove) getAllNodeFileInfo() ([]os.FileInfo, error) {

// nodeFromInfo converts the info about a file into a node extracted from
// the contents of that file (it opens, reads, and parses the file).
func (g *Grove) nodeFromInfo(info os.FileInfo) (forest.Node, error) {
// If it fails, the error will be of type UnmarshalError wrapping the
// underlying error.
func (g *Grove) nodeFromInfo(info os.FileInfo) (node forest.Node, err error) {
	defer func() {
		if err != nil {
			err = UnmarshalError{
				Reason: err,
				Node:   info.Name(),
			}
		}
	}()
	nodeIDString := info.Name()
	nodeID := &fields.QualifiedHash{}
	if err := nodeID.UnmarshalText([]byte(nodeIDString)); err != nil {


@@ 189,7 216,7 @@ func (g *Grove) nodeFromInfo(info os.FileInfo) (forest.Node, error) {
	if err != nil {
		return nil, fmt.Errorf("failed reading node file %s: %w", info.Name(), err)
	}
	node, err := forest.UnmarshalBinaryNode(nodeData)
	node, err = forest.UnmarshalBinaryNode(nodeData)
	if err != nil {
		return nil, fmt.Errorf("failed parsing node file %s: %w", info.Name(), err)
	}


@@ 197,31 224,72 @@ func (g *Grove) nodeFromInfo(info os.FileInfo) (forest.Node, error) {
	return node, nil
}

// ErrorGroup wraps multiple errors into a single return value.
type ErrorGroup []error

// Error renders the group using the default formatting of an []error,
// for example "[first second]".
func (e ErrorGroup) Error() string {
	return fmt.Sprintf("%v", []error(e))
}

// Unwrap exposes the individual errors in the group so that errors.Is and
// errors.As (Go 1.20 and newer) can match against any one of them. Without
// it, callers could not detect a specific cause inside the group.
func (e ErrorGroup) Unwrap() []error {
	return []error(e)
}

// UnmarshalError reports that a node file could not be converted into a
// node. Node holds the name of the offending file and Reason holds the
// underlying error that caused the failure.
type UnmarshalError struct {
	Reason error
	Node   string
}

// Error describes which node failed to unmarshal and why.
func (ue UnmarshalError) Error() string {
	const format = "%s failed to unmarshal: %v"
	return fmt.Sprintf(format, ue.Node, ue.Reason)
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can
// inspect it.
func (ue UnmarshalError) Unwrap() error {
	return ue.Reason
}

// nodesFromInfo batch-converts a slice of file info into a slice of
// forest nodes by calling nodeFromInfo on each.
// forest nodes by calling nodeFromInfo on each. Errors unmarshalling
// nodes will be returned in an ErrorGroup (even if there is
// only one failure).
//
// NOTE: unlike many functions, if this function returns an error, the
// other return value will still hold valid data if any nodes were able
// to be read.
func (g *Grove) nodesFromInfo(info []os.FileInfo) ([]forest.Node, error) {
	nodes := make([]forest.Node, 0, len(info))
	var errorGroup []error
	for _, nodeFileInfo := range info {
		node, err := g.nodeFromInfo(nodeFileInfo)
		if err != nil {
			return nil, fmt.Errorf("failed transforming fileInfo into Node: %w", err)
			if errors.Is(err, io.EOF) {
				// automatically recover from an empty or incomplete node file
				// by removing it. err is replaced by the result of the removal,
				// so a successful cleanup is not reported to the caller.
				err = g.Remove(nodeFileInfo.Name())
				// the corrupt-node handler is notified whether or not the
				// removal succeeded.
				g.handleCorrupt(nodeFileInfo.Name())
			}

			// if deleting the corrupt node failed or the error wasn't io.EOF
			if err != nil {
				errorGroup = append(errorGroup, err)
			}
			continue
		}
		nodes = append(nodes, node)
	}
	if len(errorGroup) > 0 {
		return nodes, ErrorGroup(errorGroup)
	}
	return nodes, nil
}

// allNodes returns a slice of every node in the grove. It lists the node
// files with getAllNodeFileInfo and converts them with nodesFromInfo.
//
// NOTE: unlike many functions, if this function returns an error, the
// other return value will still hold valid data if any nodes were able
// to be read. A failure to list the node files is the exception: it is
// returned together with a nil slice.
func (g *Grove) allNodes() ([]forest.Node, error) {
	nodeInfo, err := g.getAllNodeFileInfo()
	if err != nil {
		return nil, fmt.Errorf("failed listing node file candidates: %w", err)
	}
	nodes, err := g.nodesFromInfo(nodeInfo)
	if err != nil {
		return nil, fmt.Errorf("failed converting node files into nodes: %w", err)
	}
	return nodes, nil
	return g.nodesFromInfo(nodeInfo)
}

// Children returns the IDs of all known child nodes of the specified ID.


@@ 246,9 314,15 @@ func (g *Grove) Children(id *fields.QualifiedHash) ([]*fields.QualifiedHash, err

// Recent returns a slice of the most recently-created nodes of the given type.
// The slice is sorted so that the most-recently-created nodes are at the beginning.
//
// NOTE: this function may return both a valid slice of nodes and an error
// in the case that some nodes failed to be unmarshaled from disk, but others
// were successful. Calling code should always check whether the node list is
// empty before throwing it away. If there is an error, it will be of type
// ErrorGroup.
func (g *Grove) Recent(nodeType fields.NodeType, quantity int) ([]forest.Node, error) {
	nodes, err := g.allNodes()
	if err != nil {
	if err != nil && len(nodes) == 0 {
		return nil, fmt.Errorf("failed getting all nodes from grove: %w", err)
	}
	// TODO: find a cleaner way to sort nodes by time


@@ 292,21 366,23 @@ func (g *Grove) Recent(nodeType fields.NodeType, quantity int) ([]forest.Node, e
	if len(rightType) > quantity {
		rightType = rightType[:quantity]
	}
	return rightType, nil
	return rightType, err
}

// RebuildChildCache must be called each time a node is inserted into the
// underlying storage without actually calling Add() on the grove. Without
// this, calls to Children() will not always include new results.
// this, calls to Children() will not always include new results. This method
// may return an error if not all nodes in the grove could be read from
// disk.
func (g *Grove) RebuildChildCache() error {
	nodes, err := g.allNodes()
	if err != nil {
		return fmt.Errorf("failed getting all nodes from grove: %w", err)
		// keep the wrapped error to return later instead of aborting
		err = fmt.Errorf("failed getting all nodes from grove: %w", err)
	}
	// allNodes documents that nodes may hold valid data even when err is
	// non-nil, so whatever was read is cached.
	for _, node := range nodes {
		g.CacheChildInfo(node)
	}
	return nil
	return err
}

// CacheChildInfo updates the child cache information for the given node.

M store.go => store.go +3 -0
@@ 12,6 12,9 @@ type Store interface {
	GetConversation(communityID, conversationID *fields.QualifiedHash) (Node, bool, error)
	GetReply(communityID, conversationID, replyID *fields.QualifiedHash) (Node, bool, error)
	Children(*fields.QualifiedHash) ([]*fields.QualifiedHash, error)
	// Recent returns recently-created (as per the timestamp in the node) nodes.
	// It may return both a slice of nodes and an error if some nodes in the
	// store were unreadable.
	Recent(nodeType fields.NodeType, quantity int) ([]Node, error)
	// Add inserts a node into the store. It is *not* an error to insert a node which is already
	// stored. Implementations must not return an error in this case.

M store/archive.go => store/archive.go +2 -0
@@ 167,6 167,8 @@ func (m *Archive) Children(id *fields.QualifiedHash) (ids []*fields.QualifiedHas
	return
}

// Recent returns recently-created nodes of the provided NodeType. It
// may return both a slice of nodes and an error.
func (m *Archive) Recent(nodeType fields.NodeType, quantity int) (nodes []forest.Node, err error) {
	m.executeAsync(func() {
		nodes, err = m.store.Recent(nodeType, quantity)