~tsileo/blobstash

d6248c401ca3a68ef3d13090a6732d6c36e34956 — Thomas Sileo 3 months ago 3904ab6
stash: rewrite the filetree GC in Go (instead of Lua)
4 files changed, 175 insertions(+), 18 deletions(-)

M pkg/client/filetree/filetree.go
M pkg/stash/api/api.go
M pkg/stash/gc/gc.go
M pkg/stash/stash.go
M pkg/client/filetree/filetree.go => pkg/client/filetree/filetree.go +4 -17
@@ 64,24 64,11 @@ func (f *Filetree) MakeSnapshot(ref, fs, message, userAgent string) (int64, erro

// GC performs a garbage collection to save the latest filetree snapshot
func (f *Filetree) GC(ns, name string, rev int64) error {
-	gcScript := fmt.Sprintf(`
-local kvstore = require('kvstore')
-
-local key = "_filetree:fs:%s"
-local version = "%d"
-local _, ref, _ = kvstore.get(key, version)
-
--- mark the actual KV entry
-mark_kv(key, version)
-
--- mark the whole tree
-mark_filetree_node(ref)
-`, name, rev)
-
-	resp, err := f.client.PostMsgpack(
-		fmt.Sprintf("/api/stash/%s/_gc", ns),
+	resp, err := f.client.PostJSON(
+		fmt.Sprintf("/api/stash/%s/_merge_filetree_version", ns),
		map[string]interface{}{
-			"script": gcScript,
+			"ref":     fmt.Sprintf("_filetree:fs:%s", name),
+			"version": rev,
		},
	)
	if err != nil {

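The client-side change above drops the embedded Lua script: GC() now posts only the versioned KV key and lets the server walk the tree. A minimal standalone sketch of the JSON body that PostJSON call ends up sending (the "docs" name and the revision value are invented for illustration):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical values; in the real client they come from GC()'s name/rev arguments.
	name, rev := "docs", int64(1586872334000000000)

	body, _ := json.Marshal(map[string]interface{}{
		"ref":     fmt.Sprintf("_filetree:fs:%s", name),
		"version": rev,
	})
	fmt.Println(string(body)) // {"ref":"_filetree:fs:docs","version":1586872334000000000}
}
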
M pkg/stash/api/api.go => pkg/stash/api/api.go +35 -0
@@ 129,9 129,44 @@ func (s *StashAPI) dataContextGCHandler() func(http.ResponseWriter, *http.Reques
	}
}

type GC2Input struct {
	Ref     string `json:"ref" msgpack:"ref"`
	Version int64  `json:"version" msgpack:"version"`
}

func (s *StashAPI) dataContextGC2Handler() func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		name := mux.Vars(r)["name"]
		ctx = ctxutil.WithNamespace(ctx, name)

		_, ok := s.stash.DataContextByName(name)
		switch r.Method {
		case "POST":
			defer r.Body.Close()
			if !ok {
				w.WriteHeader(http.StatusNotFound)
				return
			}
			out := &GC2Input{}
			if err := httputil.Unmarshal(r, out); err != nil {
				panic(err)
			}
			fmt.Printf("\n\nGC input: %+v\n\n", out)
			if err := s.stash.MergeFileTreeVersionAndDestroy(ctx, name, out.Ref, out.Version); err != nil {
				panic(err)
			}
			w.WriteHeader(http.StatusNoContent)
		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
		}
	}
}

func (s *StashAPI) Register(r *mux.Router, basicAuth func(http.Handler) http.Handler) {
	r.Handle("/", basicAuth(http.HandlerFunc(s.listHandler())))
	r.Handle("/{name}", basicAuth(http.HandlerFunc(s.dataContextHandler())))
	r.Handle("/{name}/_merge", basicAuth(http.HandlerFunc(s.dataContextMergeHandler())))
	r.Handle("/{name}/_gc", basicAuth(http.HandlerFunc(s.dataContextGCHandler())))
	r.Handle("/{name}/_merge_filetree_version", basicAuth(http.HandlerFunc(s.dataContextGC2Handler())))
}

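For reference, a hedged sketch of hitting the new /_merge_filetree_version route with nothing but net/http; the host, stash name, API key and payload values are assumptions (the real client goes through its PostJSON helper instead):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumed endpoint and credentials; adjust the host, stash name and API key to your setup.
	payload := []byte(`{"ref": "_filetree:fs:docs", "version": 1586872334000000000}`)
	req, err := http.NewRequest("POST",
		"http://localhost:8050/api/stash/tmp-sync/_merge_filetree_version",
		bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.SetBasicAuth("", "apikey")
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// The handler above replies 204 No Content on success and 404 if the stash does not exist.
	fmt.Println(resp.Status)
}
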
M pkg/stash/gc/gc.go => pkg/stash/gc/gc.go +2 -1
@@ 5,7 5,7 @@ import (
	"fmt"

	"github.com/vmihailenco/msgpack"
	"github.com/yuin/gopher-lua"
	lua "github.com/yuin/gopher-lua"

	"a4.io/blobsfile"
	"a4.io/blobstash/pkg/apps/luautil"


@@ 27,6 27,7 @@ func GC(ctx context.Context, h *hub.Hub, s *stash.Stash, dc store.DataContext, s
	orderedRefs := []string{}

	L := lua.NewState()
+	defer L.Close()
	var skipped int

	// premark(<blob hash>) notifies the GC that this blob is already present in the root blobstore (to speed up huge GCs)

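Side note on the gc.go hunk: besides aliasing the gopher-lua import, it defers L.Close() so the Lua state is released when the script-based GC run finishes. A tiny sketch of that gopher-lua lifecycle pattern (the script body here is illustrative only):

package main

import lua "github.com/yuin/gopher-lua"

func runGCScript(script string) error {
	L := lua.NewState()
	// Deferring Close releases the state's resources once the script is done.
	defer L.Close()
	return L.DoString(script)
}

func main() {
	if err := runGCScript(`print("mark everything")`); err != nil {
		panic(err)
	}
}
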
M pkg/stash/stash.go => pkg/stash/stash.go +134 -0
@@ 10,9 10,11 @@ import (

	log "github.com/inconshreveable/log15"

	"a4.io/blobsfile"
	"a4.io/blobstash/pkg/blob"
	"a4.io/blobstash/pkg/blobstore"
	"a4.io/blobstash/pkg/ctxutil"
	"a4.io/blobstash/pkg/filetree/filetreeutil/node"
	"a4.io/blobstash/pkg/hub"
	"a4.io/blobstash/pkg/kvstore"
	"a4.io/blobstash/pkg/meta"


@@ 81,6 83,93 @@ func (dc *dataContext) Merge(ctx context.Context) error {
	return nil
}

// orderedRefs holds references in insertion order, deduplicated
type orderedRefs struct {
	refs []string
	idx  map[string]struct{}
}

func newOrderedRefs() *orderedRefs {
	return &orderedRefs{
		refs: []string{},
		idx:  map[string]struct{}{},
	}
}

func (r *orderedRefs) Add(ref string) {
	if _, ok := r.idx[ref]; !ok {
		r.idx[ref] = struct{}{}
		r.refs = append(r.refs, ref)
	}
}

func (dc *dataContext) MergeFileTreeNode(ctx context.Context, refs *orderedRefs, bs store.BlobStore, ref string) error {
	data, err := dc.bsProxy.Get(ctx, ref)
	if err != nil {
		return err
	}

	n, err := node.NewNodeFromBlob(ref, data)
	if err != nil {
		return err
	}

	if n.Type == "file" {
		for _, dref := range n.Refs {
			// Collect each of the file's data blob refs
			data := dref.([]interface{})
			bref := data[1].(string)
			refs.Add(bref)
		}
	} else {
		// Iterate over the dir
		for _, cref := range n.Refs {
			// Merge the children recursively
			if err := dc.MergeFileTreeNode(ctx, refs, bs, cref.(string)); err != nil {
				return err
			}
		}
	}

	// Only save the node ref once all its children have been saved
	refs.Add(ref)

	return nil
}

func (dc *dataContext) MergeFileTreeVersion(ctx context.Context, key string, version int64) (*orderedRefs, error) {
	if dc.root {
		return nil, fmt.Errorf("cannot merge filetree version in root data context")
	}

	refs := newOrderedRefs()

	// Fetch the root BlobStore up front (fetching the concrete struct behind the interface is costly)
	rootBs := dc.bsProxy.(*store.BlobStoreProxy).ReadSrc

	// Fetch the blob that contains the KV entry for the FileTree version
	kvBlobRef, err := dc.kvs.GetMetaBlob(ctx, key, version)
	if err != nil {
		return nil, err
	}

	// Now, traverse the tree, starting at the root
	kv, err := dc.kvs.Get(ctx, key, version)
	if err != nil {
		return nil, err
	}

	// Merge the root node recursively
	ftRoot := kv.HexHash()
	if err := dc.MergeFileTreeNode(ctx, refs, rootBs, ftRoot); err != nil {
		return nil, err
	}

	refs.Add(kvBlobRef)

	return refs, nil
}

func (dc *dataContext) Close() error {
	if dc.closed || dc.root {
		return nil


@@ 246,6 335,51 @@ func (s *Stash) DoAndDestroy(ctx context.Context, name string, do func(context.C
	return nil
}

func (s *Stash) MergeFileTreeVersionAndDestroy(ctx context.Context, name string, key string, version int64) error {
	s.Lock()
	defer s.Unlock()
	dc, ok := s.contexes[name]
	if !ok {
		return fmt.Errorf("data context not found")
	}

	refs, err := dc.MergeFileTreeVersion(ctx, key, version)
	if err != nil {
		return err
	}

	var blobsCnt int
	var totalSize uint64
	for _, ref := range refs.refs {
		// Get the marked blob from the blobstore proxy
		data, err := dc.StashBlobStore().Get(ctx, ref)
		if err != nil {
			if err == blobsfile.ErrBlobNotFound {
				continue
			}
			return err
		}

		// Save it in the root blobstore
		saved, err := s.Root().BlobStore().Put(ctx, &blob.Blob{Hash: ref, Data: data})
		if err != nil {
			return err
		}

		if saved {
			blobsCnt++
			totalSize += uint64(len(data))
		}
	}
	fmt.Printf("GC/merge filetree refs=%d blobs, saved %d blobs\n", len(refs.refs), blobsCnt)

	if err := s.destroy(dc, name); err != nil {
		return err
	}

	return nil
}

func (s *Stash) MergeAndDestroy(ctx context.Context, name string) error {
	s.Lock()
	defer s.Unlock()
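
Closing note on the new stash.go code above: orderedRefs deduplicates while preserving insertion order, and MergeFileTreeNode adds a node's ref only after its children, so blobs are copied to the root blobstore bottom-up. A small standalone sketch of that ordering behavior (the type is copied from the diff; the refs are invented):

package main

import "fmt"

// Copied from the diff above: refs keeps insertion order, idx deduplicates.
type orderedRefs struct {
	refs []string
	idx  map[string]struct{}
}

func newOrderedRefs() *orderedRefs {
	return &orderedRefs{refs: []string{}, idx: map[string]struct{}{}}
}

func (r *orderedRefs) Add(ref string) {
	if _, ok := r.idx[ref]; !ok {
		r.idx[ref] = struct{}{}
		r.refs = append(r.refs, ref)
	}
}

func main() {
	refs := newOrderedRefs()
	// Children first, then the parent node, with a duplicate data blob thrown in.
	for _, ref := range []string{"blob-a", "blob-b", "blob-a", "dir-node-ref"} {
		refs.Add(ref)
	}
	fmt.Println(refs.refs) // [blob-a blob-b dir-node-ref]
}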