~tsileo/blobstash

b388a9f3f33fab84db4a16d3b73b4d1a9985e4b7 — Thomas Sileo 1 year, 17 days ago 15600af
filetree/fs: rename/cleanup the FUSE FS
9 files changed, 24 insertions(+), 103 deletions(-)

A cmd/blobstash-fs/blobstash-fs.go
R pkg/filetree/fs/{ngfs/asof.go => asof.go}
R pkg/filetree/fs/{ngfs/cache.go => cache.go}
R pkg/filetree/fs/{ngfs/config.go => config.go}
R pkg/filetree/fs/{ngfs/debug.go => debug.go}
R pkg/filetree/fs/{ngfs/ngfs.go => fs.go}
R pkg/filetree/fs/{ngfs/node.go => node.go}
R pkg/filetree/fs/{ngfs/recent.go => recent.go}
R pkg/filetree/fs/{ngfs/versions.go => versions.go}
A cmd/blobstash-fs/blobstash-fs.go => cmd/blobstash-fs/blobstash-fs.go +7 -0
@@ 0,0 1,7 @@
package main

import "a4.io/blobstash/pkg/filetree/fs"

// main is the entry point of the blobstash-fs command; it simply
// delegates to the FUSE filesystem implementation in pkg/filetree/fs.
func main() {
	fs.Main()
}

R pkg/filetree/fs/ngfs/asof.go => pkg/filetree/fs/asof.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"context"

R pkg/filetree/fs/ngfs/cache.go => pkg/filetree/fs/cache.go +4 -68
@@ 1,26 1,19 @@
package main
package fs

import (
	"bytes"
	"fmt"
	"sync"

	"a4.io/blobstash/pkg/backend/s3/s3util"
	"a4.io/blobstash/pkg/blob"
	bcache "a4.io/blobstash/pkg/cache"
	"a4.io/blobstash/pkg/client/blobstore"
	"a4.io/blobstash/pkg/hashutil"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"golang.org/x/net/context"
)

// cache implements the blobStore interface with a local disk-backed LRU cache
// NOTE(review): this span is a rendered diff hunk with the +/- markers
// stripped — the first four fields are the removed (pre-commit) version and
// the next three are their re-aligned replacements, so the struct is not
// valid Go as displayed. TODO confirm against the applied file.
type cache struct {
	fs         *FS
	bs         *blobstore.BlobStore
	mu         sync.Mutex
	remoteRefs map[string]string
	fs *FS                  // owning filesystem
	bs *blobstore.BlobStore // BlobStash blob store client
	mu sync.Mutex           // guards cache operations (see Stat/Put/Get hunks)

	blobsCache *bcache.Cache // local disk-backed LRU cache of blobs
}


@@ 36,16 29,9 @@ func newCache(fs *FS, bs *blobstore.BlobStore, path string) (*cache, error) {
		fs:         fs,
		bs:         bs,
		blobsCache: blobsCache,
		remoteRefs: map[string]string{},
	}, nil
}

// RemoteRefs returns the map of blob hashes already uploaded to the remote
// storage (plaintext hash -> remote "tmp/<encrypted hash>" key, as recorded
// by PutRemote). The mutex is held only until the function returns and the
// caller receives the live internal map, not a copy.
// NOTE(review): any caller that reads or mutates the returned map after this
// call races with PutRemote — removed by this commit, so likely moot.
func (c *cache) RemoteRefs() map[string]string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.remoteRefs
}

// Close implements the io.Closer interface
func (c *cache) Close() error {
	return c.blobsCache.Close()


@@ 56,11 42,6 @@ func (c *cache) Stat(ctx context.Context, hash string) (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Check if the blob has already been uploaded to the remote storage
	if _, ok := c.remoteRefs[hash]; ok {
		return true, nil
	}

	stat, err := c.bs.Stat(context.TODO(), hash)
	if err != nil {
		return false, err


@@ 84,51 65,6 @@ func (c *cache) Put(ctx context.Context, hash string, data []byte) error {
	return nil
}

// PutRemote caches the blob locally, encrypts it, and uploads it to the
// S3-compatible remote storage under a "tmp/" key derived from the
// encrypted payload's hash. When remote support is disabled it degrades
// to a plain local Put. (Previous comment said "Get implements the
// BlobStore interface" — copy-paste error, fixed.)
func (c *cache) PutRemote(ctx context.Context, hash string, data []byte) error {
	// Remote disabled: behave exactly like a local Put
	if !c.fs.useRemote {
		return c.Put(ctx, hash, data)
	}
	logger.Printf("[remote] uploading %s (%d bytes) to remote", hash, len(data))

	c.mu.Lock()
	defer c.mu.Unlock()
	var err error

	// Already uploaded to the remote storage: nothing to do
	if _, ok := c.remoteRefs[hash]; ok {
		return nil
	}

	// Keep the plaintext blob in the local disk cache
	if err := c.blobsCache.Add(hash, data); err != nil {
		return err
	}

	// Encrypt
	data, err = s3util.Seal(c.fs.key, &blob.Blob{Hash: hash, Data: data})
	if err != nil {
		return err
	}
	// Re-compute the hash (the encrypted payload hashes differently)
	ehash := hashutil.Compute(data)

	// Prepare the upload request
	params := &s3.PutObjectInput{
		Bucket:   aws.String(c.fs.profile.RemoteConfig.Bucket),
		Key:      aws.String("tmp/" + ehash),
		Body:     bytes.NewReader(data),
		Metadata: map[string]*string{},
	}

	// Actually upload the blob
	if _, err := c.fs.s3.PutObject(params); err != nil {
		return err
	}

	// Record the remote key so a repeated PutRemote of this hash is a no-op
	c.remoteRefs[hash] = "tmp/" + ehash

	return nil
}

// Get implements the blobStore interface for filereader.File
func (c *cache) Get(ctx context.Context, hash string) ([]byte, error) {
	logger.Printf("Cache.Get(%q)\n", hash)

R pkg/filetree/fs/ngfs/config.go => pkg/filetree/fs/config.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"fmt"

R pkg/filetree/fs/ngfs/debug.go => pkg/filetree/fs/debug.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"bytes"

R pkg/filetree/fs/ngfs/ngfs.go => pkg/filetree/fs/fs.go +7 -29
@@ 1,4 1,4 @@
package main
package fs // import "a4.io/blobstash/pkg/filetree/fs"

import (
	"flag"


@@ 41,7 41,7 @@ var startedAt = time.Now()

// usage prints the command-line usage and registered flags to stderr.
// NOTE(review): the two consecutive MOUNTPOINT lines are the diff's
// before/after pair for this hunk (the commit adds the FSNAME argument);
// the applied file contains only one of them — TODO confirm.
func usage() {
	fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
	fmt.Fprintf(os.Stderr, "  %s MOUNTPOINT\n", os.Args[0])
	fmt.Fprintf(os.Stderr, "  %s MOUNTPOINT FSNAME\n", os.Args[0])
	flag.PrintDefaults()
}



@@ 61,13 61,11 @@ const (
	otherExecute
)

func main() {
func Main() {
	// Scans the arg list and sets up flags
	//debug := flag.Bool("debug", false, "print debugging messages.")
	resetCache := flag.Bool("reset-cache", false, "remove the local cache before starting.")
	syncDelay := flag.Duration("sync-delay", 5*time.Minute, "delay to wait after the last modification to initate a sync")
	forceRemote := flag.Bool("force-remote", false, "force fetching data blobs from object storage")
	disableRemote := flag.Bool("disable-remote", false, "disable fetching data blobs from object storage")
	configFile := flag.String("config-file", filepath.Join(pathutil.ConfigDir(), "fs_client.yaml"), "confg file path")
	configProfile := flag.String("config-profile", "default", "config profile name")



@@ 157,18 155,7 @@ func main() {
		fmt.Printf("invalid BLOBS_API_HOST")
		os.Exit(1)
	}

	var useRemote bool
	switch {
	case *disableRemote, !caps.ReplicationEnabled:
		if *forceRemote {
			logger.Printf("WARNING: disabling remote as server does not support it\n")
		}
	case *forceRemote:
		useRemote = true
	case isHostLocal:
		useRemote = isHostLocal
	}
	fmt.Printf("isHostLocal=%v\n", isHostLocal)

	c, err := fuse.Mount(
		mountpoint,


@@ 205,7 192,6 @@ func main() {
		freaderCache: freaderCache,
		atCache:      atCache,
		caps:         caps,
		useRemote:    useRemote,
	}
	blobfs.bs, err = newCache(blobfs, bs, cacheDir)
	if err != nil {


@@ 213,7 199,7 @@ func main() {
	}
	defer blobfs.bs.(*cache).Close()

	logger.Printf("caps=%+v use_remote=%v\n", caps, useRemote)
	logger.Printf("caps=%+v\n", caps)

	go func() {
		ticker := time.NewTicker(45 * time.Second)


@@ 279,8 265,7 @@ type FS struct {
	caps       *clientutil.Caps

	// config profile
	profile   *profile
	useRemote bool
	profile *profile

	// S3 client and key
	s3  *s3.S3


@@ 1321,14 1306,7 @@ func (f *file) Reader() (fileReader, error) {
	}

	// Instanciate the filereader
	var fr preloadableFileReader
	logger.Printf("use_remote=%v remote_refs=%+v\n", f.fs.useRemote, n.RemoteRefs)
	if f.fs.useRemote && n.RemoteRefs != nil {
		logger.Println("opening file with remote")
		fr = filereader.NewFileRemote(context.Background(), f.fs.bs, meta, n.RemoteRefs, f.fs.freaderCache)
	} else {
		fr = filereader.NewFile(context.Background(), f.fs.bs, meta, f.fs.freaderCache)
	}
	fr := filereader.NewFile(context.Background(), f.fs.bs, meta, f.fs.freaderCache)

	// FIXME(tsileo): test if preloading is worth it
	// fr.PreloadChunks()

R pkg/filetree/fs/ngfs/node.go => pkg/filetree/fs/node.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"time"

R pkg/filetree/fs/ngfs/recent.go => pkg/filetree/fs/recent.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"context"

R pkg/filetree/fs/ngfs/versions.go => pkg/filetree/fs/versions.go +1 -1
@@ 1,4 1,4 @@
package main
package fs

import (
	"context"