[chore] bump gruf/go-store to v2 (#953)

* [chore] bump gruf/go-store to v2

* no more boobs
This commit is contained in:
tobi 2022-11-05 12:10:19 +01:00 committed by GitHub
commit bcb80d3ff4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
105 changed files with 12360 additions and 4859 deletions

View file

@ -1,63 +0,0 @@
package kv
import (
"errors"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
)
var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// KVIterator provides a read-only iterator to all the key-value
// pairs in a KVStore. While the iterator is open the store is read
// locked, you MUST release the iterator when you are finished with
// it.
//
// Please note:
// - individual iterators are NOT concurrency safe, though it is safe to
// have multiple iterators running concurrently
type KVIterator struct {
store *KVStore // store is the linked KVStore
state *mutexes.LockState
entries []storage.StorageEntry
index int
key string
}
// Next attempts to set the next key-value pair, the
// return value is if there was another pair remaining
func (i *KVIterator) Next() bool {
next := i.index + 1
if next >= len(i.entries) {
i.key = ""
return false
}
i.key = i.entries[next].Key()
i.index = next
return true
}
// Key returns the next key from the store
func (i *KVIterator) Key() string {
return i.key
}
// Release releases the KVIterator and KVStore's read lock
func (i *KVIterator) Release() {
i.state.UnlockMap()
i.store = nil
i.key = ""
i.entries = nil
}
// Value returns the next value from the KVStore
func (i *KVIterator) Value() ([]byte, error) {
// Check store isn't closed
if i.store == nil {
return nil, ErrIteratorClosed
}
// Attempt to fetch from store
return i.store.get(i.state.RLock, i.key)
}

View file

@ -1,130 +0,0 @@
package kv
import (
"errors"
"io"
"codeberg.org/gruf/go-mutexes"
)
var ErrStateClosed = errors.New("store/kv: state closed")
// StateRO provides a read-only window to the store. While this
// state is active during the Read() function window, the entire
// store will be read-locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Read() returns,
// then the state has zero guarantees
type StateRO struct {
store *KVStore
state *mutexes.LockState
}
func (st *StateRO) Get(key string) ([]byte, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.get(st.state.RLock, key)
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.getStream(st.state.RLock, key)
}
func (st *StateRO) Has(key string) (bool, error) {
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
return st.store.has(st.state.RLock, key)
}
func (st *StateRO) Release() {
st.state.UnlockMap()
st.store = nil
}
// StateRW provides a read-write window to the store. While this
// state is active during the Update() function window, the entire
// store will be locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Update() returns,
// then the state has zero guarantees
type StateRW struct {
store *KVStore
state *mutexes.LockState
}
func (st *StateRW) Get(key string) ([]byte, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.get(st.state.RLock, key)
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.getStream(st.state.RLock, key)
}
func (st *StateRW) Put(key string, value []byte) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.put(st.state.Lock, key, value)
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.putStream(st.state.Lock, key, r)
}
func (st *StateRW) Has(key string) (bool, error) {
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
return st.store.has(st.state.RLock, key)
}
func (st *StateRW) Delete(key string) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.delete(st.state.Lock, key)
}
func (st *StateRW) Release() {
st.state.UnlockMap()
st.store = nil
}

View file

@ -1,227 +0,0 @@
package kv
import (
"io"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
"codeberg.org/gruf/go-store/util"
)
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access
storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
// Attempt to open disk storage
storage, err := storage.OpenFile(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
// Attempt to open block storage
storage, err := storage.OpenBlock(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Perform initial storage clean
err := storage.Clean()
if err != nil {
return nil, err
}
// Return new KVStore
return &KVStore{
mutex: mutexes.NewMap(-1, -1),
storage: storage,
}, nil
}
// RLock acquires a read-lock on supplied key, returning unlock function.
func (st *KVStore) RLock(key string) (runlock func()) {
return st.mutex.RLock(key)
}
// Lock acquires a write-lock on supplied key, returning unlock function.
func (st *KVStore) Lock(key string) (unlock func()) {
return st.mutex.Lock(key)
}
// Get fetches the bytes for supplied key in the store
func (st *KVStore) Get(key string) ([]byte, error) {
return st.get(st.RLock, key)
}
func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) {
// Acquire read lock for key
runlock := rlock(key)
defer runlock()
// Read file bytes
return st.storage.ReadBytes(key)
}
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
return st.getStream(st.RLock, key)
}
func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) {
// Acquire read lock for key
runlock := rlock(key)
// Attempt to open stream for read
rd, err := st.storage.ReadStream(key)
if err != nil {
runlock()
return nil, err
}
// Wrap readcloser in our own callback closer
return util.ReadCloserWithCallback(rd, runlock), nil
}
// Put places the bytes at the supplied key location in the store
func (st *KVStore) Put(key string, value []byte) error {
return st.put(st.Lock, key, value)
}
func (st *KVStore) put(lock func(string) func(), key string, value []byte) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Write file bytes
return st.storage.WriteBytes(key, value)
}
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
func (st *KVStore) PutStream(key string, r io.Reader) error {
return st.putStream(st.Lock, key, r)
}
func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Write file stream
return st.storage.WriteStream(key, r)
}
// Has checks whether the supplied key exists in the store
func (st *KVStore) Has(key string) (bool, error) {
return st.has(st.RLock, key)
}
func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) {
// Acquire read lock for key
runlock := rlock(key)
defer runlock()
// Stat file on disk
return st.storage.Stat(key)
}
// Delete removes the supplied key-value pair from the store
func (st *KVStore) Delete(key string) error {
return st.delete(st.Lock, key)
}
func (st *KVStore) delete(lock func(string) func(), key string) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Remove file from disk
return st.storage.Remove(key)
}
// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// If no function, match all
if matchFn == nil {
matchFn = func(string) bool { return true }
}
// Get store read lock
state := st.mutex.RLockMap()
// Setup the walk keys function
entries := []storage.StorageEntry{}
walkFn := func(entry storage.StorageEntry) {
// Ignore unmatched entries
if !matchFn(entry.Key()) {
return
}
// Add to entries
entries = append(entries, entry)
}
// Walk keys in the storage
err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
state.UnlockMap()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
state: state,
entries: entries,
index: -1,
key: "",
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until release
func (st *KVStore) Read() *StateRO {
state := st.mutex.RLockMap()
return &StateRO{store: st, state: state}
}
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
func (st *KVStore) ReadFn(fn func(*StateRO)) {
// Acquire read-only state
state := st.Read()
defer state.Release()
// Pass to fn
fn(state)
}
// Update provides a read-write window to the store, holding it in a write-locked state until release
func (st *KVStore) Update() *StateRW {
state := st.mutex.LockMap()
return &StateRW{store: st, state: state}
}
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Acquire read-write state
state := st.Update()
defer state.Release()
// Pass to fn
fn(state)
}
// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work).
func (st *KVStore) Close() error {
return st.storage.Close()
}

View file

@ -1,104 +0,0 @@
package storage
import (
"compress/gzip"
"compress/zlib"
"io"
"codeberg.org/gruf/go-store/util"
"github.com/golang/snappy"
)
// Compressor defines a means of compressing/decompressing values going into a key-value store
type Compressor interface {
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
Reader(io.Reader) (io.ReadCloser, error)
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
Writer(io.Writer) (io.WriteCloser, error)
}
type gzipCompressor struct {
level int
}
// GZipCompressor returns a new Compressor that implements GZip at default compression level
func GZipCompressor() Compressor {
return GZipCompressorLevel(gzip.DefaultCompression)
}
// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
func GZipCompressorLevel(level int) Compressor {
return &gzipCompressor{
level: level,
}
}
func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return gzip.NewReader(r)
}
func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return gzip.NewWriterLevel(w, c.level)
}
type zlibCompressor struct {
level int
dict []byte
}
// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
func ZLibCompressor() Compressor {
return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
}
// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
func ZLibCompressorLevel(level int) Compressor {
return ZLibCompressorLevelDict(level, nil)
}
// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
return &zlibCompressor{
level: level,
dict: dict,
}
}
func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return zlib.NewReaderDict(r, c.dict)
}
func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return zlib.NewWriterLevelDict(w, c.level, c.dict)
}
type snappyCompressor struct{}
// SnappyCompressor returns a new Compressor that implements Snappy
func SnappyCompressor() Compressor {
return &snappyCompressor{}
}
func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return util.NopReadCloser(snappy.NewReader(r)), nil
}
func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return snappy.NewBufferedWriter(w), nil
}
type nopCompressor struct{}
// NoCompression is a Compressor that simply does nothing
func NoCompression() Compressor {
return &nopCompressor{}
}
func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return util.NopReadCloser(r), nil
}
func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return util.NopWriteCloser(w), nil
}

View file

@ -1,65 +0,0 @@
package storage
import (
"os"
"syscall"
"codeberg.org/gruf/go-store/util"
)
const (
// default file permission bits
defaultDirPerms = 0o755
defaultFilePerms = 0o644
// default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
)
// NOTE:
// These functions are for opening storage files,
// not necessarily for e.g. initial setup (OpenFile)
// open should not be called directly.
func open(path string, flags int) (*os.File, error) {
var fd int
err := util.RetryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, flags, defaultFilePerms)
return
})
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), path), nil
}
// stat checks for a file on disk.
func stat(path string) (bool, error) {
var stat syscall.Stat_t
err := util.RetryOnEINTR(func() error {
return syscall.Stat(path, &stat)
})
if err != nil {
if err == syscall.ENOENT { //nolint
err = nil
}
return false, err
}
return true, nil
}
// unlink removes a file (not dir!) on disk.
func unlink(path string) error {
return util.RetryOnEINTR(func() error {
return syscall.Unlink(path)
})
}
// rmdir removes a dir (not file!) on disk.
func rmdir(path string) error {
return util.RetryOnEINTR(func() error {
return syscall.Rmdir(path)
})
}

View file

@ -1,188 +0,0 @@
package storage
import (
"io"
"sync"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-store/util"
)
// MemoryStorage is a storage implementation that simply stores key-value
// pairs in a Go map in-memory. The map is protected by a mutex.
type MemoryStorage struct {
ow bool // overwrites
fs map[string][]byte
mu sync.Mutex
st uint32
}
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
func OpenMemory(size int, overwrites bool) *MemoryStorage {
return &MemoryStorage{
fs: make(map[string][]byte, size),
mu: sync.Mutex{},
ow: overwrites,
}
}
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean() error {
st.mu.Lock()
defer st.mu.Unlock()
if st.st == 1 {
return ErrClosed
}
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
// Lock storage
st.mu.Lock()
// Check store open
if st.st == 1 {
st.mu.Unlock()
return nil, ErrClosed
}
// Check for key
b, ok := st.fs[key]
st.mu.Unlock()
// Return early if not exist
if !ok {
return nil, ErrNotFound
}
// Create return copy
return bytes.Copy(b), nil
}
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// Lock storage
st.mu.Lock()
// Check store open
if st.st == 1 {
st.mu.Unlock()
return nil, ErrClosed
}
// Check for key
b, ok := st.fs[key]
st.mu.Unlock()
// Return early if not exist
if !ok {
return nil, ErrNotFound
}
// Create io.ReadCloser from 'b' copy
b = bytes.Copy(b)
r := bytes.NewReader(b)
return util.NopReadCloser(r), nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
// Lock storage
st.mu.Lock()
defer st.mu.Unlock()
// Check store open
if st.st == 1 {
return ErrClosed
}
_, ok := st.fs[key]
// Check for already exist
if ok && !st.ow {
return ErrAlreadyExists
}
// Write + unlock
st.fs[key] = bytes.Copy(b)
return nil
}
// WriteStream implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
// Read all from reader
b, err := io.ReadAll(r)
if err != nil {
return err
}
// Write to storage
return st.WriteBytes(key, b)
}
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(key string) (bool, error) {
// Lock storage
st.mu.Lock()
defer st.mu.Unlock()
// Check store open
if st.st == 1 {
return false, ErrClosed
}
// Check for key
_, ok := st.fs[key]
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(key string) error {
// Lock storage
st.mu.Lock()
defer st.mu.Unlock()
// Check store open
if st.st == 1 {
return ErrClosed
}
// Check for key
_, ok := st.fs[key]
if !ok {
return ErrNotFound
}
// Remove from store
delete(st.fs, key)
return nil
}
// Close implements Storage.Close().
func (st *MemoryStorage) Close() error {
st.mu.Lock()
st.st = 1
st.mu.Unlock()
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
// Lock storage
st.mu.Lock()
defer st.mu.Unlock()
// Check store open
if st.st == 1 {
return ErrClosed
}
// Walk store keys
for key := range st.fs {
opts.WalkFn(entry(key))
}
return nil
}

View file

@ -1,82 +0,0 @@
package util
import (
"io/fs"
"os"
"codeberg.org/gruf/go-fastpath"
)
// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error {
// Read supplied dir path
dirEntries, err := os.ReadDir(path)
if err != nil {
return err
}
// Iter entries
for _, entry := range dirEntries {
// Pass to walk fn
walkFn(path, entry)
// Recurse dir entries
if entry.IsDir() {
err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn)
if err != nil {
return err
}
}
}
return nil
}
// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
func CleanDirs(path string) error {
// Acquire builder
pb := GetPathBuilder()
defer PutPathBuilder(pb)
// Get dir entries
entries, err := os.ReadDir(path)
if err != nil {
return err
}
// Recurse dirs
for _, entry := range entries {
if entry.IsDir() {
err := cleanDirs(pb, pb.Join(path, entry.Name()))
if err != nil {
return err
}
}
}
return nil
}
// cleanDirs performs the actual dir cleaning logic for the exported version
func cleanDirs(pb *fastpath.Builder, path string) error {
// Get dir entries
entries, err := os.ReadDir(path)
if err != nil {
return err
}
// If no entries, delete
if len(entries) < 1 {
return os.Remove(path)
}
// Recurse dirs
for _, entry := range entries {
if entry.IsDir() {
err := cleanDirs(pb, pb.Join(path, entry.Name()))
if err != nil {
return err
}
}
}
return nil
}

View file

@ -1,42 +0,0 @@
package util
import "io"
// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation
func NopReadCloser(r io.Reader) io.ReadCloser {
return &nopReadCloser{r}
}
// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser
func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
return &callbackReadCloser{
ReadCloser: rc,
callback: cb,
}
}
// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close()
type nopReadCloser struct{ io.Reader }
func (r *nopReadCloser) Close() error { return nil }
// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close()
type nopWriteCloser struct{ io.Writer }
func (w nopWriteCloser) Close() error { return nil }
// callbackReadCloser allows adding our own custom callback to an io.ReadCloser
type callbackReadCloser struct {
io.ReadCloser
callback func()
}
func (c *callbackReadCloser) Close() error {
defer c.callback()
return c.ReadCloser.Close()
}

View file

@ -1,14 +0,0 @@
package util
import "syscall"
// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
func RetryOnEINTR(do func() error) error {
for {
err := do()
if err == syscall.EINTR {
continue
}
return err
}
}

View file

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2021 gruf
Copyright (c) 2022 gruf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

63
vendor/codeberg.org/gruf/go-store/v2/kv/iterator.go generated vendored Normal file
View file

@ -0,0 +1,63 @@
package kv
import (
"context"
"errors"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/v2/storage"
)
var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// Iterator provides a read-only iterator to all the key-value
// pairs in a KVStore. While the iterator is open the store is read
// locked, you MUST release the iterator when you are finished with
// it.
//
// Please note:
// individual iterators are NOT concurrency safe, though it is safe to
// have multiple iterators running concurrently.
type Iterator struct {
store *KVStore // store is the linked KVStore
state *mutexes.LockState
entries []storage.Entry
index int
key string
}
// Next attempts to fetch the next key-value pair, the
// return value indicates whether another pair remains.
func (i *Iterator) Next() bool {
next := i.index + 1
if next >= len(i.entries) {
i.key = ""
return false
}
i.key = i.entries[next].Key
i.index = next
return true
}
// Key returns the current iterator key.
func (i *Iterator) Key() string {
return i.key
}
// Value returns the current iterator value at key.
func (i *Iterator) Value(ctx context.Context) ([]byte, error) {
if i.store == nil {
return nil, ErrIteratorClosed
}
return i.store.get(i.state.RLock, ctx, i.key)
}
// Release will release the store read-lock, and close this iterator.
func (i *Iterator) Release() {
i.state.UnlockMap()
i.state = nil
i.store = nil
i.key = ""
i.entries = nil
i.index = 0
}

116
vendor/codeberg.org/gruf/go-store/v2/kv/state.go generated vendored Normal file
View file

@ -0,0 +1,116 @@
package kv
import (
"context"
"errors"
"io"
"codeberg.org/gruf/go-mutexes"
)
// ErrStateClosed is returned on further calls to states after calling Release().
var ErrStateClosed = errors.New("store/kv: state closed")
// StateRO provides a read-only window to the store. While this
// state is active during the Read() function window, the entire
// store will be read-locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Read() returns.
type StateRO struct {
store *KVStore
state *mutexes.LockState
}
// Get: see KVStore.Get(). Returns error if state already closed.
func (st *StateRO) Get(ctx context.Context, key string) ([]byte, error) {
if st.store == nil {
return nil, ErrStateClosed
}
return st.store.get(st.state.RLock, ctx, key)
}
// GetStream: see KVStore.GetStream(). Returns error if state already closed.
func (st *StateRO) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
if st.store == nil {
return nil, ErrStateClosed
}
return st.store.getStream(st.state.RLock, ctx, key)
}
// Has: see KVStore.Has(). Returns error if state already closed.
func (st *StateRO) Has(ctx context.Context, key string) (bool, error) {
if st.store == nil {
return false, ErrStateClosed
}
return st.store.has(st.state.RLock, ctx, key)
}
// Release will release the store read-lock, and close this state.
func (st *StateRO) Release() {
st.state.UnlockMap()
st.state = nil
st.store = nil
}
// StateRW provides a read-write window to the store. While this
// state is active during the Update() function window, the entire
// store will be locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Update() returns.
type StateRW struct {
store *KVStore
state *mutexes.LockState
}
// Get: see KVStore.Get(). Returns error if state already closed.
func (st *StateRW) Get(ctx context.Context, key string) ([]byte, error) {
if st.store == nil {
return nil, ErrStateClosed
}
return st.store.get(st.state.RLock, ctx, key)
}
// GetStream: see KVStore.GetStream(). Returns error if state already closed.
func (st *StateRW) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
if st.store == nil {
return nil, ErrStateClosed
}
return st.store.getStream(st.state.RLock, ctx, key)
}
// Put: see KVStore.Put(). Returns error if state already closed.
func (st *StateRW) Put(ctx context.Context, key string, value []byte) error {
if st.store == nil {
return ErrStateClosed
}
return st.store.put(st.state.Lock, ctx, key, value)
}
// PutStream: see KVStore.PutStream(). Returns error if state already closed.
func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) error {
if st.store == nil {
return ErrStateClosed
}
return st.store.putStream(st.state.Lock, ctx, key, r)
}
// Has: see KVStore.Has(). Returns error if state already closed.
func (st *StateRW) Has(ctx context.Context, key string) (bool, error) {
if st.store == nil {
return false, ErrStateClosed
}
return st.store.has(st.state.RLock, ctx, key)
}
// Delete: see KVStore.Delete(). Returns error if state already closed.
func (st *StateRW) Delete(ctx context.Context, key string) error {
if st.store == nil {
return ErrStateClosed
}
return st.store.delete(st.state.Lock, ctx, key)
}
// Release will release the store lock, and close this state.
func (st *StateRW) Release() {
st.state.UnlockMap()
st.state = nil
st.store = nil
}

253
vendor/codeberg.org/gruf/go-store/v2/kv/store.go generated vendored Normal file
View file

@ -0,0 +1,253 @@
package kv
import (
"context"
"io"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/v2/storage"
"codeberg.org/gruf/go-store/v2/util"
)
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
mu mutexes.MutexMap // map of keys to mutexes to protect key access
st storage.Storage // underlying storage implementation
}
func OpenDisk(path string, cfg *storage.DiskConfig) (*KVStore, error) {
// Attempt to open disk storage
storage, err := storage.OpenDisk(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
// Attempt to open block storage
storage, err := storage.OpenBlock(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenMemory(overwrites bool) *KVStore {
return &KVStore{
mu: mutexes.NewMap(-1, -1),
st: storage.OpenMemory(100, overwrites),
}
}
func OpenS3(endpoint string, bucket string, cfg *storage.S3Config) (*KVStore, error) {
// Attempt to open S3 storage
storage, err := storage.OpenS3(endpoint, bucket, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Perform initial storage clean
err := storage.Clean(context.Background())
if err != nil {
return nil, err
}
// Return new KVStore
return &KVStore{
mu: mutexes.NewMap(-1, -1),
st: storage,
}, nil
}
// RLock acquires a read-lock on supplied key, returning unlock function.
func (st *KVStore) RLock(key string) (runlock func()) {
return st.mu.RLock(key)
}
// Lock acquires a write-lock on supplied key, returning unlock function.
func (st *KVStore) Lock(key string) (unlock func()) {
return st.mu.Lock(key)
}
// Get fetches the bytes for supplied key in the store.
func (st *KVStore) Get(ctx context.Context, key string) ([]byte, error) {
return st.get(st.RLock, ctx, key)
}
// get performs the underlying logic for KVStore.Get(), using supplied read lock func to allow use with states.
func (st *KVStore) get(rlock func(string) func(), ctx context.Context, key string) ([]byte, error) {
// Acquire read lock for key
runlock := rlock(key)
defer runlock()
// Read file bytes from storage
return st.st.ReadBytes(ctx, key)
}
// GetStream fetches a ReadCloser for the bytes at the supplied key in the store.
func (st *KVStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
return st.getStream(st.RLock, ctx, key)
}
// getStream performs the underlying logic for KVStore.GetStream(), using supplied read lock func to allow use with states.
func (st *KVStore) getStream(rlock func(string) func(), ctx context.Context, key string) (io.ReadCloser, error) {
// Acquire read lock for key
runlock := rlock(key)
// Attempt to open stream for read
rd, err := st.st.ReadStream(ctx, key)
if err != nil {
runlock()
return nil, err
}
// Wrap readcloser in our own callback closer
return util.ReadCloserWithCallback(rd, runlock), nil
}
// Put places the bytes at the supplied key in the store.
func (st *KVStore) Put(ctx context.Context, key string, value []byte) error {
return st.put(st.Lock, ctx, key, value)
}
// put performs the underlying logic for KVStore.Put(), using supplied lock func to allow use with states.
func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Write file bytes to storage
return st.st.WriteBytes(ctx, key, value)
}
// PutStream writes the bytes from the supplied Reader at the supplied key in the store.
func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) error {
return st.putStream(st.Lock, ctx, key, r)
}
// putStream performs the underlying logic for KVStore.PutStream(), using supplied lock func to allow use with states.
func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Write file stream to storage
return st.st.WriteStream(ctx, key, r)
}
// Has checks whether the supplied key exists in the store.
func (st *KVStore) Has(ctx context.Context, key string) (bool, error) {
return st.has(st.RLock, ctx, key)
}
// has performs the underlying logic for KVStore.Has(), using supplied read lock func to allow use with states.
func (st *KVStore) has(rlock func(string) func(), ctx context.Context, key string) (bool, error) {
// Acquire read lock for key
runlock := rlock(key)
defer runlock()
// Stat file in storage
return st.st.Stat(ctx, key)
}
// Delete removes value at supplied key from the store.
func (st *KVStore) Delete(ctx context.Context, key string) error {
return st.delete(st.Lock, ctx, key)
}
// delete performs the underlying logic for KVStore.Delete(), using supplied lock func to allow use with states.
func (st *KVStore) delete(lock func(string) func(), ctx context.Context, key string) error {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
// Remove file from storage
return st.st.Remove(ctx, key)
}
// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
func (st *KVStore) Iterator(ctx context.Context, matchFn func(string) bool) (*Iterator, error) {
if matchFn == nil {
// By default simply match all keys
matchFn = func(string) bool { return true }
}
// Get store read lock state
state := st.mu.RLockMap()
var entries []storage.Entry
walkFn := func(ctx context.Context, entry storage.Entry) error {
// Ignore unmatched entries
if !matchFn(entry.Key) {
return nil
}
// Add to entries
entries = append(entries, entry)
return nil
}
// Collate keys in storage with our walk function
err := st.st.WalkKeys(ctx, storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
state.UnlockMap()
return nil, err
}
// Return new iterator
return &Iterator{
store: st,
state: state,
entries: entries,
index: -1,
key: "",
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until release.
func (st *KVStore) Read() *StateRO {
state := st.mu.RLockMap()
return &StateRO{store: st, state: state}
}
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return..
func (st *KVStore) ReadFn(fn func(*StateRO)) {
// Acquire read-only state
state := st.Read()
defer state.Release()
// Pass to fn
fn(state)
}
// Update provides a read-write window to the store, holding it in a write-locked state until release.
func (st *KVStore) Update() *StateRW {
state := st.mu.LockMap()
return &StateRW{store: st, state: state}
}
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Acquire read-write state
state := st.Update()
defer state.Release()
// Pass to fn
fn(state)
}
// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock()) will continue to function.
func (st *KVStore) Close() error {
return st.st.Close()
}

View file

@ -2,6 +2,7 @@ package storage
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
@ -16,7 +17,7 @@ import (
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
"codeberg.org/gruf/go-store/v2/util"
)
var (
@ -24,7 +25,7 @@ var (
blockPathPrefix = "block/"
)
// DefaultBlockConfig is the default BlockStorage configuration
// DefaultBlockConfig is the default BlockStorage configuration.
var DefaultBlockConfig = &BlockConfig{
BlockSize: 1024 * 16,
WriteBufSize: 4096,
@ -32,25 +33,26 @@ var DefaultBlockConfig = &BlockConfig{
Compression: NoCompression(),
}
// BlockConfig defines options to be used when opening a BlockStorage
// BlockConfig defines options to be used when opening a BlockStorage.
type BlockConfig struct {
// BlockSize is the chunking size to use when splitting and storing blocks of data
// BlockSize is the chunking size to use when splitting and storing blocks of data.
BlockSize int
// ReadBufSize is the buffer size to use when reading node files
// ReadBufSize is the buffer size to use when reading node files.
ReadBufSize int
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
// WriteBufSize is the buffer size to use when writing file streams.
WriteBufSize int
// Overwrite allows overwriting values of stored keys in the storage
// Overwrite allows overwriting values of stored keys in the storage.
Overwrite bool
// Compression is the Compressor to use when reading / writing files, default is no compression
// Compression is the Compressor to use when reading / writing files,
// default is no compression.
Compression Compressor
}
// getBlockConfig returns a valid BlockConfig for supplied ptr
// getBlockConfig returns a valid BlockConfig for supplied ptr.
func getBlockConfig(cfg *BlockConfig) BlockConfig {
// If nil, use default
if cfg == nil {
@ -63,12 +65,12 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
}
// Assume 0 chunk size == use default
if cfg.BlockSize < 1 {
if cfg.BlockSize <= 0 {
cfg.BlockSize = DefaultBlockConfig.BlockSize
}
// Assume 0 buf size == use default
if cfg.WriteBufSize < 1 {
if cfg.WriteBufSize <= 0 {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
@ -85,7 +87,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
// a filesystem. Each value is chunked into blocks of configured size and these
// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
// "node" file is finally created containing an array of hashes contained within
// this value
// this value.
type BlockStorage struct {
path string // path is the root path of this store
blockPath string // blockPath is the joined root path + block path prefix
@ -103,7 +105,7 @@ type BlockStorage struct {
// the hash of the data.
}
// OpenBlock opens a BlockStorage instance for given folder path and configuration
// OpenBlock opens a BlockStorage instance for given folder path and configuration.
func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
// Acquire path builder
pb := util.GetPathBuilder()
@ -143,7 +145,7 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
if err != nil {
return nil, err
} else if !stat.IsDir() {
return nil, errPathIsFile
return nil, new_error("path is file")
}
// Open and acquire storage lock for path
@ -182,34 +184,29 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
return st, nil
}
// Clean implements storage.Clean()
func (st *BlockStorage) Clean() error {
// Track open
st.lock.Add()
defer st.lock.Done()
// Clean implements storage.Clean().
func (st *BlockStorage) Clean(ctx context.Context) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
nodes := map[string]*node{}
onceErr := errors.OnceError{}
// Walk nodes dir for entries
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
return
}
// Stop if we hit error previously
if onceErr.IsSet() {
return
return nil
}
// Get joined node path name
@ -218,8 +215,7 @@ func (st *BlockStorage) Clean() error {
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
onceErr.Store(err)
return
return err
}
defer file.Close()
@ -239,32 +235,24 @@ func (st *BlockStorage) Clean() error {
nil,
)
if err != nil {
onceErr.Store(err)
return
return err
}
// Append to nodes slice
nodes[fsentry.Name()] = &node
return nil
})
// Handle errors (though nodePath may not have been created yet)
if err != nil && !os.IsNotExist(err) {
return err
} else if onceErr.IsSet() {
return onceErr.Load()
}
// Walk blocks dir for entries
onceErr.Reset()
err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) {
err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
return
}
// Stop if we hit error previously
if onceErr.IsSet() {
return
return nil
}
inUse := false
@ -281,25 +269,19 @@ func (st *BlockStorage) Clean() error {
// Block hash is used by node
if inUse {
return
return nil
}
// Get joined block path name
bpath = pb.Join(bpath, fsentry.Name())
// Remove this unused block path
err := os.Remove(bpath)
if err != nil {
onceErr.Store(err)
return
}
return os.Remove(bpath)
})
// Handle errors (though blockPath may not have been created yet)
if err != nil && !os.IsNotExist(err) {
return err
} else if onceErr.IsSet() {
return onceErr.Load()
}
// If there are nodes left at this point, they are corrupt
@ -315,10 +297,10 @@ func (st *BlockStorage) Clean() error {
return nil
}
// ReadBytes implements Storage.ReadBytes()
func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
// ReadBytes implements Storage.ReadBytes().
func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(key)
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
@ -328,27 +310,27 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream()
func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
// ReadStream implements Storage.ReadStream().
func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Get node file path for key
npath, err := st.nodePathForKey(key)
if err != nil {
return nil, err
}
// Track open
st.lock.Add()
// Check if open
if st.lock.Closed() {
st.lock.Done()
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
st.lock.Done()
return nil, errSwapNotFound(err)
}
defer file.Close()
@ -357,8 +339,9 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
hbuf := st.bufpool.Get()
defer st.bufpool.Put(hbuf)
var node node
// Write file contents to node
node := node{}
_, err = st.cppool.Copy(
&nodeWriter{
node: &node,
@ -367,18 +350,17 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
file,
)
if err != nil {
st.lock.Done()
return nil, err
}
// Prepare block reader and return
rc := util.NopReadCloser(&blockReader{
return util.NopReadCloser(&blockReader{
storage: st,
node: &node,
}) // we wrap the blockreader to decr lockfile waitgroup
return util.ReadCloserWithCallback(rc, st.lock.Done), nil
}), nil
}
// readBlock reads the block with hash (key) from the filesystem.
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
// Get block file path for key
bpath := st.blockPathForKey(key)
@ -386,14 +368,14 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) {
// Attempt to open RO file
file, err := open(bpath, defaultFileROFlags)
if err != nil {
return nil, wrap(errCorruptNode, err)
return nil, wrap(new_error("corrupted node"), err)
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
return nil, wrap(errCorruptNode, err)
return nil, wrap(new_error("corrupted node"), err)
}
defer cFile.Close()
@ -401,28 +383,29 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) {
return io.ReadAll(cFile)
}
// WriteBytes implements Storage.WriteBytes()
func (st *BlockStorage) WriteBytes(key string, value []byte) error {
return st.WriteStream(key, bytes.NewReader(value))
// WriteBytes implements Storage.WriteBytes().
func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
return st.WriteStream(ctx, key, bytes.NewReader(value))
}
// WriteStream implements Storage.WriteStream()
func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
// WriteStream implements Storage.WriteStream().
func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
// Get node file path for key
npath, err := st.nodePathForKey(key)
if err != nil {
return err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Check if this exists
ok, err := stat(key)
if err != nil {
@ -446,8 +429,7 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
return err
}
// Alloc new node
node := node{}
var node node
// Acquire HashEncoder
hc := st.hashPool.Get().(*hashEncoder)
@ -529,7 +511,7 @@ loop:
// If no hashes created, return
if len(node.hashes) < 1 {
return errNoHashesWritten
return new_error("no hashes written")
}
// Prepare to swap error if need-be
@ -563,11 +545,11 @@ loop:
buf.Grow(st.config.WriteBufSize)
// Finally, write data to file
_, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil)
_, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B)
return err
}
// writeBlock writes the block with hash and supplied value to the filesystem
// writeBlock writes the block with hash and supplied value to the filesystem.
func (st *BlockStorage) writeBlock(hash string, value []byte) error {
// Get block file path for key
bpath := st.blockPathForKey(hash)
@ -594,49 +576,51 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error {
return err
}
// statBlock checks for existence of supplied block hash
// statBlock checks for existence of supplied block hash.
func (st *BlockStorage) statBlock(hash string) (bool, error) {
return stat(st.blockPathForKey(hash))
}
// Stat implements Storage.Stat()
func (st *BlockStorage) Stat(key string) (bool, error) {
func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) {
// Get node file path for key
kpath, err := st.nodePathForKey(key)
if err != nil {
return false, err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return false, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return false, err
}
// Check for file on disk
return stat(kpath)
}
// Remove implements Storage.Remove()
func (st *BlockStorage) Remove(key string) error {
// Remove implements Storage.Remove().
func (st *BlockStorage) Remove(ctx context.Context, key string) error {
// Get node file path for key
kpath, err := st.nodePathForKey(key)
if err != nil {
return err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Remove at path (we know this is file)
if err := unlink(kpath); err != nil {
return errSwapNotFound(err)
@ -645,36 +629,43 @@ func (st *BlockStorage) Remove(key string) error {
return nil
}
// Close implements Storage.Close()
// Close implements Storage.Close().
func (st *BlockStorage) Close() error {
return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
// Track open
st.lock.Add()
defer st.lock.Done()
// WalkKeys implements Storage.WalkKeys().
func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if fsentry.Type().IsRegular() {
opts.WalkFn(entry(fsentry.Name()))
return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
if !fsentry.Type().IsRegular() {
// Only deal with regular files
return nil
}
// Perform provided walk function
return opts.WalkFn(ctx, Entry{
Key: fsentry.Name(),
Size: -1,
})
})
}
// nodePathForKey calculates the node file path for supplied key
// nodePathForKey calculates the node file path for supplied key.
func (st *BlockStorage) nodePathForKey(key string) (string, error) {
// Path separators are illegal, as directory paths
if strings.Contains(key, "/") || key == "." || key == ".." {
@ -693,41 +684,40 @@ func (st *BlockStorage) nodePathForKey(key string) (string, error) {
return pb.Join(st.nodePath, key), nil
}
// blockPathForKey calculates the block file path for supplied hash
// blockPathForKey calculates the block file path for supplied hash.
func (st *BlockStorage) blockPathForKey(hash string) string {
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
return pb.Join(st.blockPath, hash)
}
// hashSeparator is the separating byte between block hashes
// hashSeparator is the separating byte between block hashes.
const hashSeparator = byte('\n')
// node represents the contents of a node file in storage
// node represents the contents of a node file in storage.
type node struct {
hashes []string
}
// removeHash attempts to remove supplied block hash from the node's hash array
// removeHash attempts to remove supplied block hash from the node's hash array.
func (n *node) removeHash(hash string) bool {
haveDropped := false
for i := 0; i < len(n.hashes); {
if n.hashes[i] == hash {
// Drop this hash from slice
n.hashes = append(n.hashes[:i], n.hashes[i+1:]...)
haveDropped = true
} else {
// Continue iter
i++
return true
}
// Continue iter
i++
}
return haveDropped
return false
}
// nodeReader is an io.Reader implementation for the node file representation,
// which is useful when calculated node file is being written to the store
// which is useful when calculated node file is being written to the store.
type nodeReader struct {
node *node
node node
idx int
last int
}
@ -774,7 +764,7 @@ func (r *nodeReader) Read(b []byte) (int, error) {
}
// nodeWriter is an io.Writer implementation for the node file representation,
// which is useful when calculated node file is being read from the store
// which is useful when calculated node file is being read from the store.
type nodeWriter struct {
node *node
buf *byteutil.Buffer
@ -789,7 +779,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) {
if idx == -1 {
// Check we shouldn't be expecting it
if w.buf.Len() > encodedHashLen {
return n, errInvalidNode
return n, new_error("invalid node")
}
// Write all contents to buffer
@ -802,7 +792,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) {
w.buf.Write(b[n : n+idx])
n += idx + 1
if w.buf.Len() != encodedHashLen {
return n, errInvalidNode
return n, new_error("invalid node")
}
// Append to hashes & reset
@ -813,7 +803,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) {
// blockReader is an io.Reader implementation for the combined, linked block
// data contained with a node file. Basically, this allows reading value data
// from the store for a given node file
// from the store for a given node file.
type blockReader struct {
storage *BlockStorage
node *node
@ -874,13 +864,13 @@ var (
)
)
// hashEncoder is a HashEncoder with built-in encode buffer
// hashEncoder is a HashEncoder with built-in encode buffer.
type hashEncoder struct {
henc hashenc.HashEncoder
ebuf []byte
}
// newHashEncoder returns a new hashEncoder instance
// newHashEncoder returns a new hashEncoder instance.
func newHashEncoder() *hashEncoder {
return &hashEncoder{
henc: hashenc.New(sha256.New(), base64Encoding),
@ -888,7 +878,7 @@ func newHashEncoder() *hashEncoder {
}
}
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum().
func (henc *hashEncoder) EncodeSum(src []byte) string {
henc.henc.EncodeSum(henc.ebuf, src)
return string(henc.ebuf)

View file

@ -0,0 +1,212 @@
package storage
import (
"bytes"
"io"
"sync"
"codeberg.org/gruf/go-store/v2/util"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zlib"
)
// Compressor defines a means of compressing/decompressing values going into a key-value store
type Compressor interface {
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
Reader(io.Reader) (io.ReadCloser, error)
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
Writer(io.Writer) (io.WriteCloser, error)
}
type gzipCompressor struct {
rpool sync.Pool
wpool sync.Pool
}
// GZipCompressor returns a new Compressor that implements GZip at default compression level
func GZipCompressor() Compressor {
return GZipCompressorLevel(gzip.DefaultCompression)
}
// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
func GZipCompressorLevel(level int) Compressor {
// GZip readers immediately check for valid
// header data on allocation / reset, so we
// need a set of valid header data so we can
// iniitialize reader instances in mempool.
hdr := bytes.NewBuffer(nil)
// Init writer to ensure valid level provided
gw, err := gzip.NewWriterLevel(hdr, level)
if err != nil {
panic(err)
}
// Write empty data to ensure gzip
// header data is in byte buffer.
gw.Write([]byte{})
gw.Close()
return &gzipCompressor{
rpool: sync.Pool{
New: func() any {
hdr := bytes.NewReader(hdr.Bytes())
gr, _ := gzip.NewReader(hdr)
return gr
},
},
wpool: sync.Pool{
New: func() any {
gw, _ := gzip.NewWriterLevel(nil, level)
return gw
},
},
}
}
func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
gr := c.rpool.Get().(*gzip.Reader)
if err := gr.Reset(r); err != nil {
c.rpool.Put(gr)
return nil, err
}
return util.ReadCloserWithCallback(gr, func() {
c.rpool.Put(gr)
}), nil
}
func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
gw := c.wpool.Get().(*gzip.Writer)
gw.Reset(w)
return util.WriteCloserWithCallback(gw, func() {
c.wpool.Put(gw)
}), nil
}
type zlibCompressor struct {
rpool sync.Pool
wpool sync.Pool
dict []byte
}
// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
func ZLibCompressor() Compressor {
return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
}
// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
func ZLibCompressorLevel(level int) Compressor {
return ZLibCompressorLevelDict(level, nil)
}
// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
// ZLib readers immediately check for valid
// header data on allocation / reset, so we
// need a set of valid header data so we can
// iniitialize reader instances in mempool.
hdr := bytes.NewBuffer(nil)
// Init writer to ensure valid level + dict provided
zw, err := zlib.NewWriterLevelDict(hdr, level, dict)
if err != nil {
panic(err)
}
// Write empty data to ensure zlib
// header data is in byte buffer.
zw.Write([]byte{})
zw.Close()
return &zlibCompressor{
rpool: sync.Pool{
New: func() any {
hdr := bytes.NewReader(hdr.Bytes())
zr, _ := zlib.NewReaderDict(hdr, dict)
return zr
},
},
wpool: sync.Pool{
New: func() any {
zw, _ := zlib.NewWriterLevelDict(nil, level, dict)
return zw
},
},
dict: dict,
}
}
func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
zr := c.rpool.Get().(interface {
io.ReadCloser
zlib.Resetter
})
if err := zr.Reset(r, c.dict); err != nil {
c.rpool.Put(zr)
return nil, err
}
return util.ReadCloserWithCallback(zr, func() {
c.rpool.Put(zr)
}), nil
}
func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
zw := c.wpool.Get().(*zlib.Writer)
zw.Reset(w)
return util.WriteCloserWithCallback(zw, func() {
c.wpool.Put(zw)
}), nil
}
type snappyCompressor struct {
rpool sync.Pool
wpool sync.Pool
}
// SnappyCompressor returns a new Compressor that implements Snappy.
func SnappyCompressor() Compressor {
return &snappyCompressor{
rpool: sync.Pool{
New: func() any { return snappy.NewReader(nil) },
},
wpool: sync.Pool{
New: func() any { return snappy.NewWriter(nil) },
},
}
}
func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
sr := c.rpool.Get().(*snappy.Reader)
sr.Reset(r)
return util.ReadCloserWithCallback(
util.NopReadCloser(sr),
func() { c.rpool.Put(sr) },
), nil
}
func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
sw := c.wpool.Get().(*snappy.Writer)
sw.Reset(w)
return util.WriteCloserWithCallback(
util.NopWriteCloser(sw),
func() { c.wpool.Put(sw) },
), nil
}
type nopCompressor struct{}
// NoCompression is a Compressor that simply does nothing.
func NoCompression() Compressor {
return &nopCompressor{}
}
func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return util.NopReadCloser(r), nil
}
func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return util.NopWriteCloser(w), nil
}

View file

@ -1,6 +1,8 @@
package storage
import (
"context"
"errors"
"io"
"io/fs"
"os"
@ -11,10 +13,10 @@ import (
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-store/util"
"codeberg.org/gruf/go-store/v2/util"
)
// DefaultDiskConfig is the default DiskStorage configuration
// DefaultDiskConfig is the default DiskStorage configuration.
var DefaultDiskConfig = &DiskConfig{
Overwrite: true,
WriteBufSize: 4096,
@ -22,27 +24,28 @@ var DefaultDiskConfig = &DiskConfig{
Compression: NoCompression(),
}
// DiskConfig defines options to be used when opening a DiskStorage
// DiskConfig defines options to be used when opening a DiskStorage.
type DiskConfig struct {
// Transform is the supplied key<-->path KeyTransform
// Transform is the supplied key <--> path KeyTransform.
Transform KeyTransform
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
// WriteBufSize is the buffer size to use when writing file streams.
WriteBufSize int
// Overwrite allows overwriting values of stored keys in the storage
// Overwrite allows overwriting values of stored keys in the storage.
Overwrite bool
// LockFile allows specifying the filesystem path to use for the lockfile,
// providing only a filename it will store the lockfile within provided store
// path and nest the store under `path/store` to prevent access to lockfile
// path and nest the store under `path/store` to prevent access to lockfile.
LockFile string
// Compression is the Compressor to use when reading / writing files, default is no compression
// Compression is the Compressor to use when reading / writing files,
// default is no compression.
Compression Compressor
}
// getDiskConfig returns a valid DiskConfig for supplied ptr
// getDiskConfig returns a valid DiskConfig for supplied ptr.
func getDiskConfig(cfg *DiskConfig) DiskConfig {
// If nil, use default
if cfg == nil {
@ -60,12 +63,12 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {
}
// Assume 0 buf size == use default
if cfg.WriteBufSize < 1 {
if cfg.WriteBufSize <= 0 {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
// Assume empty lockfile path == use default
if len(cfg.LockFile) < 1 {
if len(cfg.LockFile) == 0 {
cfg.LockFile = LockFile
}
@ -79,7 +82,7 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {
}
}
// DiskStorage is a Storage implementation that stores directly to a filesystem
// DiskStorage is a Storage implementation that stores directly to a filesystem.
type DiskStorage struct {
path string // path is the root path of this store
cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
@ -87,8 +90,8 @@ type DiskStorage struct {
lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// OpenDisk opens a DiskStorage instance for given folder path and configuration.
func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Get checked config
config := getDiskConfig(cfg)
@ -104,7 +107,7 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
lockfile := pb.Clean(config.LockFile)
// Check if lockfile is an *actual* path or just filename
if lockDir, _ := _path.Split(lockfile); len(lockDir) < 1 {
if lockDir, _ := _path.Split(lockfile); lockDir == "" {
// Lockfile is a filename, store must be nested under
// $storePath/store to prevent access to the lockfile
storePath += "store/"
@ -138,7 +141,7 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
if err != nil {
return nil, err
} else if !stat.IsDir() {
return nil, errPathIsFile
return nil, errors.New("store/storage: path is file")
}
// Open and acquire storage lock for path
@ -160,20 +163,26 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
return st, nil
}
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
st.lock.Add()
defer st.lock.Done()
// Clean implements Storage.Clean().
func (st *DiskStorage) Clean(ctx context.Context) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
return util.CleanDirs(st.path)
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Clean-out unused directories
return cleanDirs(st.path)
}
// ReadBytes implements Storage.ReadBytes()
func (st *DiskStorage) ReadBytes(key string) ([]byte, error) {
// ReadBytes implements Storage.ReadBytes().
func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(key)
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
@ -183,26 +192,27 @@ func (st *DiskStorage) ReadBytes(key string) ([]byte, error) {
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream()
func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
// ReadStream implements Storage.ReadStream().
func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return nil, err
}
// Track open
st.lock.Add()
// Check if open
if st.lock.Closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
st.lock.Done()
return nil, errSwapNotFound(err)
}
@ -210,39 +220,38 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
st.lock.Done()
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
st.lock.Done()
}), nil
}
// WriteBytes implements Storage.WriteBytes()
func (st *DiskStorage) WriteBytes(key string, value []byte) error {
return st.WriteStream(key, bytes.NewReader(value))
// WriteBytes implements Storage.WriteBytes().
func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
return st.WriteStream(ctx, key, bytes.NewReader(value))
}
// WriteStream implements Storage.WriteStream()
func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
// WriteStream implements Storage.WriteStream().
func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
@ -280,44 +289,46 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
return err
}
// Stat implements Storage.Stat()
func (st *DiskStorage) Stat(key string) (bool, error) {
// Stat implements Storage.Stat().
func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return false, err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return false, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return false, err
}
// Check for file on disk
return stat(kpath)
}
// Remove implements Storage.Remove()
func (st *DiskStorage) Remove(key string) error {
// Remove implements Storage.Remove().
func (st *DiskStorage) Remove(ctx context.Context, key string) error {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return err
}
// Track open
st.lock.Add()
defer st.lock.Done()
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Remove at path (we know this is file)
if err := unlink(kpath); err != nil {
return errSwapNotFound(err)
@ -326,41 +337,55 @@ func (st *DiskStorage) Remove(key string) error {
return nil
}
// Close implements Storage.Close()
// Close implements Storage.Close().
func (st *DiskStorage) Close() error {
return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
// Track open
st.lock.Add()
defer st.lock.Done()
// WalkKeys implements Storage.WalkKeys().
func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
// Check if open
if st.lock.Closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) {
if fsentry.Type().IsRegular() {
return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error {
if !fsentry.Type().IsRegular() {
// Only deal with regular files
// Get full item path (without root)
kpath = pb.Join(kpath, fsentry.Name())[len(st.path):]
// Perform provided walk function
opts.WalkFn(entry(st.config.Transform.PathToKey(kpath)))
return nil
}
// Get full item path (without root)
kpath = pb.Join(kpath, fsentry.Name())
kpath = kpath[len(st.path):]
// Load file info. This should already
// be loaded due to the underlying call
// to os.File{}.ReadDir() populating them
info, err := fsentry.Info()
if err != nil {
return err
}
// Perform provided walk function
return opts.WalkFn(ctx, Entry{
Key: st.config.Transform.PathToKey(kpath),
Size: info.Size(),
})
})
}
// filepath checks and returns a formatted filepath for given key
// filepath checks and returns a formatted filepath for given key.
func (st *DiskStorage) filepath(key string) (string, error) {
// Calculate transformed key path
key = st.config.Transform.KeyToPath(key)
@ -382,7 +407,7 @@ func (st *DiskStorage) filepath(key string) (string, error) {
}
// isDirTraversal will check if rootPlusPath is a dir traversal outside of root,
// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath)
// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath).
func isDirTraversal(root, rootPlusPath string) bool {
switch {
// Root is $PWD, check for traversal out of

View file

@ -2,38 +2,34 @@ package storage
import (
"errors"
"strings"
"syscall"
"github.com/minio/minio-go/v7"
)
var (
// ErrClosed is returned on operations on a closed storage
ErrClosed = errors.New("store/storage: closed")
ErrClosed = new_error("closed")
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errors.New("store/storage: key not found")
ErrNotFound = new_error("key not found")
// ErrAlreadyExist is the error returned when a key already exists in storage
ErrAlreadyExists = errors.New("store/storage: key already exists")
ErrAlreadyExists = new_error("key already exists")
// ErrInvalidkey is the error returned when an invalid key is passed to storage
ErrInvalidKey = errors.New("store/storage: invalid key")
ErrInvalidKey = new_error("invalid key")
// ErrAlreadyLocked is returned on fail opening a storage lockfile
ErrAlreadyLocked = errors.New("store/storage: storage lock already open")
// errPathIsFile is returned when a path for a disk config is actually a file
errPathIsFile = errors.New("store/storage: path is file")
// errNoHashesWritten is returned when no blocks are written for given input value
errNoHashesWritten = errors.New("storage/storage: no hashes written")
// errInvalidNode is returned when read on an invalid node in the store is attempted
errInvalidNode = errors.New("store/storage: invalid node")
// errCorruptNode is returned when a block fails to be opened / read during read of a node.
errCorruptNode = errors.New("store/storage: corrupted node")
ErrAlreadyLocked = new_error("storage lock already open")
)
// new_error returns a new error instance prefixed by package prefix.
func new_error(msg string) error {
return errors.New("store/storage: " + msg)
}
// wrappedError allows wrapping together an inner with outer error.
type wrappedError struct {
inner error
@ -88,3 +84,27 @@ func errSwapUnavailable(err error) error {
}
return err
}
// transformS3Error transforms an error returned from S3Storage underlying
// minio.Core client, by wrapping where necessary with our own error types.
func transformS3Error(err error) error {
// Cast this to a minio error response
ersp, ok := err.(minio.ErrorResponse)
if ok {
switch ersp.Code {
case "NoSuchKey":
return wrap(ErrNotFound, err)
case "Conflict":
return wrap(ErrAlreadyExists, err)
default:
return err
}
}
// Check if error has an invalid object name prefix
if strings.HasPrefix(err.Error(), "Object name ") {
return wrap(ErrInvalidKey, err)
}
return err
}

221
vendor/codeberg.org/gruf/go-store/v2/storage/fs.go generated vendored Normal file
View file

@ -0,0 +1,221 @@
package storage
import (
"io/fs"
"os"
"syscall"
"codeberg.org/gruf/go-fastpath"
"codeberg.org/gruf/go-store/v2/util"
)
const (
// default file permission bits
defaultDirPerms = 0o755
defaultFilePerms = 0o644
// default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
)
// NOTE:
// These functions are for opening storage files,
// not necessarily for e.g. initial setup (OpenFile)
// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error {
// Read directory entries
entries, err := readDir(path)
if err != nil {
return err
}
// frame represents a directory entry
// walk-loop snapshot, taken when a sub
// directory requiring iteration is found
type frame struct {
path string
entries []fs.DirEntry
}
// stack contains a list of held snapshot
// frames, representing unfinished upper
// layers of a directory structure yet to
// be traversed.
var stack []frame
outer:
for {
if len(entries) == 0 {
if len(stack) == 0 {
// Reached end
break outer
}
// Pop frame from stack
frame := stack[len(stack)-1]
stack = stack[:len(stack)-1]
// Update loop vars
entries = frame.entries
path = frame.path
}
for len(entries) > 0 {
// Pop next entry from queue
entry := entries[0]
entries = entries[1:]
// Pass to provided walk function
if err := walkFn(path, entry); err != nil {
return err
}
if entry.IsDir() {
// Push current frame to stack
stack = append(stack, frame{
path: path,
entries: entries,
})
// Update current directory path
path = pb.Join(path, entry.Name())
// Read next directory entries
next, err := readDir(path)
if err != nil {
return err
}
// Set next entries
entries = next
continue outer
}
}
}
return nil
}
// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
func cleanDirs(path string) error {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Get top-level dir entries
entries, err := readDir(path)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
// Recursively clean sub-directory entries
if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
return err
}
}
}
return nil
}
// cleanDir performs the actual dir cleaning logic for the above top-level version.
func cleanDir(pb *fastpath.Builder, path string) error {
// Get dir entries
entries, err := readDir(path)
if err != nil {
return err
}
// If no entries, delete
if len(entries) < 1 {
return rmdir(path)
}
for _, entry := range entries {
if entry.IsDir() {
// Recursively clean sub-directory entries
if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
return err
}
}
}
return nil
}
// readDir will open file at path, read the unsorted list of entries, then close.
func readDir(path string) ([]fs.DirEntry, error) {
// Open file at path
file, err := open(path, defaultFileROFlags)
if err != nil {
return nil, err
}
// Read directory entries
entries, err := file.ReadDir(-1)
// Done with file
_ = file.Close()
return entries, err
}
// open will open a file at the given path with flags and default file perms.
func open(path string, flags int) (*os.File, error) {
var fd int
err := retryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, flags, defaultFilePerms)
return
})
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), path), nil
}
// stat checks for a file on disk.
func stat(path string) (bool, error) {
var stat syscall.Stat_t
err := retryOnEINTR(func() error {
return syscall.Stat(path, &stat)
})
if err != nil {
if err == syscall.ENOENT {
// not-found is no error
err = nil
}
return false, err
}
return true, nil
}
// unlink removes a file (not dir!) on disk.
func unlink(path string) error {
return retryOnEINTR(func() error {
return syscall.Unlink(path)
})
}
// rmdir removes a dir (not file!) on disk.
func rmdir(path string) error {
return retryOnEINTR(func() error {
return syscall.Rmdir(path)
})
}
// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received.
func retryOnEINTR(do func() error) error {
for {
err := do()
if err == syscall.EINTR {
continue
}
return err
}
}

View file

@ -1,11 +1,8 @@
package storage
import (
"sync"
"sync/atomic"
"syscall"
"codeberg.org/gruf/go-store/util"
)
// LockFile is our standard lockfile name.
@ -14,7 +11,6 @@ const LockFile = "store.lock"
// Lock represents a filesystem lock to ensure only one storage instance open per path.
type Lock struct {
fd int
wg sync.WaitGroup
st uint32
}
@ -23,7 +19,7 @@ func OpenLock(path string) (*Lock, error) {
var fd int
// Open the file descriptor at path
err := util.RetryOnEINTR(func() (err error) {
err := retryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
return
})
@ -32,7 +28,7 @@ func OpenLock(path string) (*Lock, error) {
}
// Get a flock on the file descriptor
err = util.RetryOnEINTR(func() error {
err = retryOnEINTR(func() error {
return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
})
if err != nil {
@ -42,28 +38,15 @@ func OpenLock(path string) (*Lock, error) {
return &Lock{fd: fd}, nil
}
// Add will add '1' to the underlying sync.WaitGroup.
func (f *Lock) Add() {
f.wg.Add(1)
}
// Done will decrememnt '1' from the underlying sync.WaitGroup.
func (f *Lock) Done() {
f.wg.Done()
}
// Close will attempt to close the lockfile and file descriptor.
func (f *Lock) Close() error {
var err error
if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
// Wait until done
f.wg.Wait()
// Ensure gets closed
defer syscall.Close(f.fd)
// Call funlock on the file descriptor
err = util.RetryOnEINTR(func() error {
err = retryOnEINTR(func() error {
return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
})
}

228
vendor/codeberg.org/gruf/go-store/v2/storage/memory.go generated vendored Normal file
View file

@ -0,0 +1,228 @@
package storage
import (
"context"
"io"
"sync/atomic"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-store/v2/util"
"github.com/cornelk/hashmap"
)
// MemoryStorage is a storage implementation that simply stores key-value
// pairs in a Go map in-memory. The map is protected by a mutex.
type MemoryStorage struct {
ow bool // overwrites
fs *hashmap.Map[string, []byte]
st uint32
}
// OpenMemory opens a new MemoryStorage instance with internal map starting size.
func OpenMemory(size int, overwrites bool) *MemoryStorage {
if size <= 0 {
size = 8
}
return &MemoryStorage{
fs: hashmap.NewSized[string, []byte](uintptr(size)),
ow: overwrites,
}
}
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean(ctx context.Context) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Check store open
if st.closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Check for key in store
b, ok := st.fs.Get(key)
if !ok {
return nil, ErrNotFound
}
// Create return copy
return copyb(b), nil
}
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Check store open
if st.closed() {
return nil, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return nil, err
}
// Check for key in store
b, ok := st.fs.Get(key)
if !ok {
return nil, ErrNotFound
}
// Create io.ReadCloser from 'b' copy
r := bytes.NewReader(copyb(b))
return util.NopReadCloser(r), nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
return ErrAlreadyExists
}
// Write key copy to store
st.fs.Set(key, copyb(b))
return nil
}
// WriteStream implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
return ErrAlreadyExists
}
// Read all from reader
b, err := io.ReadAll(r)
if err != nil {
return err
}
// Write key to store
st.fs.Set(key, b)
return nil
}
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) {
// Check store open
if st.closed() {
return false, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return false, err
}
// Check for key in store
_, ok := st.fs.Get(key)
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(ctx context.Context, key string) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
// Attempt to delete key
ok := st.fs.Del(key)
if !ok {
return ErrNotFound
}
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
// Check store open
if st.closed() {
return ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
return err
}
var err error
// Nil check func
_ = opts.WalkFn
// Pass each key in map to walk function
st.fs.Range(func(key string, val []byte) bool {
err = opts.WalkFn(ctx, Entry{
Key: key,
Size: int64(len(val)),
})
return (err == nil)
})
return err
}
// Close implements Storage.Close().
func (st *MemoryStorage) Close() error {
atomic.StoreUint32(&st.st, 1)
return nil
}
// closed returns whether MemoryStorage is closed.
func (st *MemoryStorage) closed() bool {
return (atomic.LoadUint32(&st.st) == 1)
}
// copyb returns a copy of byte-slice b.
func copyb(b []byte) []byte {
if b == nil {
return nil
}
p := make([]byte, len(b))
_ = copy(p, b)
return p
}

385
vendor/codeberg.org/gruf/go-store/v2/storage/s3.go generated vendored Normal file
View file

@ -0,0 +1,385 @@
package storage
import (
"bytes"
"context"
"io"
"sync/atomic"
"codeberg.org/gruf/go-store/v2/util"
"github.com/minio/minio-go/v7"
)
// DefaultS3Config is the default S3Storage configuration.
var DefaultS3Config = &S3Config{
CoreOpts: minio.Options{},
GetOpts: minio.GetObjectOptions{},
PutOpts: minio.PutObjectOptions{},
PutChunkSize: 4 * 1024 * 1024, // 4MiB
StatOpts: minio.StatObjectOptions{},
RemoveOpts: minio.RemoveObjectOptions{},
ListSize: 200,
}
// S3Config defines options to be used when opening an S3Storage,
// mostly options for underlying S3 client library.
type S3Config struct {
// CoreOpts are S3 client options passed during initialization.
CoreOpts minio.Options
// GetOpts are S3 client options passed during .Read___() calls.
GetOpts minio.GetObjectOptions
// PutOpts are S3 client options passed during .Write___() calls.
PutOpts minio.PutObjectOptions
// PutChunkSize is the chunk size (in bytes) to use when sending
// a byte stream reader of unknown size as a multi-part object.
PutChunkSize int64
// StatOpts are S3 client options passed during .Stat() calls.
StatOpts minio.StatObjectOptions
// RemoveOpts are S3 client options passed during .Remove() calls.
RemoveOpts minio.RemoveObjectOptions
// ListSize determines how many items to include in each
// list request, made during calls to .WalkKeys().
ListSize int
}
// getS3Config returns a valid S3Config for supplied ptr.
func getS3Config(cfg *S3Config) S3Config {
// If nil, use default
if cfg == nil {
cfg = DefaultS3Config
}
// Assume 0 chunk size == use default
if cfg.PutChunkSize <= 0 {
cfg.PutChunkSize = 4 * 1024 * 1024
}
// Assume 0 list size == use default
if cfg.ListSize <= 0 {
cfg.ListSize = 200
}
// Return owned config copy
return S3Config{
CoreOpts: cfg.CoreOpts,
GetOpts: cfg.GetOpts,
PutOpts: cfg.PutOpts,
StatOpts: cfg.StatOpts,
RemoveOpts: cfg.RemoveOpts,
}
}
// S3Storage is a storage implementation that stores key-value
// pairs in an S3 instance at given endpoint with bucket name.
type S3Storage struct {
client *minio.Core
bucket string
config S3Config
state uint32
}
// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) {
// Get checked config
config := getS3Config(cfg)
// Create new S3 client connection
client, err := minio.NewCore(endpoint, &config.CoreOpts)
if err != nil {
return nil, err
}
// Check that provided bucket actually exists
exists, err := client.BucketExists(context.Background(), bucket)
if err != nil {
return nil, err
} else if !exists {
return nil, new_error("bucket does not exist")
}
return &S3Storage{
client: client,
bucket: bucket,
config: config,
}, nil
}
// Client returns access to the underlying S3 client.
func (st *S3Storage) Client() *minio.Core {
return st.client
}
// Clean implements Storage.Clean().
func (st *S3Storage) Clean(ctx context.Context) error {
return nil // nothing to do for S3
}
// ReadBytes implements Storage.ReadBytes().
func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
// Fetch object reader from S3 bucket
rc, err := st.ReadStream(ctx, key)
if err != nil {
return nil, err
}
defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream().
func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
// Check storage open
if st.closed() {
return nil, ErrClosed
}
// Fetch object reader from S3 bucket
rc, _, _, err := st.client.GetObject(
ctx,
st.bucket,
key,
st.config.GetOpts,
)
if err != nil {
return nil, transformS3Error(err)
}
return rc, nil
}
// WriteBytes implements Storage.WriteBytes().
func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error {
return st.WriteStream(ctx, key, util.NewByteReaderSize(value))
}
// WriteStream implements Storage.WriteStream().
func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error {
// Check storage open
if st.closed() {
return ErrClosed
}
if rs, ok := r.(util.ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
// a singular .PutObject() call with length.
_, err := st.client.PutObject(
ctx,
st.bucket,
key,
r,
rs.Size(),
"",
"",
st.config.PutOpts,
)
if err != nil {
return transformS3Error(err)
}
return nil
}
// Start a new multipart upload to get ID
uploadID, err := st.client.NewMultipartUpload(
ctx,
st.bucket,
key,
st.config.PutOpts,
)
if err != nil {
return transformS3Error(err)
}
var (
count int
parts []minio.CompletePart
chunk = make([]byte, st.config.PutChunkSize)
rdr = bytes.NewReader(nil)
)
// Note that we do not perform any kind of
// memory pooling of the chunk buffers here.
// Optimal chunking sizes for S3 writes are in
// the orders of megabytes, so letting the GC
// collect these ASAP is much preferred.
loop:
for done := false; !done; {
// Read next chunk into byte buffer
n, err := io.ReadFull(r, chunk)
switch err {
// Successful read
case nil:
// Reached end, buffer empty
case io.EOF:
break loop
// Reached end, but buffer not empty
case io.ErrUnexpectedEOF:
done = true
// All other errors
default:
return err
}
// Reset byte reader
rdr.Reset(chunk[:n])
// Put this object chunk in S3 store
pt, err := st.client.PutObjectPart(
ctx,
st.bucket,
key,
uploadID,
count,
rdr,
st.config.PutChunkSize,
"",
"",
nil,
)
if err != nil {
return err
}
// Append completed part to slice
parts = append(parts, minio.CompletePart{
PartNumber: pt.PartNumber,
ETag: pt.ETag,
ChecksumCRC32: pt.ChecksumCRC32,
ChecksumCRC32C: pt.ChecksumCRC32C,
ChecksumSHA1: pt.ChecksumSHA1,
ChecksumSHA256: pt.ChecksumSHA256,
})
// Iterate part count
count++
}
// Complete this multi-part upload operation
_, err = st.client.CompleteMultipartUpload(
ctx,
st.bucket,
key,
uploadID,
parts,
st.config.PutOpts,
)
if err != nil {
return err
}
return nil
}
// Stat implements Storage.Stat().
func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) {
// Check storage open
if st.closed() {
return false, ErrClosed
}
// Query object in S3 bucket
_, err := st.client.StatObject(
ctx,
st.bucket,
key,
st.config.StatOpts,
)
if err != nil {
return false, transformS3Error(err)
}
return true, nil
}
// Remove implements Storage.Remove().
func (st *S3Storage) Remove(ctx context.Context, key string) error {
// Check storage open
if st.closed() {
return ErrClosed
}
// S3 returns no error on remove for non-existent keys
if ok, err := st.Stat(ctx, key); err != nil {
return err
} else if !ok {
return ErrNotFound
}
// Remove object from S3 bucket
err := st.client.RemoveObject(
ctx,
st.bucket,
key,
st.config.RemoveOpts,
)
if err != nil {
return transformS3Error(err)
}
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
var (
prev string
token string
)
for {
// List the objects in bucket starting at marker
result, err := st.client.ListObjectsV2(
st.bucket,
"",
prev,
token,
"",
st.config.ListSize,
)
if err != nil {
return err
}
// Pass each object through walk func
for _, obj := range result.Contents {
if err := opts.WalkFn(ctx, Entry{
Key: obj.Key,
Size: obj.Size,
}); err != nil {
return err
}
}
// No token means we reached end of bucket
if result.NextContinuationToken == "" {
return nil
}
// Set continue token and prev mark
token = result.NextContinuationToken
prev = result.StartAfter
}
}
// Close implements Storage.Close().
func (st *S3Storage) Close() error {
atomic.StoreUint32(&st.state, 1)
return nil
}
// closed returns whether S3Storage is closed.
func (st *S3Storage) closed() bool {
return (atomic.LoadUint32(&st.state) == 1)
}

View file

@ -1,54 +1,53 @@
package storage
import (
"context"
"io"
)
// StorageEntry defines a key in Storage
type StorageEntry interface {
// Key returns the storage entry's key
Key() string
}
// entry is the simplest possible StorageEntry
type entry string
func (e entry) Key() string {
return string(e)
}
// Storage defines a means of storing and accessing key value pairs
type Storage interface {
// ReadBytes returns the byte value for key in storage
ReadBytes(key string) ([]byte, error)
ReadBytes(ctx context.Context, key string) ([]byte, error)
// ReadStream returns an io.ReadCloser for the value bytes at key in the storage
ReadStream(key string) (io.ReadCloser, error)
ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
// WriteBytes writes the supplied value bytes at key in the storage
WriteBytes(key string, value []byte) error
WriteBytes(ctx context.Context, key string, value []byte) error
// WriteStream writes the bytes from supplied reader at key in the storage
WriteStream(key string, r io.Reader) error
WriteStream(ctx context.Context, key string, r io.Reader) error
// Stat checks if the supplied key is in the storage
Stat(key string) (bool, error)
Stat(ctx context.Context, key string) (bool, error)
// Remove attempts to remove the supplied key-value pair from storage
Remove(key string) error
Remove(ctx context.Context, key string) error
// Close will close the storage, releasing any file locks
Close() error
// Clean removes unused values and unclutters the storage (e.g. removing empty folders)
Clean() error
Clean(ctx context.Context) error
// WalkKeys walks the keys in the storage
WalkKeys(opts WalkKeysOptions) error
WalkKeys(ctx context.Context, opts WalkKeysOptions) error
}
// Entry represents a key in a Storage{} implementation,
// with any associated metadata that may have been set.
type Entry struct {
// Key is this entry's unique storage key.
Key string
// Size is the size of this entry in storage.
// Note that size < 0 indicates unknown.
Size int64
}
// WalkKeysOptions defines how to walk the keys in a storage implementation
type WalkKeysOptions struct {
// WalkFn is the function to apply on each StorageEntry
WalkFn func(StorageEntry)
WalkFn func(context.Context, Entry) error
}

96
vendor/codeberg.org/gruf/go-store/v2/util/io.go generated vendored Normal file
View file

@ -0,0 +1,96 @@
package util
import (
"bytes"
"io"
)
// ReaderSize ...
type ReaderSize interface {
io.Reader
// Size ...
Size() int64
}
// ByteReaderSize ...
type ByteReaderSize struct {
bytes.Reader
sz int64
}
// NewByteReaderSize ...
func NewByteReaderSize(b []byte) *ByteReaderSize {
rs := ByteReaderSize{}
rs.Reset(b)
return &rs
}
// Size implements ReaderSize.Size().
func (rs ByteReaderSize) Size() int64 {
return rs.sz
}
// Reset resets the ReaderSize to be reading from b.
func (rs *ByteReaderSize) Reset(b []byte) {
rs.Reader.Reset(b)
rs.sz = int64(len(b))
}
// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation.
func NopReadCloser(r io.Reader) io.ReadCloser {
return &nopReadCloser{r}
}
// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation.
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser.
func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
return &callbackReadCloser{
ReadCloser: rc,
callback: cb,
}
}
// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser.
func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser {
return &callbackWriteCloser{
WriteCloser: wc,
callback: cb,
}
}
// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close().
type nopReadCloser struct{ io.Reader }
func (r *nopReadCloser) Close() error { return nil }
// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close().
type nopWriteCloser struct{ io.Writer }
func (w nopWriteCloser) Close() error { return nil }
// callbackReadCloser allows adding our own custom callback to an io.ReadCloser.
type callbackReadCloser struct {
io.ReadCloser
callback func()
}
func (c *callbackReadCloser) Close() error {
defer c.callback()
return c.ReadCloser.Close()
}
// callbackWriteCloser allows adding our own custom callback to an io.WriteCloser.
type callbackWriteCloser struct {
io.WriteCloser
callback func()
}
func (c *callbackWriteCloser) Close() error {
defer c.callback()
return c.WriteCloser.Close()
}

View file

@ -5,15 +5,15 @@ import (
"codeberg.org/gruf/go-pools"
)
// pathBuilderPool is the global fastpath.Builder pool
// pathBuilderPool is the global fastpath.Builder pool.
var pathBuilderPool = pools.NewPathBuilderPool(512)
// GetPathBuilder fetches a fastpath.Builder object from the pool
// GetPathBuilder fetches a fastpath.Builder object from the pool.
func GetPathBuilder() *fastpath.Builder {
return pathBuilderPool.Get()
}
// PutPathBuilder places supplied fastpath.Builder back in the pool
// PutPathBuilder places supplied fastpath.Builder back in the pool.
func PutPathBuilder(pb *fastpath.Builder) {
pb.Reset()
pathBuilderPool.Put(pb)