~tsileo/blobsfile

ref: 76a89107299fb76b273d92e13602ab1822aa4f76 blobsfile/index.go -rw-r--r-- 4.5 KiB
76a89107Thomas Sileo Bugfixes 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package blobsfile

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"strconv"

	"a4.io/blobstash/pkg/rangedb"
)

// FIXME(tsileo): optimize the index with the benchmark (not worth it if inserting the blob takes longer)

// metaKey and blobPosKey are the one-byte prefixes used to namespace the DB
// keys (see formatKey): metaKey for index metadata, blobPosKey for blob
// position entries.
const (
	metaKey    byte = 0
	blobPosKey byte = 1
)

// formatKey builds a namespaced key: a newly allocated slice holding prefix
// followed by a copy of bkey. bkey itself is left untouched.
func formatKey(prefix byte, bkey []byte) []byte {
	key := make([]byte, 1, 1+len(bkey))
	key[0] = prefix
	return append(key, bkey...)
}

// blobsIndex holds the position of blobs in BlobsFile.
type blobsIndex struct {
	// db is the key-value store the index entries are read from and written to.
	db   *rangedb.RangeDB
	// path is the location of the db on disk (what remove deletes).
	path string
}

// blobPos is a blob entry in the index.
type blobPos struct {
	// n identifies the blobs file holding the blob (the N of blobs-N)
	n int
	// offset and size of the blob within that blobs file
	offset   int64
	size     int
	blobSize int // the actual blob size (will be different from size if compression is enabled)
}

// Size reports the size of the blob as stored in the BlobsFile, i.e. the
// size field and not blobSize.
func (blob *blobPos) Size() int {
	stored := blob.size
	return stored
}

// Value serializes the blobPos to bytes: n, offset, size and blobSize, each
// written as a uvarint, concatenated in that order.
func (blob *blobPos) Value() []byte {
	fields := [...]uint64{
		uint64(blob.n),
		uint64(blob.offset),
		uint64(blob.size),
		uint64(blob.blobSize),
	}
	// Scratch space large enough for the longest possible uvarint.
	var scratch [binary.MaxVarintLen64]byte
	var out bytes.Buffer
	for _, field := range fields {
		written := binary.PutUvarint(scratch[:], field)
		out.Write(scratch[:written])
	}
	return out.Bytes()
}

// decodeBlobPos parses the encoding produced by (*blobPos).Value: four
// consecutive uvarints holding n, offset, size and blobSize, in that order.
//
// When data is truncated or holds a malformed uvarint it returns a nil
// blobPos together with the error reported by binary.ReadUvarint. Bytes
// following the fourth uvarint are ignored.
func decodeBlobPos(data []byte) (*blobPos, error) {
	// data is only read, never grown, so a Reader is enough.
	r := bytes.NewReader(data)

	n, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}

	offset, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}

	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}

	blobSize, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}

	return &blobPos{
		n:        int(n),
		offset:   int64(offset),
		size:     int(size),
		blobSize: int(blobSize),
	}, nil
}

// newIndex opens, through rangedb.New, the index database located in the
// "blobs-index" entry of the path directory.
//
// On failure it returns a nil index along with the error from rangedb.New,
// so that callers never get hold of an index wrapping an unusable db.
func newIndex(path string) (*blobsIndex, error) {
	dbPath := filepath.Join(path, "blobs-index")
	db, err := rangedb.New(dbPath)
	if err != nil {
		return nil, err
	}
	return &blobsIndex{db: db, path: dbPath}, nil
}

// formatBlobPosKey returns key, taken as raw bytes (it is not hex-decoded),
// prefixed with the blobPosKey namespace byte.
func (index *blobsIndex) formatBlobPosKey(key string) []byte {
	raw := []byte(key)
	return formatKey(blobPosKey, raw)
}

// Close closes the underlying index database and returns the error, if any,
// reported by it.
func (index *blobsIndex) Close() error {
	if err := index.db.Close(); err != nil {
		return err
	}
	return nil
}

// remove deletes the index from disk: index.path and everything below it
// (os.RemoveAll).
func (index *blobsIndex) remove() error {
	target := index.path
	return os.RemoveAll(target)
}

// setPos stores pos in the index under the given hex-encoded hash.
// The hash is hex-decoded and namespaced with blobPosKey to form the DB key;
// an invalid hex string is reported as an error and nothing is written.
func (index *blobsIndex) setPos(hexHash string, pos *blobPos) error {
	rawHash, decodeErr := hex.DecodeString(hexHash)
	if decodeErr != nil {
		return decodeErr
	}
	key := formatKey(blobPosKey, rawHash)
	return index.db.Set(key, pos.Value())
}

// deletePos deletes the stored blobPos for the given hash.
// func (index *blobsIndex) deletePos(hexHash string) error {
//	hash, err := hex.DecodeString(hexHash)
//	if err != nil {
//		return err
//	}
//	return index.db.Delete(formatKey(blobPosKey, hash))
//}

// checkPos reports whether the index holds a blobPos for the given
// hex-encoded hash, without decoding the stored value.
//
// It returns an error when hexHash is not valid hex or when the DB lookup
// fails. A missing or empty value is reported as (false, nil).
func (index *blobsIndex) checkPos(hexHash string) (bool, error) {
	hash, err := hex.DecodeString(hexHash)
	if err != nil {
		return false, err
	}
	data, err := index.db.Get(formatKey(blobPosKey, hash))
	if err != nil {
		return false, fmt.Errorf("error getting BlobPos: %v", err)
	}
	// len of a nil slice is 0, so this covers both the nil and the empty case.
	return len(data) > 0, nil
}

// getPos retrieves and decodes the blobPos stored for the given hex-encoded
// hash.
//
// It returns (nil, nil) when no value is stored for the hash; like checkPos,
// an empty value counts as missing. An error is returned when hexHash is not
// valid hex, when the DB lookup fails or when the stored value cannot be
// decoded.
func (index *blobsIndex) getPos(hexHash string) (*blobPos, error) {
	hash, err := hex.DecodeString(hexHash)
	if err != nil {
		return nil, err
	}
	data, err := index.db.Get(formatKey(blobPosKey, hash))
	if err != nil {
		return nil, fmt.Errorf("error getting BlobPos: %v", err)
	}
	// Same "missing" test as checkPos so both methods agree on what exists.
	if len(data) == 0 {
		return nil, nil
	}
	return decodeBlobPos(data)
}

// setN records n, the N of the latest blobs-N file opened, under the "n"
// metadata key. The value is stored as its base-10 string representation.
func (index *blobsIndex) setN(n int) error {
	key := formatKey(metaKey, []byte("n"))
	value := []byte(strconv.Itoa(n))
	return index.db.Set(key, value)
}

// getN retrieves the latest N (blobs-N) stored by setN.
//
// It returns 0 when no N has been stored yet (missing or empty value). A DB
// lookup failure is returned to the caller rather than being reported as
// N == 0, and a stored value that is not a base-10 integer yields the
// strconv.Atoi error.
func (index *blobsIndex) getN() (int, error) {
	data, err := index.db.Get(formatKey(metaKey, []byte("n")))
	if err != nil {
		return 0, fmt.Errorf("error getting the latest N: %v", err)
	}
	if len(data) == 0 {
		return 0, nil
	}
	return strconv.Atoi(string(data))
}