mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-11-13 04:57:29 -06:00
upgrade go-store
This commit is contained in:
parent
667e7f112c
commit
f28cf793ee
44 changed files with 653 additions and 4972 deletions
10
vendor/codeberg.org/gruf/go-store/kv/iterator.go
generated
vendored
10
vendor/codeberg.org/gruf/go-store/kv/iterator.go
generated
vendored
|
|
@ -2,6 +2,7 @@ package kv
|
|||
|
||||
import (
|
||||
"codeberg.org/gruf/go-errors"
|
||||
"codeberg.org/gruf/go-mutexes"
|
||||
"codeberg.org/gruf/go-store/storage"
|
||||
)
|
||||
|
||||
|
|
@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed")
|
|||
// have multiple iterators running concurrently
|
||||
type KVIterator struct {
|
||||
store *KVStore // store is the linked KVStore
|
||||
state *mutexes.LockState
|
||||
entries []storage.StorageEntry
|
||||
index int
|
||||
key string
|
||||
onClose func()
|
||||
}
|
||||
|
||||
// Next attempts to set the next key-value pair, the
|
||||
|
|
@ -43,13 +44,10 @@ func (i *KVIterator) Key() string {
|
|||
|
||||
// Release releases the KVIterator and KVStore's read lock
|
||||
func (i *KVIterator) Release() {
|
||||
// Reset key, path, entries
|
||||
i.state.UnlockMap()
|
||||
i.store = nil
|
||||
i.key = ""
|
||||
i.entries = nil
|
||||
|
||||
// Perform requested callback
|
||||
i.onClose()
|
||||
}
|
||||
|
||||
// Value returns the next value from the KVStore
|
||||
|
|
@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) {
|
|||
}
|
||||
|
||||
// Attempt to fetch from store
|
||||
return i.store.get(i.store.mutexMap.RLock, i.key)
|
||||
return i.store.get(i.state.RLock, i.key)
|
||||
}
|
||||
|
|
|
|||
82
vendor/codeberg.org/gruf/go-store/kv/state.go
generated
vendored
82
vendor/codeberg.org/gruf/go-store/kv/state.go
generated
vendored
|
|
@ -2,9 +2,9 @@ package kv
|
|||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"codeberg.org/gruf/go-errors"
|
||||
"codeberg.org/gruf/go-mutexes"
|
||||
)
|
||||
|
||||
var ErrStateClosed = errors.New("store/kv: state closed")
|
||||
|
|
@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed")
|
|||
// then the state has zero guarantees
|
||||
type StateRO struct {
|
||||
store *KVStore
|
||||
mutex sync.RWMutex
|
||||
state *mutexes.LockState
|
||||
}
|
||||
|
||||
func (st *StateRO) Get(key string) ([]byte, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.get(st.store.mutexMap.RLock, key)
|
||||
return st.store.get(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.getStream(st.store.mutexMap.RLock, key)
|
||||
return st.store.getStream(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRO) Has(key string) (bool, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return false, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.has(st.store.mutexMap.RLock, key)
|
||||
return st.store.has(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRO) Release() {
|
||||
// Get state write lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Release the store
|
||||
if st.store != nil {
|
||||
st.store.mutex.RUnlock()
|
||||
st.store = nil
|
||||
}
|
||||
st.state.UnlockMap()
|
||||
st.store = nil
|
||||
}
|
||||
|
||||
// StateRW provides a read-write window to the store. While this
|
||||
|
|
@ -80,101 +61,70 @@ func (st *StateRO) Release() {
|
|||
// then the state has zero guarantees
|
||||
type StateRW struct {
|
||||
store *KVStore
|
||||
mutex sync.RWMutex
|
||||
state *mutexes.LockState
|
||||
}
|
||||
|
||||
func (st *StateRW) Get(key string) ([]byte, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.get(st.store.mutexMap.RLock, key)
|
||||
return st.store.get(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return nil, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.getStream(st.store.mutexMap.RLock, key)
|
||||
return st.store.getStream(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRW) Put(key string, value []byte) error {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.put(st.store.mutexMap.Lock, key, value)
|
||||
return st.store.put(st.state.Lock, key, value)
|
||||
}
|
||||
|
||||
func (st *StateRW) PutStream(key string, r io.Reader) error {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.putStream(st.store.mutexMap.Lock, key, r)
|
||||
return st.store.putStream(st.state.Lock, key, r)
|
||||
}
|
||||
|
||||
func (st *StateRW) Has(key string) (bool, error) {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return false, ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.has(st.store.mutexMap.RLock, key)
|
||||
return st.store.has(st.state.RLock, key)
|
||||
}
|
||||
|
||||
func (st *StateRW) Delete(key string) error {
|
||||
// Get state read lock
|
||||
st.mutex.RLock()
|
||||
defer st.mutex.RUnlock()
|
||||
|
||||
// Check not closed
|
||||
if st.store == nil {
|
||||
return ErrStateClosed
|
||||
}
|
||||
|
||||
// Pass request to store
|
||||
return st.store.delete(st.store.mutexMap.Lock, key)
|
||||
return st.store.delete(st.state.Lock, key)
|
||||
}
|
||||
|
||||
func (st *StateRW) Release() {
|
||||
// Get state write lock
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
// Release the store
|
||||
if st.store != nil {
|
||||
st.store.mutex.Unlock()
|
||||
st.store = nil
|
||||
}
|
||||
st.state.UnlockMap()
|
||||
st.store = nil
|
||||
}
|
||||
|
|
|
|||
40
vendor/codeberg.org/gruf/go-store/kv/store.go
generated
vendored
40
vendor/codeberg.org/gruf/go-store/kv/store.go
generated
vendored
|
|
@ -2,7 +2,6 @@ package kv
|
|||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"codeberg.org/gruf/go-mutexes"
|
||||
"codeberg.org/gruf/go-store/storage"
|
||||
|
|
@ -11,9 +10,8 @@ import (
|
|||
|
||||
// KVStore is a very simple, yet performant key-value store
|
||||
type KVStore struct {
|
||||
mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
|
||||
mutex sync.RWMutex // mutex is the total store mutex
|
||||
storage storage.Storage // storage is the underlying storage
|
||||
mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access
|
||||
storage storage.Storage // storage is the underlying storage
|
||||
}
|
||||
|
||||
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
|
||||
|
|
@ -47,26 +45,19 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {
|
|||
|
||||
// Return new KVStore
|
||||
return &KVStore{
|
||||
mutexMap: mutexes.NewMap(mutexes.NewRW),
|
||||
mutex: sync.RWMutex{},
|
||||
storage: storage,
|
||||
mutex: mutexes.NewMap(-1),
|
||||
storage: storage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RLock acquires a read-lock on supplied key, returning unlock function.
|
||||
func (st *KVStore) RLock(key string) (runlock func()) {
|
||||
st.mutex.RLock()
|
||||
runlock = st.mutexMap.RLock(key)
|
||||
st.mutex.RUnlock()
|
||||
return runlock
|
||||
return st.mutex.RLock(key)
|
||||
}
|
||||
|
||||
// Lock acquires a write-lock on supplied key, returning unlock function.
|
||||
func (st *KVStore) Lock(key string) (unlock func()) {
|
||||
st.mutex.Lock()
|
||||
unlock = st.mutexMap.Lock(key)
|
||||
st.mutex.Unlock()
|
||||
return unlock
|
||||
return st.mutex.Lock(key)
|
||||
}
|
||||
|
||||
// Get fetches the bytes for supplied key in the store
|
||||
|
|
@ -167,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
|
|||
}
|
||||
|
||||
// Get store read lock
|
||||
st.mutex.RLock()
|
||||
state := st.mutex.RLockMap()
|
||||
|
||||
// Setup the walk keys function
|
||||
entries := []storage.StorageEntry{}
|
||||
|
|
@ -184,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
|
|||
// Walk keys in the storage
|
||||
err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn})
|
||||
if err != nil {
|
||||
st.mutex.RUnlock()
|
||||
state.UnlockMap()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new iterator
|
||||
return &KVIterator{
|
||||
store: st,
|
||||
state: state,
|
||||
entries: entries,
|
||||
index: -1,
|
||||
key: "",
|
||||
onClose: st.mutex.RUnlock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Read provides a read-only window to the store, holding it in a read-locked state until release
|
||||
func (st *KVStore) Read() *StateRO {
|
||||
st.mutex.RLock()
|
||||
return &StateRO{store: st}
|
||||
state := st.mutex.RLockMap()
|
||||
return &StateRO{store: st, state: state}
|
||||
}
|
||||
|
||||
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
|
||||
|
|
@ -216,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) {
|
|||
|
||||
// Update provides a read-write window to the store, holding it in a write-locked state until release
|
||||
func (st *KVStore) Update() *StateRW {
|
||||
st.mutex.Lock()
|
||||
return &StateRW{store: st}
|
||||
state := st.mutex.LockMap()
|
||||
return &StateRW{store: st, state: state}
|
||||
}
|
||||
|
||||
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
|
||||
|
|
@ -229,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) {
|
|||
// Pass to fn
|
||||
fn(state)
|
||||
}
|
||||
|
||||
// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work).
|
||||
func (st *KVStore) Close() error {
|
||||
return st.storage.Close()
|
||||
}
|
||||
|
|
|
|||
86
vendor/codeberg.org/gruf/go-store/storage/block.go
generated
vendored
86
vendor/codeberg.org/gruf/go-store/storage/block.go
generated
vendored
|
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
|
|
@ -13,7 +14,6 @@ import (
|
|||
"codeberg.org/gruf/go-hashenc"
|
||||
"codeberg.org/gruf/go-pools"
|
||||
"codeberg.org/gruf/go-store/util"
|
||||
"github.com/zeebo/blake3"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -77,7 +77,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
|
|||
|
||||
// BlockStorage is a Storage implementation that stores input data as chunks on
|
||||
// a filesystem. Each value is chunked into blocks of configured size and these
|
||||
// blocks are stored with name equal to their base64-encoded BLAKE3 hash-sum. A
|
||||
// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
|
||||
// "node" file is finally created containing an array of hashes contained within
|
||||
// this value
|
||||
type BlockStorage struct {
|
||||
|
|
@ -87,7 +87,7 @@ type BlockStorage struct {
|
|||
config BlockConfig // cfg is the supplied configuration for this store
|
||||
hashPool sync.Pool // hashPool is this store's hashEncoder pool
|
||||
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
|
||||
lock *LockableFile // lock is the opened lockfile for this storage instance
|
||||
lock *Lock // lock is the opened lockfile for this storage instance
|
||||
|
||||
// NOTE:
|
||||
// BlockStorage does not need to lock each of the underlying block files
|
||||
|
|
@ -140,11 +140,9 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
|
|||
}
|
||||
|
||||
// Open and acquire storage lock for path
|
||||
lock, err := OpenLock(pb.Join(path, LockFile))
|
||||
lock, err := OpenLock(pb.Join(path, lockFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if err := lock.Lock(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Figure out the largest size for bufpool slices
|
||||
|
|
@ -174,14 +172,23 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
|
|||
|
||||
// Clean implements storage.Clean()
|
||||
func (st *BlockStorage) Clean() error {
|
||||
nodes := map[string]*node{}
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Acquire path builder
|
||||
pb := util.GetPathBuilder()
|
||||
defer util.PutPathBuilder(pb)
|
||||
|
||||
// Walk nodes dir for entries
|
||||
nodes := map[string]*node{}
|
||||
onceErr := errors.OnceError{}
|
||||
|
||||
// Walk nodes dir for entries
|
||||
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
|
||||
// Only deal with regular files
|
||||
if !fsentry.Type().IsRegular() {
|
||||
|
|
@ -303,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
// Read all bytes and return
|
||||
return io.ReadAll(rc)
|
||||
|
|
@ -316,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
st.lock.Done()
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
// Attempt to open RO file
|
||||
file, err := open(npath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
st.lock.Done()
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
|
@ -338,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
|
|||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
st.lock.Done()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new block reader
|
||||
return util.NopReadCloser(&blockReader{
|
||||
// Prepare block reader and return
|
||||
rc := util.NopReadCloser(&blockReader{
|
||||
storage: st,
|
||||
node: &node,
|
||||
}), nil
|
||||
}) // we wrap the blockreader to decr lockfile waitgroup
|
||||
return util.ReadCloserWithCallback(rc, st.lock.Done), nil
|
||||
}
|
||||
|
||||
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
|
||||
|
|
@ -383,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Check if this exists
|
||||
ok, err := stat(key)
|
||||
if err != nil {
|
||||
|
|
@ -567,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) {
|
|||
return false, err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return false, ErrClosed
|
||||
}
|
||||
|
||||
// Check for file on disk
|
||||
return stat(kpath)
|
||||
}
|
||||
|
|
@ -579,18 +617,35 @@ func (st *BlockStorage) Remove(key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Attempt to remove file
|
||||
return os.Remove(kpath)
|
||||
}
|
||||
|
||||
// Close implements Storage.Close()
|
||||
func (st *BlockStorage) Close() error {
|
||||
defer st.lock.Close()
|
||||
return st.lock.Unlock()
|
||||
return st.lock.Close()
|
||||
}
|
||||
|
||||
// WalkKeys implements Storage.WalkKeys()
|
||||
func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Acquire path builder
|
||||
pb := util.GetPathBuilder()
|
||||
defer util.PutPathBuilder(pb)
|
||||
|
|
@ -800,7 +855,7 @@ var (
|
|||
|
||||
// encodedHashLen is the once-calculated encoded hash-sum length
|
||||
encodedHashLen = base64Encoding.EncodedLen(
|
||||
blake3.New().Size(),
|
||||
sha256.New().Size(),
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -812,9 +867,8 @@ type hashEncoder struct {
|
|||
|
||||
// newHashEncoder returns a new hashEncoder instance
|
||||
func newHashEncoder() *hashEncoder {
|
||||
hash := blake3.New()
|
||||
return &hashEncoder{
|
||||
henc: hashenc.New(hash, base64Encoding),
|
||||
henc: hashenc.New(sha256.New(), base64Encoding),
|
||||
ebuf: make([]byte, encodedHashLen),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
67
vendor/codeberg.org/gruf/go-store/storage/disk.go
generated
vendored
67
vendor/codeberg.org/gruf/go-store/storage/disk.go
generated
vendored
|
|
@ -71,7 +71,7 @@ type DiskStorage struct {
|
|||
path string // path is the root path of this store
|
||||
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
|
||||
config DiskConfig // cfg is the supplied configuration for this store
|
||||
lock *LockableFile // lock is the opened lockfile for this storage instance
|
||||
lock *Lock // lock is the opened lockfile for this storage instance
|
||||
}
|
||||
|
||||
// OpenFile opens a DiskStorage instance for given folder path and configuration
|
||||
|
|
@ -118,11 +118,9 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
|
|||
}
|
||||
|
||||
// Open and acquire storage lock for path
|
||||
lock, err := OpenLock(pb.Join(path, LockFile))
|
||||
lock, err := OpenLock(pb.Join(path, lockFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if err := lock.Lock(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return new DiskStorage
|
||||
|
|
@ -136,6 +134,11 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
|
|||
|
||||
// Clean implements Storage.Clean()
|
||||
func (st *DiskStorage) Clean() error {
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
return util.CleanDirs(st.path)
|
||||
}
|
||||
|
||||
|
|
@ -160,9 +163,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
// Attempt to open file (replace ENOENT with our own)
|
||||
file, err := open(kpath, defaultFileROFlags)
|
||||
if err != nil {
|
||||
st.lock.Done()
|
||||
return nil, errSwapNotFound(err)
|
||||
}
|
||||
|
||||
|
|
@ -170,12 +182,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
|
|||
cFile, err := st.config.Compression.Reader(file)
|
||||
if err != nil {
|
||||
file.Close() // close this here, ignore error
|
||||
st.lock.Done()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap compressor to ensure file close
|
||||
return util.ReadCloserWithCallback(cFile, func() {
|
||||
file.Close()
|
||||
st.lock.Done()
|
||||
}), nil
|
||||
}
|
||||
|
||||
|
|
@ -192,6 +206,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Ensure dirs leading up to file exist
|
||||
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
|
||||
if err != nil {
|
||||
|
|
@ -242,6 +265,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) {
|
|||
return false, err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return false, ErrClosed
|
||||
}
|
||||
|
||||
// Check for file on disk
|
||||
return stat(kpath)
|
||||
}
|
||||
|
|
@ -254,18 +286,35 @@ func (st *DiskStorage) Remove(key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Attempt to remove file
|
||||
return os.Remove(kpath)
|
||||
}
|
||||
|
||||
// Close implements Storage.Close()
|
||||
func (st *DiskStorage) Close() error {
|
||||
defer st.lock.Close()
|
||||
return st.lock.Unlock()
|
||||
return st.lock.Close()
|
||||
}
|
||||
|
||||
// WalkKeys implements Storage.WalkKeys()
|
||||
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
|
||||
// Track open
|
||||
st.lock.Add()
|
||||
defer st.lock.Done()
|
||||
|
||||
// Check if open
|
||||
if st.lock.Closed() {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Acquire path builder
|
||||
pb := util.GetPathBuilder()
|
||||
defer util.PutPathBuilder(pb)
|
||||
|
|
@ -286,13 +335,13 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
|
|||
|
||||
// filepath checks and returns a formatted filepath for given key
|
||||
func (st *DiskStorage) filepath(key string) (string, error) {
|
||||
// Calculate transformed key path
|
||||
key = st.config.Transform.KeyToPath(key)
|
||||
|
||||
// Acquire path builder
|
||||
pb := util.GetPathBuilder()
|
||||
defer util.PutPathBuilder(pb)
|
||||
|
||||
// Calculate transformed key path
|
||||
key = st.config.Transform.KeyToPath(key)
|
||||
|
||||
// Generated joined root path
|
||||
pb.AppendString(st.path)
|
||||
pb.AppendString(key)
|
||||
|
|
|
|||
14
vendor/codeberg.org/gruf/go-store/storage/errors.go
generated
vendored
14
vendor/codeberg.org/gruf/go-store/storage/errors.go
generated
vendored
|
|
@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString {
|
|||
}
|
||||
|
||||
var (
|
||||
// ErrClosed is returned on operations on a closed storage
|
||||
ErrClosed = errorString("store/storage: closed")
|
||||
|
||||
// ErrNotFound is the error returned when a key cannot be found in storage
|
||||
ErrNotFound = errorString("store/storage: key not found")
|
||||
|
||||
|
|
@ -39,6 +42,9 @@ var (
|
|||
|
||||
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
|
||||
errCorruptNodes = errorString("store/storage: corrupted nodes")
|
||||
|
||||
// ErrAlreadyLocked is returned on fail opening a storage lockfile
|
||||
ErrAlreadyLocked = errorString("store/storage: storage lock already open")
|
||||
)
|
||||
|
||||
// errSwapNoop performs no error swaps
|
||||
|
|
@ -61,3 +67,11 @@ func errSwapExist(err error) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
|
||||
func errSwapUnavailable(err error) error {
|
||||
if err == syscall.EAGAIN {
|
||||
return ErrAlreadyLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
9
vendor/codeberg.org/gruf/go-store/storage/fs.go
generated
vendored
9
vendor/codeberg.org/gruf/go-store/storage/fs.go
generated
vendored
|
|
@ -8,11 +8,14 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultDirPerms = 0755
|
||||
defaultFilePerms = 0644
|
||||
// default file permission bits
|
||||
defaultDirPerms = 0755
|
||||
defaultFilePerms = 0644
|
||||
|
||||
// default file open flags
|
||||
defaultFileROFlags = syscall.O_RDONLY
|
||||
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
|
||||
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
|
||||
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
|
||||
)
|
||||
|
||||
// NOTE:
|
||||
|
|
|
|||
83
vendor/codeberg.org/gruf/go-store/storage/lock.go
generated
vendored
83
vendor/codeberg.org/gruf/go-store/storage/lock.go
generated
vendored
|
|
@ -1,38 +1,81 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
||||
"codeberg.org/gruf/go-store/util"
|
||||
)
|
||||
|
||||
// LockFile is our standard lockfile name.
|
||||
const LockFile = "store.lock"
|
||||
// lockFile is our standard lockfile name.
|
||||
var lockFile = "store.lock"
|
||||
|
||||
type LockableFile struct {
|
||||
*os.File
|
||||
// IsLockKey returns whether storage key is our lockfile.
|
||||
func IsLockKey(key string) bool {
|
||||
return key == lockFile
|
||||
}
|
||||
|
||||
// Lock represents a filesystem lock to ensure only one storage instance open per path.
|
||||
type Lock struct {
|
||||
fd int
|
||||
wg sync.WaitGroup
|
||||
st uint32
|
||||
}
|
||||
|
||||
// OpenLock opens a lockfile at path.
|
||||
func OpenLock(path string) (*LockableFile, error) {
|
||||
file, err := open(path, defaultFileLockFlags)
|
||||
func OpenLock(path string) (*Lock, error) {
|
||||
var fd int
|
||||
|
||||
// Open the file descriptor at path
|
||||
err := util.RetryOnEINTR(func() (err error) {
|
||||
fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
|
||||
return
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LockableFile{file}, nil
|
||||
}
|
||||
|
||||
func (f *LockableFile) Lock() error {
|
||||
return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
|
||||
}
|
||||
|
||||
func (f *LockableFile) Unlock() error {
|
||||
return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
|
||||
}
|
||||
|
||||
func (f *LockableFile) flock(how int) error {
|
||||
return util.RetryOnEINTR(func() error {
|
||||
return syscall.Flock(int(f.Fd()), how)
|
||||
// Get a flock on the file descriptor
|
||||
err = util.RetryOnEINTR(func() error {
|
||||
return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errSwapUnavailable(err)
|
||||
}
|
||||
|
||||
return &Lock{fd: fd}, nil
|
||||
}
|
||||
|
||||
// Add will add '1' to the underlying sync.WaitGroup.
|
||||
func (f *Lock) Add() {
|
||||
f.wg.Add(1)
|
||||
}
|
||||
|
||||
// Done will decrememnt '1' from the underlying sync.WaitGroup.
|
||||
func (f *Lock) Done() {
|
||||
f.wg.Done()
|
||||
}
|
||||
|
||||
// Close will attempt to close the lockfile and file descriptor.
|
||||
func (f *Lock) Close() error {
|
||||
var err error
|
||||
if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
|
||||
// Wait until done
|
||||
f.wg.Wait()
|
||||
|
||||
// Ensure gets closed
|
||||
defer syscall.Close(f.fd)
|
||||
|
||||
// Call funlock on the file descriptor
|
||||
err = util.RetryOnEINTR(func() error {
|
||||
return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
|
||||
})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Closed will return whether this lockfile has been closed (and unlocked).
|
||||
func (f *Lock) Closed() bool {
|
||||
return (atomic.LoadUint32(&f.st) == 1)
|
||||
}
|
||||
|
|
|
|||
78
vendor/codeberg.org/gruf/go-store/storage/memory.go
generated
vendored
78
vendor/codeberg.org/gruf/go-store/storage/memory.go
generated
vendored
|
|
@ -14,6 +14,7 @@ type MemoryStorage struct {
|
|||
ow bool // overwrites
|
||||
fs map[string][]byte
|
||||
mu sync.Mutex
|
||||
st uint32
|
||||
}
|
||||
|
||||
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
|
||||
|
|
@ -27,13 +28,26 @@ func OpenMemory(size int, overwrites bool) *MemoryStorage {
|
|||
|
||||
// Clean implements Storage.Clean().
|
||||
func (st *MemoryStorage) Clean() error {
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
if st.st == 1 {
|
||||
return ErrClosed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadBytes implements Storage.ReadBytes().
|
||||
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
|
||||
// Safely check store
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
st.mu.Unlock()
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
// Check for key
|
||||
b, ok := st.fs[key]
|
||||
st.mu.Unlock()
|
||||
|
||||
|
|
@ -48,8 +62,16 @@ func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
|
|||
|
||||
// ReadStream implements Storage.ReadStream().
|
||||
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
|
||||
// Safely check store
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
st.mu.Unlock()
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
// Check for key
|
||||
b, ok := st.fs[key]
|
||||
st.mu.Unlock()
|
||||
|
||||
|
|
@ -66,19 +88,24 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
|
|||
|
||||
// WriteBytes implements Storage.WriteBytes().
|
||||
func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
|
||||
// Safely check store
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
_, ok := st.fs[key]
|
||||
|
||||
// Check for already exist
|
||||
if ok && !st.ow {
|
||||
st.mu.Unlock()
|
||||
return ErrAlreadyExists
|
||||
}
|
||||
|
||||
// Write + unlock
|
||||
st.fs[key] = bytes.Copy(b)
|
||||
st.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -96,43 +123,66 @@ func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
|
|||
|
||||
// Stat implements Storage.Stat().
|
||||
func (st *MemoryStorage) Stat(key string) (bool, error) {
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
return false, ErrClosed
|
||||
}
|
||||
|
||||
// Check for key
|
||||
_, ok := st.fs[key]
|
||||
st.mu.Unlock()
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// Remove implements Storage.Remove().
|
||||
func (st *MemoryStorage) Remove(key string) error {
|
||||
// Safely check store
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
_, ok := st.fs[key]
|
||||
defer st.mu.Unlock()
|
||||
|
||||
// Check in store
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Check for key
|
||||
_, ok := st.fs[key]
|
||||
if !ok {
|
||||
st.mu.Unlock()
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
// Delete + unlock
|
||||
// Remove from store
|
||||
delete(st.fs, key)
|
||||
st.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close implements Storage.Close().
|
||||
func (st *MemoryStorage) Close() error {
|
||||
st.mu.Lock()
|
||||
st.st = 1
|
||||
st.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// WalkKeys implements Storage.WalkKeys().
|
||||
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
|
||||
// Safely walk storage keys
|
||||
// Lock storage
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
|
||||
// Check store open
|
||||
if st.st == 1 {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Walk store keys
|
||||
for key := range st.fs {
|
||||
opts.WalkFn(entry(key))
|
||||
}
|
||||
st.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue