update dependencies (#296)

This commit is contained in:
tobi 2021-11-13 12:29:08 +01:00 committed by GitHub
commit 829a934d23
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
124 changed files with 2453 additions and 1588 deletions

9
vendor/codeberg.org/gruf/go-store/LICENSE generated vendored Normal file
View file

@ -0,0 +1,9 @@
MIT License
Copyright (c) 2021 gruf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

64
vendor/codeberg.org/gruf/go-store/kv/iterator.go generated vendored Normal file
View file

@ -0,0 +1,64 @@
package kv
import (
"codeberg.org/gruf/go-errors"
"codeberg.org/gruf/go-store/storage"
)
var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// KVIterator provides a read-only iterator to all the key-value
// pairs in a KVStore. While the iterator is open the store is read
// locked, you MUST release the iterator when you are finished with
// it.
//
// Please note:
// - individual iterators are NOT concurrency safe, though it is safe to
// have multiple iterators running concurrently
type KVIterator struct {
store *KVStore // store is the linked KVStore
entries []storage.StorageEntry
index int
key string
onClose func()
}
// Next attempts to set the next key-value pair, the
// return value is if there was another pair remaining
func (i *KVIterator) Next() bool {
next := i.index + 1
if next >= len(i.entries) {
i.key = ""
return false
}
i.key = i.entries[next].Key()
i.index = next
return true
}
// Key returns the next key from the store
func (i *KVIterator) Key() string {
return i.key
}
// Release releases the KVIterator and KVStore's read lock
func (i *KVIterator) Release() {
// Reset key, path, entries
i.store = nil
i.key = ""
i.entries = nil
// Perform requested callback
i.onClose()
}
// Value returns the next value from the KVStore
func (i *KVIterator) Value() ([]byte, error) {
// Check store isn't closed
if i.store == nil {
return nil, ErrIteratorClosed
}
// Attempt to fetch from store
return i.store.get(i.key)
}

125
vendor/codeberg.org/gruf/go-store/kv/state.go generated vendored Normal file
View file

@ -0,0 +1,125 @@
package kv
import (
"io"
"codeberg.org/gruf/go-errors"
)
var ErrStateClosed = errors.New("store/kv: state closed")
// StateRO provides a read-only window to the store. While this
// state is active during the Read() function window, the entire
// store will be read-locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Read() returns,
// then the state has zero guarantees
type StateRO struct {
store *KVStore
}
func (st *StateRO) Get(key string) ([]byte, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.get(key)
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.getStream(key)
}
func (st *StateRO) Has(key string) (bool, error) {
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
return st.store.has(key)
}
func (st *StateRO) close() {
st.store = nil
}
// StateRW provides a read-write window to the store. While this
// state is active during the Update() function window, the entire
// store will be locked. The state is thread-safe for concurrent
// use UNTIL the moment that your supplied function to Update() returns,
// then the state has zero guarantees
type StateRW struct {
store *KVStore
}
func (st *StateRW) Get(key string) ([]byte, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.get(key)
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
return st.store.getStream(key)
}
func (st *StateRW) Put(key string, value []byte) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.put(key, value)
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.putStream(key, r)
}
func (st *StateRW) Has(key string) (bool, error) {
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
return st.store.has(key)
}
func (st *StateRW) Delete(key string) error {
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
return st.store.delete(key)
}
func (st *StateRW) close() {
st.store = nil
}

243
vendor/codeberg.org/gruf/go-store/kv/store.go generated vendored Normal file
View file

@ -0,0 +1,243 @@
package kv
import (
"io"
"sync"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
"codeberg.org/gruf/go-store/util"
)
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
mutex sync.RWMutex // mutex is the total store mutex
storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
// Attempt to open disk storage
storage, err := storage.OpenFile(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
// Attempt to open block storage
storage, err := storage.OpenBlock(path, cfg)
if err != nil {
return nil, err
}
// Return new KVStore
return OpenStorage(storage)
}
func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Perform initial storage clean
err := storage.Clean()
if err != nil {
return nil, err
}
// Return new KVStore
return &KVStore{
mutexMap: mutexes.NewMap(mutexes.NewRW),
mutex: sync.RWMutex{},
storage: storage,
}, nil
}
// Get fetches the bytes for supplied key in the store
func (st *KVStore) Get(key string) ([]byte, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.get(key)
}
func (st *KVStore) get(key string) ([]byte, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
defer runlock()
// Read file bytes
return st.storage.ReadBytes(key)
}
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.getStream(key)
}
func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
// Attempt to open stream for read
rd, err := st.storage.ReadStream(key)
if err != nil {
runlock()
return nil, err
}
// Wrap readcloser in our own callback closer
return util.ReadCloserWithCallback(rd, runlock), nil
}
// Put places the bytes at the supplied key location in the store
func (st *KVStore) Put(key string, value []byte) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.put(key, value)
}
func (st *KVStore) put(key string, value []byte) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Write file bytes
return st.storage.WriteBytes(key, value)
}
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
func (st *KVStore) PutStream(key string, r io.Reader) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.putStream(key, r)
}
func (st *KVStore) putStream(key string, r io.Reader) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Write file stream
return st.storage.WriteStream(key, r)
}
// Has checks whether the supplied key exists in the store
func (st *KVStore) Has(key string) (bool, error) {
// Acquire store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Pass to unprotected fn
return st.has(key)
}
func (st *KVStore) has(key string) (bool, error) {
// Acquire read lock for key
runlock := st.mutexMap.RLock(key)
defer runlock()
// Stat file on disk
return st.storage.Stat(key)
}
// Delete removes the supplied key-value pair from the store
func (st *KVStore) Delete(key string) error {
// Acquire store write lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Pass to unprotected fn
return st.delete(key)
}
func (st *KVStore) delete(key string) error {
// Acquire write lock for key
unlock := st.mutexMap.Lock(key)
defer unlock()
// Remove file from disk
return st.storage.Remove(key)
}
// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// If no function, match all
if matchFn == nil {
matchFn = func(string) bool { return true }
}
// Get store read lock
st.mutex.RLock()
// Setup the walk keys function
entries := []storage.StorageEntry{}
walkFn := func(entry storage.StorageEntry) {
// Ignore unmatched entries
if !matchFn(entry.Key()) {
return
}
// Add to entries
entries = append(entries, entry)
}
// Walk keys in the storage
err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
st.mutex.RUnlock()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
entries: entries,
index: -1,
key: "",
onClose: st.mutex.RUnlock,
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until
// the supplied function returns
func (st *KVStore) Read(do func(*StateRO)) {
// Get store read lock
st.mutex.RLock()
defer st.mutex.RUnlock()
// Create new store state (defer close)
state := &StateRO{store: st}
defer state.close()
// Pass state
do(state)
}
// Update provides a read-write window to the store, holding it in a read-write-locked state
// until the supplied functions returns
func (st *KVStore) Update(do func(*StateRW)) {
// Get store lock
st.mutex.Lock()
defer st.mutex.Unlock()
// Create new store state (defer close)
state := &StateRW{store: st}
defer state.close()
// Pass state
do(state)
}

797
vendor/codeberg.org/gruf/go-store/storage/block.go generated vendored Normal file
View file

@ -0,0 +1,797 @@
package storage
import (
"crypto/sha256"
"io"
"io/fs"
"os"
"strings"
"sync"
"syscall"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-errors"
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
)
var (
nodePathPrefix = "node/"
blockPathPrefix = "block/"
)
// DefaultBlockConfig is the default BlockStorage configuration
var DefaultBlockConfig = &BlockConfig{
BlockSize: 1024 * 16,
WriteBufSize: 4096,
Overwrite: false,
Compression: NoCompression(),
}
// BlockConfig defines options to be used when opening a BlockStorage
type BlockConfig struct {
// BlockSize is the chunking size to use when splitting and storing blocks of data
BlockSize int
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
WriteBufSize int
// Overwrite allows overwriting values of stored keys in the storage
Overwrite bool
// Compression is the Compressor to use when reading / writing files, default is no compression
Compression Compressor
}
// getBlockConfig returns a valid BlockConfig for supplied ptr
func getBlockConfig(cfg *BlockConfig) BlockConfig {
// If nil, use default
if cfg == nil {
cfg = DefaultBlockConfig
}
// Assume nil compress == none
if cfg.Compression == nil {
cfg.Compression = NoCompression()
}
// Assume 0 chunk size == use default
if cfg.BlockSize < 1 {
cfg.BlockSize = DefaultBlockConfig.BlockSize
}
// Assume 0 buf size == use default
if cfg.WriteBufSize < 1 {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
// Return owned config copy
return BlockConfig{
BlockSize: cfg.BlockSize,
WriteBufSize: cfg.WriteBufSize,
Overwrite: cfg.Overwrite,
Compression: cfg.Compression,
}
}
// BlockStorage is a Storage implementation that stores input data as chunks on
// a filesystem. Each value is chunked into blocks of configured size and these
// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
// "node" file is finally created containing an array of hashes contained within
// this value
type BlockStorage struct {
path string // path is the root path of this store
blockPath string // blockPath is the joined root path + block path prefix
nodePath string // nodePath is the joined root path + node path prefix
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
// as the filename itself directly relates to the contents. If there happens
// to be an overwrite, it will just be of the same data since the filename is
// the hash of the data.
}
// OpenBlock opens a BlockStorage instance for given folder path and configuration
func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Clean provided path, ensure ends in '/' (should
// be dir, this helps with file path trimming later)
path = pb.Clean(path) + "/"
// Get checked config
config := getBlockConfig(cfg)
// Attempt to open path
file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
if err != nil {
// If not a not-exist error, return
if !os.IsNotExist(err) {
return nil, err
}
// Attempt to make store path dirs
err = os.MkdirAll(path, defaultDirPerms)
if err != nil {
return nil, err
}
// Reopen dir now it's been created
file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
if err != nil {
return nil, err
}
}
defer file.Close()
// Double check this is a dir (NOT a file!)
stat, err := file.Stat()
if err != nil {
return nil, err
} else if !stat.IsDir() {
return nil, errPathIsFile
}
// Figure out the largest size for bufpool slices
bufSz := encodedHashLen
if bufSz < config.BlockSize {
bufSz = config.BlockSize
}
if bufSz < config.WriteBufSize {
bufSz = config.WriteBufSize
}
// Return new BlockStorage
return &BlockStorage{
path: path,
blockPath: pb.Join(path, blockPathPrefix),
nodePath: pb.Join(path, nodePathPrefix),
config: config,
hashPool: sync.Pool{
New: func() interface{} {
return newHashEncoder()
},
},
bufpool: pools.NewBufferPool(bufSz),
}, nil
}
// Clean implements storage.Clean()
func (st *BlockStorage) Clean() error {
nodes := map[string]*node{}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk nodes dir for entries
onceErr := errors.OnceError{}
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
return
}
// Stop if we hit error previously
if onceErr.IsSet() {
return
}
// Get joined node path name
npath = pb.Join(npath, fsentry.Name())
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
onceErr.Store(err)
return
}
defer file.Close()
// Alloc new Node + acquire hash buffer for writes
hbuf := st.bufpool.Get()
defer st.bufpool.Put(hbuf)
hbuf.Guarantee(encodedHashLen)
node := node{}
// Write file contents to node
_, err = io.CopyBuffer(
&nodeWriter{
node: &node,
buf: hbuf,
},
file,
nil,
)
if err != nil {
onceErr.Store(err)
return
}
// Append to nodes slice
nodes[fsentry.Name()] = &node
})
// Handle errors (though nodePath may not have been created yet)
if err != nil && !os.IsNotExist(err) {
return err
} else if onceErr.IsSet() {
return onceErr.Load()
}
// Walk blocks dir for entries
onceErr.Reset()
err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
return
}
// Stop if we hit error previously
if onceErr.IsSet() {
return
}
inUse := false
for key, node := range nodes {
if node.removeHash(fsentry.Name()) {
if len(node.hashes) < 1 {
// This node contained hash, and after removal is now empty.
// Remove this node from our tracked nodes slice
delete(nodes, key)
}
inUse = true
}
}
// Block hash is used by node
if inUse {
return
}
// Get joined block path name
bpath = pb.Join(bpath, fsentry.Name())
// Remove this unused block path
err := os.Remove(bpath)
if err != nil {
onceErr.Store(err)
return
}
})
// Handle errors (though blockPath may not have been created yet)
if err != nil && !os.IsNotExist(err) {
return err
} else if onceErr.IsSet() {
return onceErr.Load()
}
// If there are nodes left at this point, they are corrupt
// (i.e. they're referencing block hashes that don't exist)
if len(nodes) > 0 {
nodeKeys := []string{}
for key := range nodes {
nodeKeys = append(nodeKeys, key)
}
return errCorruptNodes.Extend("%v", nodeKeys)
}
return nil
}
// ReadBytes implements Storage.ReadBytes()
func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(key)
if err != nil {
return nil, err
}
// Read all bytes and return
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream()
func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
// Get node file path for key
npath, err := st.nodePathForKey(key)
if err != nil {
return nil, err
}
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
return nil, err
}
defer file.Close()
// Acquire hash buffer for writes
hbuf := st.bufpool.Get()
defer st.bufpool.Put(hbuf)
// Write file contents to node
node := node{}
_, err = io.CopyBuffer(
&nodeWriter{
node: &node,
buf: hbuf,
},
file,
nil,
)
if err != nil {
return nil, err
}
// Return new block reader
return util.NopReadCloser(&blockReader{
storage: st,
node: &node,
}), nil
}
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
// Get block file path for key
bpath := st.blockPathForKey(key)
// Attempt to open RO file
file, err := open(bpath, defaultFileROFlags)
if err != nil {
return nil, err
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
return nil, err
}
defer cFile.Close()
// Read the entire file
return io.ReadAll(cFile)
}
// WriteBytes implements Storage.WriteBytes()
func (st *BlockStorage) WriteBytes(key string, value []byte) error {
return st.WriteStream(key, bytes.NewReader(value))
}
// WriteStream implements Storage.WriteStream()
func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
// Get node file path for key
npath, err := st.nodePathForKey(key)
if err != nil {
return err
}
// Check if this exists
ok, err := stat(key)
if err != nil {
return err
}
// Check if we allow overwrites
if ok && !st.config.Overwrite {
return ErrAlreadyExists
}
// Ensure nodes dir (and any leading up to) exists
err = os.MkdirAll(st.nodePath, defaultDirPerms)
if err != nil {
return err
}
// Ensure blocks dir (and any leading up to) exists
err = os.MkdirAll(st.blockPath, defaultDirPerms)
if err != nil {
return err
}
// Alloc new node
node := node{}
// Acquire HashEncoder
hc := st.hashPool.Get().(*hashEncoder)
defer st.hashPool.Put(hc)
// Create new waitgroup and OnceError for
// goroutine error tracking and propagating
wg := sync.WaitGroup{}
onceErr := errors.OnceError{}
loop:
for !onceErr.IsSet() {
// Fetch new buffer for this loop
buf := st.bufpool.Get()
buf.Grow(st.config.BlockSize)
// Read next chunk
n, err := io.ReadFull(r, buf.B)
switch err {
case nil, io.ErrUnexpectedEOF:
// do nothing
case io.EOF:
st.bufpool.Put(buf)
break loop
default:
st.bufpool.Put(buf)
return err
}
// Hash the encoded data
sum := hc.EncodeSum(buf.B)
// Append to the node's hashes
node.hashes = append(node.hashes, sum.String())
// If already on disk, skip
has, err := st.statBlock(sum.StringPtr())
if err != nil {
st.bufpool.Put(buf)
return err
} else if has {
st.bufpool.Put(buf)
continue loop
}
// Write in separate goroutine
wg.Add(1)
go func() {
// Defer buffer release + signal done
defer func() {
st.bufpool.Put(buf)
wg.Done()
}()
// Write block to store at hash
err = st.writeBlock(sum.StringPtr(), buf.B[:n])
if err != nil {
onceErr.Store(err)
return
}
}()
// We reached EOF
if n < buf.Len() {
break loop
}
}
// Wait, check errors
wg.Wait()
if onceErr.IsSet() {
return onceErr.Load()
}
// If no hashes created, return
if len(node.hashes) < 1 {
return errNoHashesWritten
}
// Prepare to swap error if need-be
errSwap := errSwapNoop
// Build file RW flags
// NOTE: we performed an initial check for
// this before writing blocks, but if
// the utilizer of this storage didn't
// correctly mutex protect this key then
// someone may have beaten us to the
// punch at writing the node file.
flags := defaultFileRWFlags
if !st.config.Overwrite {
flags |= syscall.O_EXCL
// Catch + replace err exist
errSwap = errSwapExist
}
// Attempt to open RW file
file, err := open(npath, flags)
if err != nil {
return errSwap(err)
}
defer file.Close()
// Acquire write buffer
buf := st.bufpool.Get()
defer st.bufpool.Put(buf)
buf.Grow(st.config.WriteBufSize)
// Finally, write data to file
_, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil)
return err
}
// writeBlock writes the block with hash and supplied value to the filesystem
func (st *BlockStorage) writeBlock(hash string, value []byte) error {
// Get block file path for key
bpath := st.blockPathForKey(hash)
// Attempt to open RW file
file, err := open(bpath, defaultFileRWFlags)
if err != nil {
if err == ErrAlreadyExists {
err = nil /* race issue describe in struct NOTE */
}
return err
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Writer(file)
if err != nil {
return err
}
defer cFile.Close()
// Write value to file
_, err = cFile.Write(value)
return err
}
// statBlock checks for existence of supplied block hash
func (st *BlockStorage) statBlock(hash string) (bool, error) {
return stat(st.blockPathForKey(hash))
}
// Stat implements Storage.Stat()
func (st *BlockStorage) Stat(key string) (bool, error) {
// Get node file path for key
kpath, err := st.nodePathForKey(key)
if err != nil {
return false, err
}
// Check for file on disk
return stat(kpath)
}
// Remove implements Storage.Remove()
func (st *BlockStorage) Remove(key string) error {
// Get node file path for key
kpath, err := st.nodePathForKey(key)
if err != nil {
return err
}
// Attempt to remove file
return os.Remove(kpath)
}
// WalkKeys implements Storage.WalkKeys()
func (st *BlockStorage) WalkKeys(opts *WalkKeysOptions) error {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if fsentry.Type().IsRegular() {
opts.WalkFn(entry(fsentry.Name()))
}
})
}
// nodePathForKey calculates the node file path for supplied key
func (st *BlockStorage) nodePathForKey(key string) (string, error) {
// Path separators are illegal
if strings.Contains(key, "/") {
return "", ErrInvalidKey
}
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Return joined + cleaned node-path
return pb.Join(st.nodePath, key), nil
}
// blockPathForKey calculates the block file path for supplied hash
func (st *BlockStorage) blockPathForKey(hash string) string {
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
return pb.Join(st.blockPath, hash)
}
// hashSeparator is the separating byte between block hashes
const hashSeparator = byte(':')
// node represents the contents of a node file in storage
type node struct {
hashes []string
}
// removeHash attempts to remove supplied block hash from the node's hash array
func (n *node) removeHash(hash string) bool {
haveDropped := false
for i := 0; i < len(n.hashes); {
if n.hashes[i] == hash {
// Drop this hash from slice
n.hashes = append(n.hashes[:i], n.hashes[i+1:]...)
haveDropped = true
} else {
// Continue iter
i++
}
}
return haveDropped
}
// nodeReader is an io.Reader implementation for the node file representation,
// which is useful when calculated node file is being written to the store
type nodeReader struct {
node *node
idx int
last int
}
func (r *nodeReader) Read(b []byte) (int, error) {
n := 0
// '-1' means we missed writing
// hash separator on last iteration
if r.last == -1 {
b[n] = hashSeparator
n++
r.last = 0
}
for r.idx < len(r.node.hashes) {
hash := r.node.hashes[r.idx]
// Copy into buffer + update read count
m := copy(b[n:], hash[r.last:])
n += m
// If incomplete copy, return here
if m < len(hash)-r.last {
r.last = m
return n, nil
}
// Check we can write last separator
if n == len(b) {
r.last = -1
return n, nil
}
// Write separator, iter, reset
b[n] = hashSeparator
n++
r.idx++
r.last = 0
}
// We reached end of hashes
return n, io.EOF
}
// nodeWriter is an io.Writer implementation for the node file representation,
// which is useful when calculated node file is being read from the store
type nodeWriter struct {
node *node
buf *bytes.Buffer
}
func (w *nodeWriter) Write(b []byte) (int, error) {
n := 0
for {
// Find next hash separator position
idx := bytes.IndexByte(b[n:], hashSeparator)
if idx == -1 {
// Check we shouldn't be expecting it
if w.buf.Len() > encodedHashLen {
return n, errInvalidNode
}
// Write all contents to buffer
w.buf.Write(b[n:])
return len(b), nil
}
// Found hash separator, write
// current buf contents to Node hashes
w.buf.Write(b[n : n+idx])
n += idx + 1
if w.buf.Len() != encodedHashLen {
return n, errInvalidNode
}
// Append to hashes & reset
w.node.hashes = append(w.node.hashes, w.buf.String())
w.buf.Reset()
}
}
// blockReader is an io.Reader implementation for the combined, linked block
// data contained with a node file. Basically, this allows reading value data
// from the store for a given node file
type blockReader struct {
storage *BlockStorage
node *node
buf []byte
prev int
}
func (r *blockReader) Read(b []byte) (int, error) {
n := 0
// Data left in buf, copy as much as we
// can into supplied read buffer
if r.prev < len(r.buf)-1 {
n += copy(b, r.buf[r.prev:])
r.prev += n
if n >= len(b) {
return n, nil
}
}
for {
// Check we have any hashes left
if len(r.node.hashes) < 1 {
return n, io.EOF
}
// Get next key from slice
key := r.node.hashes[0]
r.node.hashes = r.node.hashes[1:]
// Attempt to fetch next batch of data
var err error
r.buf, err = r.storage.readBlock(key)
if err != nil {
return n, err
}
r.prev = 0
// Copy as much as can from new buffer
m := copy(b[n:], r.buf)
r.prev += m
n += m
// If we hit end of supplied buf, return
if n >= len(b) {
return n, nil
}
}
}
// hashEncoder is a HashEncoder with built-in encode buffer
type hashEncoder struct {
henc hashenc.HashEncoder
ebuf []byte
}
// encodedHashLen is the once-calculated encoded hash-sum length
var encodedHashLen = hashenc.Base64().EncodedLen(
sha256.New().Size(),
)
// newHashEncoder returns a new hashEncoder instance
func newHashEncoder() *hashEncoder {
hash := sha256.New()
enc := hashenc.Base64()
return &hashEncoder{
henc: hashenc.New(hash, enc),
ebuf: make([]byte, enc.EncodedLen(hash.Size())),
}
}
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes {
henc.henc.EncodeSum(henc.ebuf, src)
return bytes.ToBytes(henc.ebuf)
}

104
vendor/codeberg.org/gruf/go-store/storage/compressor.go generated vendored Normal file
View file

@ -0,0 +1,104 @@
package storage
import (
"compress/gzip"
"compress/zlib"
"io"
"codeberg.org/gruf/go-store/util"
"github.com/golang/snappy"
)
// Compressor defines a means of compressing/decompressing values going into a key-value store
type Compressor interface {
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
Reader(io.Reader) (io.ReadCloser, error)
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
Writer(io.Writer) (io.WriteCloser, error)
}
type gzipCompressor struct {
level int
}
// GZipCompressor returns a new Compressor that implements GZip at default compression level
func GZipCompressor() Compressor {
return GZipCompressorLevel(gzip.DefaultCompression)
}
// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
func GZipCompressorLevel(level int) Compressor {
return &gzipCompressor{
level: level,
}
}
func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return gzip.NewReader(r)
}
func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return gzip.NewWriterLevel(w, c.level)
}
type zlibCompressor struct {
level int
dict []byte
}
// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
func ZLibCompressor() Compressor {
return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
}
// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
func ZLibCompressorLevel(level int) Compressor {
return ZLibCompressorLevelDict(level, nil)
}
// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
return &zlibCompressor{
level: level,
dict: dict,
}
}
func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return zlib.NewReaderDict(r, c.dict)
}
func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return zlib.NewWriterLevelDict(w, c.level, c.dict)
}
type snappyCompressor struct{}
// SnappyCompressor returns a new Compressor that implements Snappy
func SnappyCompressor() Compressor {
return &snappyCompressor{}
}
func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return util.NopReadCloser(snappy.NewReader(r)), nil
}
func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return snappy.NewBufferedWriter(w), nil
}
type nopCompressor struct{}
// NoCompression is a Compressor that simply does nothing
func NoCompression() Compressor {
return &nopCompressor{}
}
func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
return util.NopReadCloser(r), nil
}
func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
return util.NopWriteCloser(w), nil
}

291
vendor/codeberg.org/gruf/go-store/storage/disk.go generated vendored Normal file
View file

@ -0,0 +1,291 @@
package storage
import (
"io"
"io/fs"
"os"
"path"
"syscall"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
)
// DefaultDiskConfig is the default DiskStorage configuration
var DefaultDiskConfig = &DiskConfig{
Overwrite: true,
WriteBufSize: 4096,
Transform: NopTransform(),
Compression: NoCompression(),
}
// DiskConfig defines options to be used when opening a DiskStorage
type DiskConfig struct {
// Transform is the supplied key<-->path KeyTransform
Transform KeyTransform
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
WriteBufSize int
// Overwrite allows overwriting values of stored keys in the storage
Overwrite bool
// Compression is the Compressor to use when reading / writing files, default is no compression
Compression Compressor
}
// getDiskConfig returns a valid DiskConfig for supplied ptr
func getDiskConfig(cfg *DiskConfig) DiskConfig {
// If nil, use default
if cfg == nil {
cfg = DefaultDiskConfig
}
// Assume nil transform == none
if cfg.Transform == nil {
cfg.Transform = NopTransform()
}
// Assume nil compress == none
if cfg.Compression == nil {
cfg.Compression = NoCompression()
}
// Assume 0 buf size == use default
if cfg.WriteBufSize < 1 {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
// Return owned config copy
return DiskConfig{
Transform: cfg.Transform,
WriteBufSize: cfg.WriteBufSize,
Overwrite: cfg.Overwrite,
Compression: cfg.Compression,
}
}
// DiskStorage is a Storage implementation that stores directly to a filesystem
type DiskStorage struct {
path string // path is the root path of this store
dots int // dots is the "dotdot" count for the root store path
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Clean provided path, ensure ends in '/' (should
// be dir, this helps with file path trimming later)
path = pb.Clean(path) + "/"
// Get checked config
config := getDiskConfig(cfg)
// Attempt to open dir path
file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
if err != nil {
// If not a not-exist error, return
if !os.IsNotExist(err) {
return nil, err
}
// Attempt to make store path dirs
err = os.MkdirAll(path, defaultDirPerms)
if err != nil {
return nil, err
}
// Reopen dir now it's been created
file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
if err != nil {
return nil, err
}
}
defer file.Close()
// Double check this is a dir (NOT a file!)
stat, err := file.Stat()
if err != nil {
return nil, err
} else if !stat.IsDir() {
return nil, errPathIsFile
}
// Return new DiskStorage
return &DiskStorage{
path: path,
dots: util.CountDotdots(path),
bufp: pools.NewBufferPool(config.WriteBufSize),
config: config,
}, nil
}
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
return util.CleanDirs(st.path)
}
// ReadBytes implements Storage.ReadBytes()
func (st *DiskStorage) ReadBytes(key string) ([]byte, error) {
// Get stream reader for key
rc, err := st.ReadStream(key)
if err != nil {
return nil, err
}
defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
}
// ReadStream implements Storage.ReadStream()
func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return nil, err
}
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
return nil, errSwapNotFound(err)
}
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
}), nil
}
// WriteBytes implements Storage.WriteBytes()
func (st *DiskStorage) WriteBytes(key string, value []byte) error {
return st.WriteStream(key, bytes.NewReader(value))
}
// WriteStream implements Storage.WriteStream()
func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return err
}
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
return err
}
// Prepare to swap error if need-be
errSwap := errSwapNoop
// Build file RW flags
flags := defaultFileRWFlags
if !st.config.Overwrite {
flags |= syscall.O_EXCL
// Catch + replace err exist
errSwap = errSwapExist
}
// Attempt to open file
file, err := open(kpath, flags)
if err != nil {
return errSwap(err)
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Writer(file)
if err != nil {
return err
}
defer cFile.Close()
// Acquire write buffer
buf := st.bufp.Get()
defer st.bufp.Put(buf)
buf.Grow(st.config.WriteBufSize)
// Copy reader to file
_, err = io.CopyBuffer(cFile, r, buf.B)
return err
}
// Stat implements Storage.Stat()
func (st *DiskStorage) Stat(key string) (bool, error) {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return false, err
}
// Check for file on disk
return stat(kpath)
}
// Remove implements Storage.Remove()
func (st *DiskStorage) Remove(key string) error {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
return err
}
// Attempt to remove file
return os.Remove(kpath)
}
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts *WalkKeysOptions) error {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) {
// Only deal with regular files
if fsentry.Type().IsRegular() {
// Get full item path (without root)
kpath = pb.Join(kpath, fsentry.Name())[len(st.path):]
// Perform provided walk function
opts.WalkFn(entry(st.config.Transform.PathToKey(kpath)))
}
})
}
// filepath checks and returns a formatted filepath for given key
func (st *DiskStorage) filepath(key string) (string, error) {
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Calculate transformed key path
key = st.config.Transform.KeyToPath(key)
// Generated joined root path
pb.AppendString(st.path)
pb.AppendString(key)
// If path is dir traversal, and traverses FURTHER
// than store root, this is an error
if util.CountDotdots(pb.StringPtr()) > st.dots {
return "", ErrInvalidKey
}
return pb.String(), nil
}

63
vendor/codeberg.org/gruf/go-store/storage/errors.go generated vendored Normal file
View file

@ -0,0 +1,63 @@
package storage
import (
"fmt"
"syscall"
)
// errorString is our own simple error type
type errorString string
// Error implements error
func (e errorString) Error() string {
return string(e)
}
// Extend appends extra information to an errorString
func (e errorString) Extend(s string, a ...interface{}) errorString {
return errorString(string(e) + ": " + fmt.Sprintf(s, a...))
}
var (
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errorString("store/storage: key not found")
// ErrAlreadyExist is the error returned when a key already exists in storage
ErrAlreadyExists = errorString("store/storage: key already exists")
// ErrInvalidkey is the error returned when an invalid key is passed to storage
ErrInvalidKey = errorString("store/storage: invalid key")
// errPathIsFile is returned when a path for a disk config is actually a file
errPathIsFile = errorString("store/storage: path is file")
// errNoHashesWritten is returned when no blocks are written for given input value
errNoHashesWritten = errorString("storage/storage: no hashes written")
// errInvalidNode is returned when read on an invalid node in the store is attempted
errInvalidNode = errorString("store/storage: invalid node")
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
errCorruptNodes = errorString("store/storage: corrupted nodes")
)
// errSwapNoop performs no error swaps
func errSwapNoop(err error) error {
return err
}
// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound
func errSwapNotFound(err error) error {
if err == syscall.ENOENT {
return ErrNotFound
}
return err
}
// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists
func errSwapExist(err error) error {
if err == syscall.EEXIST {
return ErrAlreadyExists
}
return err
}

48
vendor/codeberg.org/gruf/go-store/storage/fs.go generated vendored Normal file
View file

@ -0,0 +1,48 @@
package storage
import (
"os"
"syscall"
"codeberg.org/gruf/go-store/util"
)
const (
defaultDirPerms = 0755
defaultFilePerms = 0644
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
)
// NOTE:
// These functions are for opening storage files,
// not necessarily for e.g. initial setup (OpenFile)
// open should not be called directly
func open(path string, flags int) (*os.File, error) {
var fd int
err := util.RetryOnEINTR(func() (err error) {
fd, err = syscall.Open(path, flags, defaultFilePerms)
return
})
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), path), nil
}
// stat checks for a file on disk
func stat(path string) (bool, error) {
var stat syscall.Stat_t
err := util.RetryOnEINTR(func() error {
return syscall.Stat(path, &stat)
})
if err != nil {
if err == syscall.ENOENT {
err = nil
}
return false, err
}
return true, nil
}

34
vendor/codeberg.org/gruf/go-store/storage/lock.go generated vendored Normal file
View file

@ -0,0 +1,34 @@
package storage
import (
"os"
"syscall"
"codeberg.org/gruf/go-store/util"
)
type lockableFile struct {
*os.File
}
func openLock(path string) (*lockableFile, error) {
file, err := open(path, defaultFileLockFlags)
if err != nil {
return nil, err
}
return &lockableFile{file}, nil
}
func (f *lockableFile) lock() error {
return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
}
func (f *lockableFile) unlock() error {
return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
}
func (f *lockableFile) flock(how int) error {
return util.RetryOnEINTR(func() error {
return syscall.Flock(int(f.Fd()), how)
})
}

51
vendor/codeberg.org/gruf/go-store/storage/storage.go generated vendored Normal file
View file

@ -0,0 +1,51 @@
package storage
import (
"io"
)
// StorageEntry defines a key in Storage
type StorageEntry interface {
// Key returns the storage entry's key
Key() string
}
// entry is the simplest possible StorageEntry
type entry string
func (e entry) Key() string {
return string(e)
}
// Storage defines a means of storing and accessing key value pairs
type Storage interface {
// Clean removes unused values and unclutters the storage (e.g. removing empty folders)
Clean() error
// ReadBytes returns the byte value for key in storage
ReadBytes(key string) ([]byte, error)
// ReadStream returns an io.ReadCloser for the value bytes at key in the storage
ReadStream(key string) (io.ReadCloser, error)
// WriteBytes writes the supplied value bytes at key in the storage
WriteBytes(key string, value []byte) error
// WriteStream writes the bytes from supplied reader at key in the storage
WriteStream(key string, r io.Reader) error
// Stat checks if the supplied key is in the storage
Stat(key string) (bool, error)
// Remove attempts to remove the supplied key-value pair from storage
Remove(key string) error
// WalkKeys walks the keys in the storage
WalkKeys(opts *WalkKeysOptions) error
}
// WalkKeysOptions defines how to walk the keys in a storage implementation
type WalkKeysOptions struct {
// WalkFn is the function to apply on each StorageEntry
WalkFn func(StorageEntry)
}

25
vendor/codeberg.org/gruf/go-store/storage/transform.go generated vendored Normal file
View file

@ -0,0 +1,25 @@
package storage
// KeyTransform defines a method of converting store keys to storage paths (and vice-versa)
type KeyTransform interface {
// KeyToPath converts a supplied key to storage path
KeyToPath(string) string
// PathToKey converts a supplied storage path to key
PathToKey(string) string
}
type nopKeyTransform struct{}
// NopTransform returns a nop key transform (i.e. key = path)
func NopTransform() KeyTransform {
return &nopKeyTransform{}
}
func (t *nopKeyTransform) KeyToPath(key string) string {
return key
}
func (t *nopKeyTransform) PathToKey(path string) string {
return path
}

105
vendor/codeberg.org/gruf/go-store/util/fs.go generated vendored Normal file
View file

@ -0,0 +1,105 @@
package util
import (
"io/fs"
"os"
"strings"
"syscall"
"codeberg.org/gruf/go-fastpath"
)
var dotdot = "../"
// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path
func CountDotdots(path string) int {
if !strings.HasSuffix(path, dotdot) {
return 0
}
return strings.Count(path, dotdot)
}
// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error {
// Read supplied dir path
dirEntries, err := os.ReadDir(path)
if err != nil {
return err
}
// Iter entries
for _, entry := range dirEntries {
// Pass to walk fn
walkFn(path, entry)
// Recurse dir entries
if entry.IsDir() {
err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn)
if err != nil {
return err
}
}
}
return nil
}
// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
func CleanDirs(path string) error {
// Acquire builder
pb := GetPathBuilder()
defer PutPathBuilder(pb)
// Get dir entries
entries, err := os.ReadDir(path)
if err != nil {
return err
}
// Recurse dirs
for _, entry := range entries {
if entry.IsDir() {
err := cleanDirs(pb, pb.Join(path, entry.Name()))
if err != nil {
return err
}
}
}
return nil
}
// cleanDirs performs the actual dir cleaning logic for the exported version
func cleanDirs(pb *fastpath.Builder, path string) error {
// Get dir entries
entries, err := os.ReadDir(path)
if err != nil {
return err
}
// If no entries, delete
if len(entries) < 1 {
return os.Remove(path)
}
// Recurse dirs
for _, entry := range entries {
if entry.IsDir() {
err := cleanDirs(pb, pb.Join(path, entry.Name()))
if err != nil {
return err
}
}
}
return nil
}
// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
func RetryOnEINTR(do func() error) error {
for {
err := do()
if err == syscall.EINTR {
continue
}
return err
}
}

42
vendor/codeberg.org/gruf/go-store/util/io.go generated vendored Normal file
View file

@ -0,0 +1,42 @@
package util
import "io"
// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation
func NopReadCloser(r io.Reader) io.ReadCloser {
return &nopReadCloser{r}
}
// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser
func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
return &callbackReadCloser{
ReadCloser: rc,
callback: cb,
}
}
// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close()
type nopReadCloser struct{ io.Reader }
func (r *nopReadCloser) Close() error { return nil }
// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close()
type nopWriteCloser struct{ io.Writer }
func (w nopWriteCloser) Close() error { return nil }
// callbackReadCloser allows adding our own custom callback to an io.ReadCloser
type callbackReadCloser struct {
io.ReadCloser
callback func()
}
func (c *callbackReadCloser) Close() error {
defer c.callback()
return c.ReadCloser.Close()
}

20
vendor/codeberg.org/gruf/go-store/util/pool.go generated vendored Normal file
View file

@ -0,0 +1,20 @@
package util
import (
"codeberg.org/gruf/go-fastpath"
"codeberg.org/gruf/go-pools"
)
// pathBuilderPool is the global fastpath.Builder pool
var pathBuilderPool = pools.NewPathBuilderPool(512)
// GetPathBuilder fetches a fastpath.Builder object from the pool
func GetPathBuilder() *fastpath.Builder {
return pathBuilderPool.Get()
}
// PutPathBuilder places supplied fastpath.Builder back in the pool
func PutPathBuilder(pb *fastpath.Builder) {
pb.Reset()
pathBuilderPool.Put(pb)
}