mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-11-25 23:53:32 -06:00
add git.iim.gay/grufwub/go-store for storage backend, replacing blob.Storage
Signed-off-by: kim (grufwub) <grufwub@gmail.com>
This commit is contained in:
parent
ab32ce642b
commit
e43a46e982
89 changed files with 9372 additions and 240 deletions
64
vendor/git.iim.gay/grufwub/go-store/kv/iterator.go
vendored
Normal file
64
vendor/git.iim.gay/grufwub/go-store/kv/iterator.go
vendored
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"git.iim.gay/grufwub/go-errors"
|
||||
"git.iim.gay/grufwub/go-store/storage"
|
||||
)
|
||||
|
||||
var ErrIteratorClosed = errors.Define("store/kv: iterator closed")
|
||||
|
||||
// KVIterator provides a read-only iterator to all the key-value
|
||||
// pairs in a KVStore. While the iterator is open the store is read
|
||||
// locked, you MUST release the iterator when you are finished with
|
||||
// it.
|
||||
//
|
||||
// Please note:
|
||||
// - individual iterators are NOT concurrency safe, though it is safe to
|
||||
// have multiple iterators running concurrently
|
||||
type KVIterator struct {
|
||||
store *KVStore // store is the linked KVStore
|
||||
entries []storage.StorageEntry
|
||||
index int
|
||||
key string
|
||||
onClose func()
|
||||
}
|
||||
|
||||
// Next attempts to set the next key-value pair, the
|
||||
// return value is if there was another pair remaining
|
||||
func (i *KVIterator) Next() bool {
|
||||
next := i.index + 1
|
||||
if next >= len(i.entries) {
|
||||
i.key = ""
|
||||
return false
|
||||
}
|
||||
i.key = i.entries[next].Key()
|
||||
i.index = next
|
||||
return true
|
||||
}
|
||||
|
||||
// Key returns the next key from the store
|
||||
func (i *KVIterator) Key() string {
|
||||
return i.key
|
||||
}
|
||||
|
||||
// Release releases the KVIterator and KVStore's read lock
|
||||
func (i *KVIterator) Release() {
|
||||
// Reset key, path, entries
|
||||
i.store = nil
|
||||
i.key = ""
|
||||
i.entries = nil
|
||||
|
||||
// Perform requested callback
|
||||
i.onClose()
|
||||
}
|
||||
|
||||
// Value returns the next value from the KVStore
|
||||
func (i *KVIterator) Value() ([]byte, error) {
|
||||
// Check store isn't closed
|
||||
if i.store == nil {
|
||||
return nil, ErrIteratorClosed
|
||||
}
|
||||
|
||||
// Attempt to fetch from store
|
||||
return i.store.get(i.key)
|
||||
}
|
||||
125
vendor/git.iim.gay/grufwub/go-store/kv/state.go
vendored
Normal file
125
vendor/git.iim.gay/grufwub/go-store/kv/state.go
vendored
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"git.iim.gay/grufwub/go-errors"
|
||||
)
|
||||
|
||||
var ErrStateClosed = errors.Define("store/kv: state closed")
|
||||
|
||||
// StateRO provides a read-only window to the store. While this
|
||||
// state is active during the Read() function window, the entire
|
||||
// store will be read-locked. The state is thread-safe for concurrent
|
||||
// use UNTIL the moment that your supplied function to Read() returns,
|
||||
// then the state has zero guarantees
|
||||
type StateRO struct {
|
||||
store *KVStore
|
||||
}
|
||||
|
||||
func (st *StateRO) Get(key string) ([]byte, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.get(key)
|
||||
}
|
||||
|
||||
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.getStream(key)
|
||||
}
|
||||
|
||||
func (st *StateRO) Has(key string) (bool, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return false, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.has(key)
|
||||
}
|
||||
|
||||
func (st *StateRO) close() {
|
||||
st.store = nil
|
||||
}
|
||||
|
||||
// StateRW provides a read-write window to the store. While this
|
||||
// state is active during the Update() function window, the entire
|
||||
// store will be locked. The state is thread-safe for concurrent
|
||||
// use UNTIL the moment that your supplied function to Update() returns,
|
||||
// then the state has zero guarantees
|
||||
type StateRW struct {
|
||||
store *KVStore
|
||||
}
|
||||
|
||||
func (st *StateRW) Get(key string) ([]byte, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.get(key)
|
||||
}
|
||||
|
||||
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.getStream(key)
|
||||
}
|
||||
|
||||
func (st *StateRW) Put(key string, value []byte) error {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.put(key, value)
|
||||
}
|
||||
|
||||
func (st *StateRW) PutStream(key string, r io.Reader) error {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.putStream(key, r)
|
||||
}
|
||||
|
||||
func (st *StateRW) Has(key string) (bool, error) {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return false, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.has(key)
|
||||
}
|
||||
|
||||
func (st *StateRW) Delete(key string) error {
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.delete(key)
|
||||
}
|
||||
|
||||
func (st *StateRW) close() {
|
||||
st.store = nil
|
||||
}
|
||||
243
vendor/git.iim.gay/grufwub/go-store/kv/store.go
vendored
Normal file
243
vendor/git.iim.gay/grufwub/go-store/kv/store.go
vendored
Normal file
|
|
@ -0,0 +1,243 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"git.iim.gay/grufwub/go-mutexes"
|
||||
"git.iim.gay/grufwub/go-store/storage"
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
)
|
||||
|
||||
// KVStore is a very simple, yet performant key-value store
|
||||
type KVStore struct {
|
||||
mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
|
||||
mutex sync.RWMutex // mutex is the total store mutex
|
||||
storage storage.Storage // storage is the underlying storage
|
||||
}
|
||||
|
||||
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
|
||||
// Attempt to open disk storage
|
||||
storage, err := storage.OpenFile(path, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new KVStore
|
||||
return OpenStorage(storage)
|
||||
}
|
||||
|
||||
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
|
||||
// Attempt to open block storage
|
||||
storage, err := storage.OpenBlock(path, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new KVStore
|
||||
return OpenStorage(storage)
|
||||
}
|
||||
|
||||
func OpenStorage(storage storage.Storage) (*KVStore, error) {
|
||||
// Perform initial storage clean
|
||||
err := storage.Clean()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new KVStore
|
||||
return &KVStore{
|
||||
mutexMap: mutexes.NewMap(mutexes.NewRW),
|
||||
mutex: sync.RWMutex{},
|
||||
storage: storage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get fetches the bytes for supplied key in the store
|
||||
func (st *KVStore) Get(key string) ([]byte, error) {
|
||||
// Acquire store read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.get(key)
|
||||
}
|
||||
|
||||
func (st *KVStore) get(key string) ([]byte, error) {
|
||||
// Acquire read lock for key
|
||||
runlock := st.mutexMap.RLock(key)
|
||||
defer runlock()
|
||||
|
||||
// Read file bytes
|
||||
return st.storage.ReadBytes(key)
|
||||
}
|
||||
|
||||
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
|
||||
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
|
||||
// Acquire store read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.getStream(key)
|
||||
}
|
||||
|
||||
func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
|
||||
// Acquire read lock for key
|
||||
runlock := st.mutexMap.RLock(key)
|
||||
|
||||
// Attempt to open stream for read
|
||||
rd, err := st.storage.ReadStream(key)
|
||||
if err != nil {
|
||||
runlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap readcloser in our own callback closer
|
||||
return util.ReadCloserWithCallback(rd, runlock), nil
|
||||
}
|
||||
|
||||
// Put places the bytes at the supplied key location in the store
|
||||
func (st *KVStore) Put(key string, value []byte) error {
|
||||
// Acquire store write lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.put(key, value)
|
||||
}
|
||||
|
||||
func (st *KVStore) put(key string, value []byte) error {
|
||||
// Acquire write lock for key
|
||||
unlock := st.mutexMap.Lock(key)
|
||||
defer unlock()
|
||||
|
||||
// Write file bytes
|
||||
return st.storage.WriteBytes(key, value)
|
||||
}
|
||||
|
||||
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
|
||||
func (st *KVStore) PutStream(key string, r io.Reader) error {
|
||||
// Acquire store write lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.putStream(key, r)
|
||||
}
|
||||
|
||||
func (st *KVStore) putStream(key string, r io.Reader) error {
|
||||
// Acquire write lock for key
|
||||
unlock := st.mutexMap.Lock(key)
|
||||
defer unlock()
|
||||
|
||||
// Write file stream
|
||||
return st.storage.WriteStream(key, r)
|
||||
}
|
||||
|
||||
// Has checks whether the supplied key exists in the store
|
||||
func (st *KVStore) Has(key string) (bool, error) {
|
||||
// Acquire store read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.has(key)
|
||||
}
|
||||
|
||||
func (st *KVStore) has(key string) (bool, error) {
|
||||
// Acquire read lock for key
|
||||
runlock := st.mutexMap.RLock(key)
|
||||
defer runlock()
|
||||
|
||||
// Stat file on disk
|
||||
return st.storage.Stat(key)
|
||||
}
|
||||
|
||||
// Delete removes the supplied key-value pair from the store
|
||||
func (st *KVStore) Delete(key string) error {
|
||||
// Acquire store write lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Pass to unprotected fn
|
||||
return st.delete(key)
|
||||
}
|
||||
|
||||
func (st *KVStore) delete(key string) error {
|
||||
// Acquire write lock for key
|
||||
unlock := st.mutexMap.Lock(key)
|
||||
defer unlock()
|
||||
|
||||
// Remove file from disk
|
||||
return st.storage.Remove(key)
|
||||
}
|
||||
|
||||
// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
|
||||
func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
|
||||
// If no function, match all
|
||||
if matchFn == nil {
|
||||
matchFn = func(string) bool { return true }
|
||||
}
|
||||
|
||||
// Get store read lock
|
||||
st.mutex.RLock()
|
||||
|
||||
// Setup the walk keys function
|
||||
entries := []storage.StorageEntry{}
|
||||
walkFn := func(entry storage.StorageEntry) {
|
||||
// Ignore unmatched entries
|
||||
if !matchFn(entry.Key()) {
|
||||
return
|
||||
}
|
||||
|
||||
// Add to entries
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
|
||||
// Walk keys in the storage
|
||||
err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn})
|
||||
if err != nil {
|
||||
st.mutex.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new iterator
|
||||
return &KVIterator{
|
||||
store: st,
|
||||
entries: entries,
|
||||
index: -1,
|
||||
key: "",
|
||||
onClose: st.mutex.RUnlock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Read provides a read-only window to the store, holding it in a read-locked state until
|
||||
// the supplied function returns
|
||||
func (st *KVStore) Read(do func(*StateRO)) {
|
||||
// Get store read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Create new store state (defer close)
|
||||
state := &StateRO{store: st}
|
||||
defer state.close()
|
||||
|
||||
// Pass state
|
||||
do(state)
|
||||
}
|
||||
|
||||
// Update provides a read-write window to the store, holding it in a read-write-locked state
|
||||
// until the supplied functions returns
|
||||
func (st *KVStore) Update(do func(*StateRW)) {
|
||||
// Get store lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Create new store state (defer close)
|
||||
state := &StateRW{store: st}
|
||||
defer state.close()
|
||||
|
||||
// Pass state
|
||||
do(state)
|
||||
}
|
||||
785
vendor/git.iim.gay/grufwub/go-store/storage/block.go
vendored
Normal file
785
vendor/git.iim.gay/grufwub/go-store/storage/block.go
vendored
Normal file
|
|
@ -0,0 +1,785 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"git.iim.gay/grufwub/fastpath"
|
||||
"git.iim.gay/grufwub/go-bytes"
|
||||
"git.iim.gay/grufwub/go-errors"
|
||||
"git.iim.gay/grufwub/go-hashenc"
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
)
|
||||
|
||||
var (
|
||||
nodePathPrefix = "node/"
|
||||
blockPathPrefix = "block/"
|
||||
)
|
||||
|
||||
// DefaultBlockConfig is the default BlockStorage configuration
|
||||
var DefaultBlockConfig = &BlockConfig{
|
||||
BlockSize: 1024 * 16,
|
||||
WriteBufSize: 4096,
|
||||
Overwrite: false,
|
||||
Compression: NoCompression(),
|
||||
}
|
||||
|
||||
// BlockConfig defines options to be used when opening a BlockStorage
|
||||
type BlockConfig struct {
|
||||
// BlockSize is the chunking size to use when splitting and storing blocks of data
|
||||
BlockSize int
|
||||
|
||||
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
|
||||
WriteBufSize int
|
||||
|
||||
// Overwrite allows overwriting values of stored keys in the storage
|
||||
Overwrite bool
|
||||
|
||||
// Compression is the Compressor to use when reading / writing files, default is no compression
|
||||
Compression Compressor
|
||||
}
|
||||
|
||||
// getBlockConfig returns a valid BlockConfig for supplied ptr
|
||||
func getBlockConfig(cfg *BlockConfig) BlockConfig {
|
||||
// If nil, use default
|
||||
if cfg == nil {
|
||||
cfg = DefaultBlockConfig
|
||||
}
|
||||
|
||||
// Assume nil compress == none
|
||||
if cfg.Compression == nil {
|
||||
cfg.Compression = NoCompression()
|
||||
}
|
||||
|
||||
// Assume 0 chunk size == use default
|
||||
if cfg.BlockSize < 1 {
|
||||
cfg.BlockSize = DefaultBlockConfig.BlockSize
|
||||
}
|
||||
|
||||
// Assume 0 buf size == use default
|
||||
if cfg.WriteBufSize < 1 {
|
||||
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
|
||||
}
|
||||
|
||||
// Return owned config copy
|
||||
return BlockConfig{
|
||||
BlockSize: cfg.BlockSize,
|
||||
WriteBufSize: cfg.WriteBufSize,
|
||||
Overwrite: cfg.Overwrite,
|
||||
Compression: cfg.Compression,
|
||||
}
|
||||
}
|
||||
|
||||
// BlockStorage is a Storage implementation that stores input data as chunks on
|
||||
// a filesystem. Each value is chunked into blocks of configured size and these
|
||||
// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
|
||||
// "node" file is finally created containing an array of hashes contained within
|
||||
// this value
|
||||
type BlockStorage struct {
|
||||
path string // path is the root path of this store
|
||||
blockPath string // blockPath is the joined root path + block path prefix
|
||||
nodePath string // nodePath is the joined root path + node path prefix
|
||||
config BlockConfig // cfg is the supplied configuration for this store
|
||||
hashPool sync.Pool // hashPool is this store's hashEncoder pool
|
||||
|
||||
// NOTE:
|
||||
// BlockStorage does not need to lock each of the underlying block files
|
||||
// as the filename itself directly relates to the contents. If there happens
|
||||
// to be an overwrite, it will just be of the same data since the filename is
|
||||
// the hash of the data.
|
||||
}
|
||||
|
||||
// OpenBlock opens a BlockStorage instance for given folder path and configuration
|
||||
func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
|
||||
// Acquire path builder
|
||||
pb := util.AcquirePathBuilder()
|
||||
defer util.ReleasePathBuilder(pb)
|
||||
|
||||
// Clean provided path, ensure ends in '/' (should
|
||||
// be dir, this helps with file path trimming later)
|
||||
path = pb.Clean(path) + "/"
|
||||
|
||||
// Get checked config
|
||||
config := getBlockConfig(cfg)
|
||||
|
||||
// Attempt to open path
|
||||
file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
||||
if err != nil {
|
||||
// If not a not-exist error, return
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to make store path dirs
|
||||
err = os.MkdirAll(path, defaultDirPerms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reopen dir now it's been created
|
||||
file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Double check this is a dir (NOT a file!)
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !stat.IsDir() {
|
||||
return nil, errPathIsFile
|
||||
}
|
||||
|
||||
// Return new BlockStorage
|
||||
return &BlockStorage{
|
||||
path: path,
|
||||
blockPath: pb.Join(path, blockPathPrefix),
|
||||
nodePath: pb.Join(path, nodePathPrefix),
|
||||
config: config,
|
||||
hashPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return newHashEncoder()
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Clean implements storage.Clean()
|
||||
func (st *BlockStorage) Clean() error {
|
||||
nodes := map[string]*node{}
|
||||
|
||||
// Acquire path builder
|
||||
pb := fastpath.AcquireBuilder()
|
||||
defer fastpath.ReleaseBuilder(pb)
|
||||
|
||||
// Walk nodes dir for entries
|
||||
onceErr := errors.OnceError{}
|
||||
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
|
||||
// Only deal with regular files
|
||||
if !fsentry.Type().IsRegular() {
|
||||
return
|
||||
}
|
||||
|
||||
// Stop if we hit error previously
|
||||
if onceErr.IsSet() {
|
||||
return
|
||||
}
|
||||
|
||||
// Get joined node path name
|
||||
npath = pb.Join(npath, fsentry.Name())
|
||||
|
||||
// Attempt to open RO file
|
||||
file, err := open(npath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
onceErr.Store(err)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Alloc new Node + acquire hash buffer for writes
|
||||
hbuf := util.AcquireBuffer(encodedHashLen)
|
||||
defer util.ReleaseBuffer(hbuf)
|
||||
node := node{}
|
||||
|
||||
// Write file contents to node
|
||||
_, err = io.CopyBuffer(
|
||||
&nodeWriter{
|
||||
node: &node,
|
||||
buf: hbuf,
|
||||
},
|
||||
file,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
onceErr.Store(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Append to nodes slice
|
||||
nodes[fsentry.Name()] = &node
|
||||
})
|
||||
|
||||
// Handle errors (though nodePath may not have been created yet)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
} else if onceErr.IsSet() {
|
||||
return onceErr.Load()
|
||||
}
|
||||
|
||||
// Walk blocks dir for entries
|
||||
onceErr.Reset()
|
||||
err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) {
|
||||
// Only deal with regular files
|
||||
if !fsentry.Type().IsRegular() {
|
||||
return
|
||||
}
|
||||
|
||||
// Stop if we hit error previously
|
||||
if onceErr.IsSet() {
|
||||
return
|
||||
}
|
||||
|
||||
inUse := false
|
||||
for key, node := range nodes {
|
||||
if node.removeHash(fsentry.Name()) {
|
||||
if len(node.hashes) < 1 {
|
||||
// This node contained hash, and after removal is now empty.
|
||||
// Remove this node from our tracked nodes slice
|
||||
delete(nodes, key)
|
||||
}
|
||||
inUse = true
|
||||
}
|
||||
}
|
||||
|
||||
// Block hash is used by node
|
||||
if inUse {
|
||||
return
|
||||
}
|
||||
|
||||
// Get joined block path name
|
||||
bpath = pb.Join(bpath, fsentry.Name())
|
||||
|
||||
// Remove this unused block path
|
||||
err := os.Remove(bpath)
|
||||
if err != nil {
|
||||
onceErr.Store(err)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
// Handle errors (though blockPath may not have been created yet)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
} else if onceErr.IsSet() {
|
||||
return onceErr.Load()
|
||||
}
|
||||
|
||||
// If there are nodes left at this point, they are corrupt
|
||||
// (i.e. they're referencing block hashes that don't exist)
|
||||
if len(nodes) > 0 {
|
||||
nodeKeys := []string{}
|
||||
for key := range nodes {
|
||||
nodeKeys = append(nodeKeys, key)
|
||||
}
|
||||
return errCorruptNodes.Extend("%v", nodeKeys)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadBytes implements Storage.ReadBytes()
|
||||
func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
|
||||
// Get stream reader for key
|
||||
rc, err := st.ReadStream(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Read all bytes and return
|
||||
return io.ReadAll(rc)
|
||||
}
|
||||
|
||||
// ReadStream implements Storage.ReadStream()
|
||||
func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
|
||||
// Get node file path for key
|
||||
npath, err := st.nodePathForKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to open RO file
|
||||
file, err := open(npath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Alloc new Node + acquire hash buffer for writes
|
||||
hbuf := util.AcquireBuffer(encodedHashLen)
|
||||
defer util.ReleaseBuffer(hbuf)
|
||||
node := node{}
|
||||
|
||||
// Write file contents to node
|
||||
_, err = io.CopyBuffer(
|
||||
&nodeWriter{
|
||||
node: &node,
|
||||
buf: hbuf,
|
||||
},
|
||||
file,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new block reader
|
||||
return util.NopReadCloser(&blockReader{
|
||||
storage: st,
|
||||
node: &node,
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
|
||||
// Get block file path for key
|
||||
bpath := st.blockPathForKey(key)
|
||||
|
||||
// Attempt to open RO file
|
||||
file, err := open(bpath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Wrap the file in a compressor
|
||||
cFile, err := st.config.Compression.Reader(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cFile.Close()
|
||||
|
||||
// Read the entire file
|
||||
return io.ReadAll(cFile)
|
||||
}
|
||||
|
||||
// WriteBytes implements Storage.WriteBytes()
|
||||
func (st *BlockStorage) WriteBytes(key string, value []byte) error {
|
||||
return st.WriteStream(key, bytes.NewReader(value))
|
||||
}
|
||||
|
||||
// WriteStream implements Storage.WriteStream()
|
||||
func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
|
||||
// Get node file path for key
|
||||
npath, err := st.nodePathForKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if this exists
|
||||
ok, err := stat(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if we allow overwrites
|
||||
if ok && !st.config.Overwrite {
|
||||
return ErrAlreadyExists
|
||||
}
|
||||
|
||||
// Ensure nodes dir (and any leading up to) exists
|
||||
err = os.MkdirAll(st.nodePath, defaultDirPerms)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure blocks dir (and any leading up to) exists
|
||||
err = os.MkdirAll(st.blockPath, defaultDirPerms)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Alloc new node
|
||||
node := node{}
|
||||
|
||||
// Acquire HashEncoder
|
||||
hc := st.hashPool.Get().(*hashEncoder)
|
||||
defer st.hashPool.Put(hc)
|
||||
|
||||
// Create new waitgroup and OnceError for
|
||||
// goroutine error tracking and propagating
|
||||
wg := sync.WaitGroup{}
|
||||
onceErr := errors.OnceError{}
|
||||
|
||||
loop:
|
||||
for !onceErr.IsSet() {
|
||||
// Fetch new buffer for this loop
|
||||
buf := util.AcquireBuffer(st.config.BlockSize)
|
||||
buf.Grow(st.config.BlockSize)
|
||||
|
||||
// Read next chunk
|
||||
n, err := io.ReadFull(r, buf.B)
|
||||
switch err {
|
||||
case nil, io.ErrUnexpectedEOF:
|
||||
// do nothing
|
||||
case io.EOF:
|
||||
util.ReleaseBuffer(buf)
|
||||
break loop
|
||||
default:
|
||||
util.ReleaseBuffer(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
// Hash the encoded data
|
||||
sum := hc.EncodeSum(buf.B)
|
||||
|
||||
// Append to the node's hashes
|
||||
node.hashes = append(node.hashes, sum.String())
|
||||
|
||||
// If already on disk, skip
|
||||
has, err := st.statBlock(sum.StringPtr())
|
||||
if err != nil {
|
||||
util.ReleaseBuffer(buf)
|
||||
return err
|
||||
} else if has {
|
||||
util.ReleaseBuffer(buf)
|
||||
continue loop
|
||||
}
|
||||
|
||||
// Write in separate goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
// Defer buffer release + signal done
|
||||
defer func() {
|
||||
util.ReleaseBuffer(buf)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// Write block to store at hash
|
||||
err = st.writeBlock(sum.StringPtr(), buf.B[:n])
|
||||
if err != nil {
|
||||
onceErr.Store(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// We reached EOF
|
||||
if n < buf.Len() {
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
// Wait, check errors
|
||||
wg.Wait()
|
||||
if onceErr.IsSet() {
|
||||
return onceErr.Load()
|
||||
}
|
||||
|
||||
// If no hashes created, return
|
||||
if len(node.hashes) < 1 {
|
||||
return errNoHashesWritten
|
||||
}
|
||||
|
||||
// Prepare to swap error if need-be
|
||||
errSwap := errSwapNoop
|
||||
|
||||
// Build file RW flags
|
||||
// NOTE: we performed an initial check for
|
||||
// this before writing blocks, but if
|
||||
// the utilizer of this storage didn't
|
||||
// correctly mutex protect this key then
|
||||
// someone may have beaten us to the
|
||||
// punch at writing the node file.
|
||||
flags := defaultFileRWFlags
|
||||
if !st.config.Overwrite {
|
||||
flags |= syscall.O_EXCL
|
||||
|
||||
// Catch + replace err exist
|
||||
errSwap = errSwapExist
|
||||
}
|
||||
|
||||
// Attempt to open RW file
|
||||
file, err := open(npath, flags)
|
||||
if err != nil {
|
||||
return errSwap(err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Acquire write buffer
|
||||
buf := util.AcquireBuffer(st.config.WriteBufSize)
|
||||
defer util.ReleaseBuffer(buf)
|
||||
buf.Grow(st.config.WriteBufSize)
|
||||
|
||||
// Finally, write data to file
|
||||
_, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// writeBlock writes the block with hash and supplied value to the filesystem
|
||||
func (st *BlockStorage) writeBlock(hash string, value []byte) error {
|
||||
// Get block file path for key
|
||||
bpath := st.blockPathForKey(hash)
|
||||
|
||||
// Attempt to open RW file
|
||||
file, err := open(bpath, defaultFileRWFlags)
|
||||
if err != nil {
|
||||
if err == ErrAlreadyExists {
|
||||
err = nil /* race issue describe in struct NOTE */
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Wrap the file in a compressor
|
||||
cFile, err := st.config.Compression.Writer(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cFile.Close()
|
||||
|
||||
// Write value to file
|
||||
_, err = cFile.Write(value)
|
||||
return err
|
||||
}
|
||||
|
||||
// statBlock checks for existence of supplied block hash
|
||||
func (st *BlockStorage) statBlock(hash string) (bool, error) {
|
||||
return stat(st.blockPathForKey(hash))
|
||||
}
|
||||
|
||||
// Stat implements Storage.Stat()
|
||||
func (st *BlockStorage) Stat(key string) (bool, error) {
|
||||
// Get node file path for key
|
||||
kpath, err := st.nodePathForKey(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Check for file on disk
|
||||
return stat(kpath)
|
||||
}
|
||||
|
||||
// Remove implements Storage.Remove()
|
||||
func (st *BlockStorage) Remove(key string) error {
|
||||
// Get node file path for key
|
||||
kpath, err := st.nodePathForKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Attempt to remove file
|
||||
return os.Remove(kpath)
|
||||
}
|
||||
|
||||
// WalkKeys implements Storage.WalkKeys()
|
||||
func (st *BlockStorage) WalkKeys(opts *WalkKeysOptions) error {
|
||||
// Acquire path builder
|
||||
pb := fastpath.AcquireBuilder()
|
||||
defer fastpath.ReleaseBuilder(pb)
|
||||
|
||||
// Walk dir for entries
|
||||
return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
|
||||
// Only deal with regular files
|
||||
if fsentry.Type().IsRegular() {
|
||||
opts.WalkFn(entry(fsentry.Name()))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// nodePathForKey calculates the node file path for supplied key
|
||||
func (st *BlockStorage) nodePathForKey(key string) (string, error) {
|
||||
// Path separators are illegal
|
||||
if strings.Contains(key, "/") {
|
||||
return "", ErrInvalidKey
|
||||
}
|
||||
|
||||
// Acquire path builder
|
||||
pb := util.AcquirePathBuilder()
|
||||
defer util.ReleasePathBuilder(pb)
|
||||
|
||||
// Return joined + cleaned node-path
|
||||
return pb.Join(st.nodePath, key), nil
|
||||
}
|
||||
|
||||
// blockPathForKey calculates the block file path for supplied hash
|
||||
func (st *BlockStorage) blockPathForKey(hash string) string {
|
||||
pb := util.AcquirePathBuilder()
|
||||
defer util.ReleasePathBuilder(pb)
|
||||
return pb.Join(st.blockPath, hash)
|
||||
}
|
||||
|
||||
// hashSeparator is the separating byte between block hashes
|
||||
const hashSeparator = byte(':')
|
||||
|
||||
// node represents the contents of a node file in storage
|
||||
type node struct {
|
||||
hashes []string
|
||||
}
|
||||
|
||||
// removeHash attempts to remove supplied block hash from the node's hash array
|
||||
func (n *node) removeHash(hash string) bool {
|
||||
haveDropped := false
|
||||
for i := 0; i < len(n.hashes); {
|
||||
if n.hashes[i] == hash {
|
||||
// Drop this hash from slice
|
||||
n.hashes = append(n.hashes[:i], n.hashes[i+1:]...)
|
||||
haveDropped = true
|
||||
} else {
|
||||
// Continue iter
|
||||
i++
|
||||
}
|
||||
}
|
||||
return haveDropped
|
||||
}
|
||||
|
||||
// nodeReader is an io.Reader implementation for the node file representation,
|
||||
// which is useful when calculated node file is being written to the store
|
||||
type nodeReader struct {
|
||||
node *node
|
||||
idx int
|
||||
last int
|
||||
}
|
||||
|
||||
func (r *nodeReader) Read(b []byte) (int, error) {
|
||||
n := 0
|
||||
|
||||
// '-1' means we missed writing
|
||||
// hash separator on last iteration
|
||||
if r.last == -1 {
|
||||
b[n] = hashSeparator
|
||||
n++
|
||||
r.last = 0
|
||||
}
|
||||
|
||||
for r.idx < len(r.node.hashes) {
|
||||
hash := r.node.hashes[r.idx]
|
||||
|
||||
// Copy into buffer + update read count
|
||||
m := copy(b[n:], hash[r.last:])
|
||||
n += m
|
||||
|
||||
// If incomplete copy, return here
|
||||
if m < len(hash)-r.last {
|
||||
r.last = m
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Check we can write last separator
|
||||
if n == len(b) {
|
||||
r.last = -1
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Write separator, iter, reset
|
||||
b[n] = hashSeparator
|
||||
n++
|
||||
r.idx++
|
||||
r.last = 0
|
||||
}
|
||||
|
||||
// We reached end of hashes
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
// nodeWriter is an io.Writer implementation for the node file representation,
|
||||
// which is useful when calculated node file is being read from the store
|
||||
type nodeWriter struct {
|
||||
node *node
|
||||
buf *bytes.Buffer
|
||||
}
|
||||
|
||||
func (w *nodeWriter) Write(b []byte) (int, error) {
|
||||
n := 0
|
||||
|
||||
for {
|
||||
// Find next hash separator position
|
||||
idx := bytes.IndexByte(b[n:], hashSeparator)
|
||||
if idx == -1 {
|
||||
// Check we shouldn't be expecting it
|
||||
if w.buf.Len() > encodedHashLen {
|
||||
return n, errInvalidNode
|
||||
}
|
||||
|
||||
// Write all contents to buffer
|
||||
w.buf.Write(b[n:])
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
// Found hash separator, write
|
||||
// current buf contents to Node hashes
|
||||
w.buf.Write(b[n : n+idx])
|
||||
n += idx + 1
|
||||
if w.buf.Len() != encodedHashLen {
|
||||
return n, errInvalidNode
|
||||
}
|
||||
|
||||
// Append to hashes & reset
|
||||
w.node.hashes = append(w.node.hashes, w.buf.String())
|
||||
w.buf.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
// blockReader is an io.Reader implementation for the combined, linked block
|
||||
// data contained with a node file. Basically, this allows reading value data
|
||||
// from the store for a given node file
|
||||
type blockReader struct {
|
||||
storage *BlockStorage
|
||||
node *node
|
||||
buf []byte
|
||||
prev int
|
||||
}
|
||||
|
||||
func (r *blockReader) Read(b []byte) (int, error) {
|
||||
n := 0
|
||||
|
||||
// Data left in buf, copy as much as we
|
||||
// can into supplied read buffer
|
||||
if r.prev < len(r.buf)-1 {
|
||||
n += copy(b, r.buf[r.prev:])
|
||||
r.prev += n
|
||||
if n >= len(b) {
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
// Check we have any hashes left
|
||||
if len(r.node.hashes) < 1 {
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
// Get next key from slice
|
||||
key := r.node.hashes[0]
|
||||
r.node.hashes = r.node.hashes[1:]
|
||||
|
||||
// Attempt to fetch next batch of data
|
||||
var err error
|
||||
r.buf, err = r.storage.readBlock(key)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
r.prev = 0
|
||||
|
||||
// Copy as much as can from new buffer
|
||||
m := copy(b[n:], r.buf)
|
||||
r.prev += m
|
||||
n += m
|
||||
|
||||
// If we hit end of supplied buf, return
|
||||
if n >= len(b) {
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// hashEncoder is a HashEncoder with built-in encode buffer
|
||||
type hashEncoder struct {
|
||||
henc hashenc.HashEncoder
|
||||
ebuf []byte
|
||||
}
|
||||
|
||||
// encodedHashLen is the once-calculated encoded hash-sum length
|
||||
var encodedHashLen = hashenc.Base64().EncodedLen(
|
||||
sha256.New().Size(),
|
||||
)
|
||||
|
||||
// newHashEncoder returns a new hashEncoder instance
|
||||
func newHashEncoder() *hashEncoder {
|
||||
hash := sha256.New()
|
||||
enc := hashenc.Base64()
|
||||
return &hashEncoder{
|
||||
henc: hashenc.New(hash, enc),
|
||||
ebuf: make([]byte, enc.EncodedLen(hash.Size())),
|
||||
}
|
||||
}
|
||||
|
||||
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
|
||||
func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes {
|
||||
henc.henc.EncodeSum(henc.ebuf, src)
|
||||
return bytes.ToBytes(henc.ebuf)
|
||||
}
|
||||
104
vendor/git.iim.gay/grufwub/go-store/storage/compressor.go
vendored
Normal file
104
vendor/git.iim.gay/grufwub/go-store/storage/compressor.go
vendored
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"compress/zlib"
|
||||
"io"
|
||||
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Compressor defines a means of compressing/decompressing values going into a key-value store
|
||||
type Compressor interface {
|
||||
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
|
||||
Reader(io.Reader) (io.ReadCloser, error)
|
||||
|
||||
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
|
||||
Writer(io.Writer) (io.WriteCloser, error)
|
||||
}
|
||||
|
||||
type gzipCompressor struct {
|
||||
level int
|
||||
}
|
||||
|
||||
// GZipCompressor returns a new Compressor that implements GZip at default compression level
|
||||
func GZipCompressor() Compressor {
|
||||
return GZipCompressorLevel(gzip.DefaultCompression)
|
||||
}
|
||||
|
||||
// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
|
||||
func GZipCompressorLevel(level int) Compressor {
|
||||
return &gzipCompressor{
|
||||
level: level,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
|
||||
return gzip.NewReader(r)
|
||||
}
|
||||
|
||||
func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
|
||||
return gzip.NewWriterLevel(w, c.level)
|
||||
}
|
||||
|
||||
type zlibCompressor struct {
|
||||
level int
|
||||
dict []byte
|
||||
}
|
||||
|
||||
// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
|
||||
func ZLibCompressor() Compressor {
|
||||
return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
|
||||
}
|
||||
|
||||
// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
|
||||
func ZLibCompressorLevel(level int) Compressor {
|
||||
return ZLibCompressorLevelDict(level, nil)
|
||||
}
|
||||
|
||||
// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
|
||||
func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
|
||||
return &zlibCompressor{
|
||||
level: level,
|
||||
dict: dict,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
|
||||
return zlib.NewReaderDict(r, c.dict)
|
||||
}
|
||||
|
||||
func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
|
||||
return zlib.NewWriterLevelDict(w, c.level, c.dict)
|
||||
}
|
||||
|
||||
type snappyCompressor struct{}
|
||||
|
||||
// SnappyCompressor returns a new Compressor that implements Snappy
|
||||
func SnappyCompressor() Compressor {
|
||||
return &snappyCompressor{}
|
||||
}
|
||||
|
||||
func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
|
||||
return util.NopReadCloser(snappy.NewReader(r)), nil
|
||||
}
|
||||
|
||||
func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
|
||||
return snappy.NewBufferedWriter(w), nil
|
||||
}
|
||||
|
||||
type nopCompressor struct{}
|
||||
|
||||
// NoCompression is a Compressor that simply does nothing
|
||||
func NoCompression() Compressor {
|
||||
return &nopCompressor{}
|
||||
}
|
||||
|
||||
func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
|
||||
return util.NopReadCloser(r), nil
|
||||
}
|
||||
|
||||
func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
|
||||
return util.NopWriteCloser(w), nil
|
||||
}
|
||||
289
vendor/git.iim.gay/grufwub/go-store/storage/disk.go
vendored
Normal file
289
vendor/git.iim.gay/grufwub/go-store/storage/disk.go
vendored
Normal file
|
|
@ -0,0 +1,289 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"syscall"
|
||||
|
||||
"git.iim.gay/grufwub/fastpath"
|
||||
"git.iim.gay/grufwub/go-bytes"
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
)
|
||||
|
||||
// DefaultDiskConfig is the default DiskStorage configuration
|
||||
var DefaultDiskConfig = &DiskConfig{
|
||||
Overwrite: true,
|
||||
WriteBufSize: 4096,
|
||||
Transform: NopTransform(),
|
||||
Compression: NoCompression(),
|
||||
}
|
||||
|
||||
// DiskConfig defines options to be used when opening a DiskStorage
|
||||
type DiskConfig struct {
|
||||
// Transform is the supplied key<-->path KeyTransform
|
||||
Transform KeyTransform
|
||||
|
||||
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
|
||||
WriteBufSize int
|
||||
|
||||
// Overwrite allows overwriting values of stored keys in the storage
|
||||
Overwrite bool
|
||||
|
||||
// Compression is the Compressor to use when reading / writing files, default is no compression
|
||||
Compression Compressor
|
||||
}
|
||||
|
||||
// getDiskConfig returns a valid DiskConfig for supplied ptr
|
||||
func getDiskConfig(cfg *DiskConfig) DiskConfig {
|
||||
// If nil, use default
|
||||
if cfg == nil {
|
||||
cfg = DefaultDiskConfig
|
||||
}
|
||||
|
||||
// Assume nil transform == none
|
||||
if cfg.Transform == nil {
|
||||
cfg.Transform = NopTransform()
|
||||
}
|
||||
|
||||
// Assume nil compress == none
|
||||
if cfg.Compression == nil {
|
||||
cfg.Compression = NoCompression()
|
||||
}
|
||||
|
||||
// Assume 0 buf size == use default
|
||||
if cfg.WriteBufSize < 1 {
|
||||
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
|
||||
}
|
||||
|
||||
// Return owned config copy
|
||||
return DiskConfig{
|
||||
Transform: cfg.Transform,
|
||||
WriteBufSize: cfg.WriteBufSize,
|
||||
Overwrite: cfg.Overwrite,
|
||||
Compression: cfg.Compression,
|
||||
}
|
||||
}
|
||||
|
||||
// DiskStorage is a Storage implementation that stores directly to a filesystem
|
||||
type DiskStorage struct {
|
||||
path string // path is the root path of this store
|
||||
dots int // dots is the "dotdot" count for the root store path
|
||||
config DiskConfig // cfg is the supplied configuration for this store
|
||||
}
|
||||
|
||||
// OpenFile opens a DiskStorage instance for given folder path and configuration
|
||||
func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
|
||||
// Acquire path builder
|
||||
pb := util.AcquirePathBuilder()
|
||||
defer util.ReleasePathBuilder(pb)
|
||||
|
||||
// Clean provided path, ensure ends in '/' (should
|
||||
// be dir, this helps with file path trimming later)
|
||||
path = pb.Clean(path) + "/"
|
||||
|
||||
// Get checked config
|
||||
config := getDiskConfig(cfg)
|
||||
|
||||
// Attempt to open dir path
|
||||
file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
||||
if err != nil {
|
||||
// If not a not-exist error, return
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to make store path dirs
|
||||
err = os.MkdirAll(path, defaultDirPerms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reopen dir now it's been created
|
||||
file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Double check this is a dir (NOT a file!)
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !stat.IsDir() {
|
||||
return nil, errPathIsFile
|
||||
}
|
||||
|
||||
// Return new DiskStorage
|
||||
return &DiskStorage{
|
||||
path: path,
|
||||
dots: util.CountDotdots(path),
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Clean implements Storage.Clean()
|
||||
func (st *DiskStorage) Clean() error {
|
||||
return util.CleanDirs(st.path)
|
||||
}
|
||||
|
||||
// ReadBytes implements Storage.ReadBytes()
|
||||
func (st *DiskStorage) ReadBytes(key string) ([]byte, error) {
|
||||
// Get stream reader for key
|
||||
rc, err := st.ReadStream(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
// Read all bytes and return
|
||||
return io.ReadAll(rc)
|
||||
}
|
||||
|
||||
// ReadStream implements Storage.ReadStream()
|
||||
func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
|
||||
// Get file path for key
|
||||
kpath, err := st.filepath(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to open file (replace ENOENT with our own)
|
||||
file, err := open(kpath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
return nil, errSwapNotFound(err)
|
||||
}
|
||||
|
||||
// Wrap the file in a compressor
|
||||
cFile, err := st.config.Compression.Reader(file)
|
||||
if err != nil {
|
||||
file.Close() // close this here, ignore error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap compressor to ensure file close
|
||||
return util.ReadCloserWithCallback(cFile, func() {
|
||||
file.Close()
|
||||
}), nil
|
||||
}
|
||||
|
||||
// WriteBytes implements Storage.WriteBytes()
|
||||
func (st *DiskStorage) WriteBytes(key string, value []byte) error {
|
||||
return st.WriteStream(key, bytes.NewReader(value))
|
||||
}
|
||||
|
||||
// WriteStream implements Storage.WriteStream()
|
||||
func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
|
||||
// Get file path for key
|
||||
kpath, err := st.filepath(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure dirs leading up to file exist
|
||||
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Prepare to swap error if need-be
|
||||
errSwap := errSwapNoop
|
||||
|
||||
// Build file RW flags
|
||||
flags := defaultFileRWFlags
|
||||
if !st.config.Overwrite {
|
||||
flags |= syscall.O_EXCL
|
||||
|
||||
// Catch + replace err exist
|
||||
errSwap = errSwapExist
|
||||
}
|
||||
|
||||
// Attempt to open file
|
||||
file, err := open(kpath, flags)
|
||||
if err != nil {
|
||||
return errSwap(err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Wrap the file in a compressor
|
||||
cFile, err := st.config.Compression.Writer(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cFile.Close()
|
||||
|
||||
// Acquire write buffer
|
||||
buf := util.AcquireBuffer(st.config.WriteBufSize)
|
||||
defer util.ReleaseBuffer(buf)
|
||||
buf.Grow(st.config.WriteBufSize)
|
||||
|
||||
// Copy reader to file
|
||||
_, err = io.CopyBuffer(cFile, r, buf.B)
|
||||
return err
|
||||
}
|
||||
|
||||
// Stat implements Storage.Stat()
|
||||
func (st *DiskStorage) Stat(key string) (bool, error) {
|
||||
// Get file path for key
|
||||
kpath, err := st.filepath(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Check for file on disk
|
||||
return stat(kpath)
|
||||
}
|
||||
|
||||
// Remove implements Storage.Remove()
|
||||
func (st *DiskStorage) Remove(key string) error {
|
||||
// Get file path for key
|
||||
kpath, err := st.filepath(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Attempt to remove file
|
||||
return os.Remove(kpath)
|
||||
}
|
||||
|
||||
// WalkKeys implements Storage.WalkKeys()
|
||||
func (st *DiskStorage) WalkKeys(opts *WalkKeysOptions) error {
|
||||
// Acquire path builder
|
||||
pb := fastpath.AcquireBuilder()
|
||||
defer fastpath.ReleaseBuilder(pb)
|
||||
|
||||
// Walk dir for entries
|
||||
return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) {
|
||||
// Only deal with regular files
|
||||
if fsentry.Type().IsRegular() {
|
||||
// Get full item path (without root)
|
||||
kpath = pb.Join(kpath, fsentry.Name())[len(st.path):]
|
||||
|
||||
// Perform provided walk function
|
||||
opts.WalkFn(entry(st.config.Transform.PathToKey(kpath)))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// filepath checks and returns a formatted filepath for given key
|
||||
func (st *DiskStorage) filepath(key string) (string, error) {
|
||||
// Acquire path builder
|
||||
pb := util.AcquirePathBuilder()
|
||||
defer util.ReleasePathBuilder(pb)
|
||||
|
||||
// Calculate transformed key path
|
||||
key = st.config.Transform.KeyToPath(key)
|
||||
|
||||
// Generated joined root path
|
||||
pb.AppendString(st.path)
|
||||
pb.AppendString(key)
|
||||
|
||||
// If path is dir traversal, and traverses FURTHER
|
||||
// than store root, this is an error
|
||||
if util.CountDotdots(pb.StringPtr()) > st.dots {
|
||||
return "", ErrInvalidKey
|
||||
}
|
||||
return pb.String(), nil
|
||||
}
|
||||
63
vendor/git.iim.gay/grufwub/go-store/storage/errors.go
vendored
Normal file
63
vendor/git.iim.gay/grufwub/go-store/storage/errors.go
vendored
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// errorString is our own simple error type
|
||||
type errorString string
|
||||
|
||||
// Error implements error
|
||||
func (e errorString) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
// Extend appends extra information to an errorString
|
||||
func (e errorString) Extend(s string, a ...interface{}) errorString {
|
||||
return errorString(string(e) + ": " + fmt.Sprintf(s, a...))
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrNotFound is the error returned when a key cannot be found in storage
|
||||
ErrNotFound = errorString("store/storage: key not found")
|
||||
|
||||
// ErrAlreadyExist is the error returned when a key already exists in storage
|
||||
ErrAlreadyExists = errorString("store/storage: key already exists")
|
||||
|
||||
// ErrInvalidkey is the error returned when an invalid key is passed to storage
|
||||
ErrInvalidKey = errorString("store/storage: invalid key")
|
||||
|
||||
// errPathIsFile is returned when a path for a disk config is actually a file
|
||||
errPathIsFile = errorString("store/storage: path is file")
|
||||
|
||||
// errNoHashesWritten is returned when no blocks are written for given input value
|
||||
errNoHashesWritten = errorString("storage/storage: no hashes written")
|
||||
|
||||
// errInvalidNode is returned when read on an invalid node in the store is attempted
|
||||
errInvalidNode = errorString("store/storage: invalid node")
|
||||
|
||||
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
|
||||
errCorruptNodes = errorString("store/storage: corrupted nodes")
|
||||
)
|
||||
|
||||
// errSwapNoop performs no error swaps
|
||||
func errSwapNoop(err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound
|
||||
func errSwapNotFound(err error) error {
|
||||
if err == syscall.ENOENT {
|
||||
return ErrNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists
|
||||
func errSwapExist(err error) error {
|
||||
if err == syscall.EEXIST {
|
||||
return ErrAlreadyExists
|
||||
}
|
||||
return err
|
||||
}
|
||||
48
vendor/git.iim.gay/grufwub/go-store/storage/fs.go
vendored
Normal file
48
vendor/git.iim.gay/grufwub/go-store/storage/fs.go
vendored
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDirPerms = 0755
|
||||
defaultFilePerms = 0644
|
||||
defaultFileROFlags = syscall.O_RDONLY
|
||||
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
|
||||
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
|
||||
)
|
||||
|
||||
// NOTE:
|
||||
// These functions are for opening storage files,
|
||||
// not necessarily for e.g. initial setup (OpenFile)
|
||||
|
||||
// open should not be called directly
|
||||
func open(path string, flags int) (*os.File, error) {
|
||||
var fd int
|
||||
err := util.RetryOnEINTR(func() (err error) {
|
||||
fd, err = syscall.Open(path, flags, defaultFilePerms)
|
||||
return
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.NewFile(uintptr(fd), path), nil
|
||||
}
|
||||
|
||||
// stat checks for a file on disk
|
||||
func stat(path string) (bool, error) {
|
||||
var stat syscall.Stat_t
|
||||
err := util.RetryOnEINTR(func() error {
|
||||
return syscall.Stat(path, &stat)
|
||||
})
|
||||
if err != nil {
|
||||
if err == syscall.ENOENT {
|
||||
err = nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
34
vendor/git.iim.gay/grufwub/go-store/storage/lock.go
vendored
Normal file
34
vendor/git.iim.gay/grufwub/go-store/storage/lock.go
vendored
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"git.iim.gay/grufwub/go-store/util"
|
||||
)
|
||||
|
||||
type lockableFile struct {
|
||||
*os.File
|
||||
}
|
||||
|
||||
func openLock(path string) (*lockableFile, error) {
|
||||
file, err := open(path, defaultFileLockFlags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &lockableFile{file}, nil
|
||||
}
|
||||
|
||||
func (f *lockableFile) lock() error {
|
||||
return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
|
||||
}
|
||||
|
||||
func (f *lockableFile) unlock() error {
|
||||
return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
|
||||
}
|
||||
|
||||
func (f *lockableFile) flock(how int) error {
|
||||
return util.RetryOnEINTR(func() error {
|
||||
return syscall.Flock(int(f.Fd()), how)
|
||||
})
|
||||
}
|
||||
51
vendor/git.iim.gay/grufwub/go-store/storage/storage.go
vendored
Normal file
51
vendor/git.iim.gay/grufwub/go-store/storage/storage.go
vendored
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// StorageEntry defines a key in Storage
|
||||
type StorageEntry interface {
|
||||
// Key returns the storage entry's key
|
||||
Key() string
|
||||
}
|
||||
|
||||
// entry is the simplest possible StorageEntry
|
||||
type entry string
|
||||
|
||||
func (e entry) Key() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
// Storage defines a means of storing and accessing key value pairs
|
||||
type Storage interface {
|
||||
// Clean removes unused values and unclutters the storage (e.g. removing empty folders)
|
||||
Clean() error
|
||||
|
||||
// ReadBytes returns the byte value for key in storage
|
||||
ReadBytes(key string) ([]byte, error)
|
||||
|
||||
// ReadStream returns an io.ReadCloser for the value bytes at key in the storage
|
||||
ReadStream(key string) (io.ReadCloser, error)
|
||||
|
||||
// WriteBytes writes the supplied value bytes at key in the storage
|
||||
WriteBytes(key string, value []byte) error
|
||||
|
||||
// WriteStream writes the bytes from supplied reader at key in the storage
|
||||
WriteStream(key string, r io.Reader) error
|
||||
|
||||
// Stat checks if the supplied key is in the storage
|
||||
Stat(key string) (bool, error)
|
||||
|
||||
// Remove attempts to remove the supplied key-value pair from storage
|
||||
Remove(key string) error
|
||||
|
||||
// WalkKeys walks the keys in the storage
|
||||
WalkKeys(opts *WalkKeysOptions) error
|
||||
}
|
||||
|
||||
// WalkKeysOptions defines how to walk the keys in a storage implementation
|
||||
type WalkKeysOptions struct {
|
||||
// WalkFn is the function to apply on each StorageEntry
|
||||
WalkFn func(StorageEntry)
|
||||
}
|
||||
25
vendor/git.iim.gay/grufwub/go-store/storage/transform.go
vendored
Normal file
25
vendor/git.iim.gay/grufwub/go-store/storage/transform.go
vendored
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
package storage
|
||||
|
||||
// KeyTransform defines a method of converting store keys to storage paths (and vice-versa)
|
||||
type KeyTransform interface {
|
||||
// KeyToPath converts a supplied key to storage path
|
||||
KeyToPath(string) string
|
||||
|
||||
// PathToKey converts a supplied storage path to key
|
||||
PathToKey(string) string
|
||||
}
|
||||
|
||||
type nopKeyTransform struct{}
|
||||
|
||||
// NopTransform returns a nop key transform (i.e. key = path)
|
||||
func NopTransform() KeyTransform {
|
||||
return &nopKeyTransform{}
|
||||
}
|
||||
|
||||
func (t *nopKeyTransform) KeyToPath(key string) string {
|
||||
return key
|
||||
}
|
||||
|
||||
func (t *nopKeyTransform) PathToKey(path string) string {
|
||||
return path
|
||||
}
|
||||
105
vendor/git.iim.gay/grufwub/go-store/util/fs.go
vendored
Normal file
105
vendor/git.iim.gay/grufwub/go-store/util/fs.go
vendored
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"git.iim.gay/grufwub/fastpath"
|
||||
)
|
||||
|
||||
var dotdot = "../"
|
||||
|
||||
// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path
|
||||
func CountDotdots(path string) int {
|
||||
if !strings.HasSuffix(path, dotdot) {
|
||||
return 0
|
||||
}
|
||||
return strings.Count(path, dotdot)
|
||||
}
|
||||
|
||||
// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
|
||||
func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error {
|
||||
// Read supplied dir path
|
||||
dirEntries, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Iter entries
|
||||
for _, entry := range dirEntries {
|
||||
// Pass to walk fn
|
||||
walkFn(path, entry)
|
||||
|
||||
// Recurse dir entries
|
||||
if entry.IsDir() {
|
||||
err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
|
||||
func CleanDirs(path string) error {
|
||||
// Acquire builder
|
||||
pb := AcquirePathBuilder()
|
||||
defer ReleasePathBuilder(pb)
|
||||
|
||||
// Get dir entries
|
||||
entries, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Recurse dirs
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
err := cleanDirs(pb, pb.Join(path, entry.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanDirs performs the actual dir cleaning logic for the exported version
|
||||
func cleanDirs(pb *fastpath.Builder, path string) error {
|
||||
// Get dir entries
|
||||
entries, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If no entries, delete
|
||||
if len(entries) < 1 {
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
// Recurse dirs
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
err := cleanDirs(pb, pb.Join(path, entry.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
|
||||
func RetryOnEINTR(do func() error) error {
|
||||
for {
|
||||
err := do()
|
||||
if err == syscall.EINTR {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
42
vendor/git.iim.gay/grufwub/go-store/util/io.go
vendored
Normal file
42
vendor/git.iim.gay/grufwub/go-store/util/io.go
vendored
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package util
|
||||
|
||||
import "io"
|
||||
|
||||
// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation
|
||||
func NopReadCloser(r io.Reader) io.ReadCloser {
|
||||
return &nopReadCloser{r}
|
||||
}
|
||||
|
||||
// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation
|
||||
func NopWriteCloser(w io.Writer) io.WriteCloser {
|
||||
return &nopWriteCloser{w}
|
||||
}
|
||||
|
||||
// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser
|
||||
func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
|
||||
return &callbackReadCloser{
|
||||
ReadCloser: rc,
|
||||
callback: cb,
|
||||
}
|
||||
}
|
||||
|
||||
// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close()
|
||||
type nopReadCloser struct{ io.Reader }
|
||||
|
||||
func (r *nopReadCloser) Close() error { return nil }
|
||||
|
||||
// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close()
|
||||
type nopWriteCloser struct{ io.Writer }
|
||||
|
||||
func (w nopWriteCloser) Close() error { return nil }
|
||||
|
||||
// callbackReadCloser allows adding our own custom callback to an io.ReadCloser
|
||||
type callbackReadCloser struct {
|
||||
io.ReadCloser
|
||||
callback func()
|
||||
}
|
||||
|
||||
func (c *callbackReadCloser) Close() error {
|
||||
defer c.callback()
|
||||
return c.ReadCloser.Close()
|
||||
}
|
||||
6
vendor/git.iim.gay/grufwub/go-store/util/nocopy.go
vendored
Normal file
6
vendor/git.iim.gay/grufwub/go-store/util/nocopy.go
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
package util
|
||||
|
||||
type NoCopy struct{}
|
||||
|
||||
func (*NoCopy) Lock() {}
|
||||
func (*NoCopy) Unlock() {}
|
||||
44
vendor/git.iim.gay/grufwub/go-store/util/pools.go
vendored
Normal file
44
vendor/git.iim.gay/grufwub/go-store/util/pools.go
vendored
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"git.iim.gay/grufwub/fastpath"
|
||||
"git.iim.gay/grufwub/go-bufpool"
|
||||
"git.iim.gay/grufwub/go-bytes"
|
||||
)
|
||||
|
||||
// pathBuilderPool is the global fastpath.Builder pool, we implement
|
||||
// our own here instead of using fastpath's default one because we
|
||||
// don't want to deal with fastpath's sync.Once locks on every Acquire/Release
|
||||
var pathBuilderPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
pb := fastpath.NewBuilder(make([]byte, 0, 512))
|
||||
return &pb
|
||||
},
|
||||
}
|
||||
|
||||
// AcquirePathBuilder returns a reset fastpath.Builder instance
|
||||
func AcquirePathBuilder() *fastpath.Builder {
|
||||
return pathBuilderPool.Get().(*fastpath.Builder)
|
||||
}
|
||||
|
||||
// ReleasePathBuilder resets and releases provided fastpath.Builder instance to global pool
|
||||
func ReleasePathBuilder(pb *fastpath.Builder) {
|
||||
pb.Reset()
|
||||
pathBuilderPool.Put(pb)
|
||||
}
|
||||
|
||||
// bufferPool is the global BufferPool, we implement this here
|
||||
// so we can share allocations across whatever libaries need them.
|
||||
var bufferPool = bufpool.BufferPool{}
|
||||
|
||||
// AcquireBuffer returns a reset bytes.Buffer with at least requested capacity
|
||||
func AcquireBuffer(cap int) *bytes.Buffer {
|
||||
return bufferPool.Get(cap)
|
||||
}
|
||||
|
||||
// ReleaseBuffer resets and releases provided bytes.Buffer to global BufferPool
|
||||
func ReleaseBuffer(buf *bytes.Buffer) {
|
||||
bufferPool.Put(buf)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue