[bugfix] Fix remote media pruning failing if media already gone (#548)

* fix error check of prune to allow missing files

* update go-store library, add test for pruning item with db entry but no file

Signed-off-by: kim <grufwub@gmail.com>

* remove now-unnecessary error check

Signed-off-by: kim <grufwub@gmail.com>

Co-authored-by: kim <grufwub@gmail.com>
This commit is contained in:
tobi 2022-05-08 19:49:45 +02:00 committed by GitHub
commit 5004e0a9da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
50 changed files with 4682 additions and 1785 deletions

View file

@ -1,7 +1,8 @@
package kv
import (
"codeberg.org/gruf/go-errors"
"errors"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
)

View file

@ -1,9 +1,9 @@
package kv
import (
"errors"
"io"
"codeberg.org/gruf/go-errors"
"codeberg.org/gruf/go-mutexes"
)

View file

@ -1,7 +1,9 @@
package storage
import (
"bytes"
"crypto/sha256"
"fmt"
"io"
"io/fs"
"os"
@ -9,8 +11,9 @@ import (
"sync"
"syscall"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-errors"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
@ -34,6 +37,9 @@ type BlockConfig struct {
// BlockSize is the chunking size to use when splitting and storing blocks of data
BlockSize int
// ReadBufSize is the buffer size to use when reading node files
ReadBufSize int
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
WriteBufSize int
@ -81,13 +87,14 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
// "node" file is finally created containing an array of hashes contained within
// this value
type BlockStorage struct {
path string // path is the root path of this store
blockPath string // blockPath is the joined root path + block path prefix
nodePath string // nodePath is the joined root path + node path prefix
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
lock *Lock // lock is the opened lockfile for this storage instance
path string // path is the root path of this store
blockPath string // blockPath is the joined root path + block path prefix
nodePath string // nodePath is the joined root path + node path prefix
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
lock *Lock // lock is the opened lockfile for this storage instance
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
@ -154,8 +161,8 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
bufSz = config.WriteBufSize
}
// Return new BlockStorage
return &BlockStorage{
// Prepare BlockStorage
st := &BlockStorage{
path: path,
blockPath: pb.Join(path, blockPathPrefix),
nodePath: pb.Join(path, nodePathPrefix),
@ -167,7 +174,12 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
},
bufpool: pools.NewBufferPool(bufSz),
lock: lock,
}, nil
}
// Set copypool buffer size
st.cppool.Buffer(config.ReadBufSize)
return st, nil
}
// Clean implements storage.Clean()
@ -297,7 +309,7 @@ func (st *BlockStorage) Clean() error {
for key := range nodes {
nodeKeys = append(nodeKeys, key)
}
return errCorruptNodes.Extend("%v", nodeKeys)
return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys)
}
return nil
@ -337,7 +349,7 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
file, err := open(npath, defaultFileROFlags)
if err != nil {
st.lock.Done()
return nil, err
return nil, errSwapNotFound(err)
}
defer file.Close()
@ -347,13 +359,12 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
// Write file contents to node
node := node{}
_, err = io.CopyBuffer(
_, err = st.cppool.Copy(
&nodeWriter{
node: &node,
buf: hbuf,
},
file,
nil,
)
if err != nil {
st.lock.Done()
@ -375,14 +386,14 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) {
// Attempt to open RO file
file, err := open(bpath, defaultFileROFlags)
if err != nil {
return nil, err
return nil, wrap(errCorruptNode, err)
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
return nil, err
return nil, wrap(errCorruptNode, err)
}
defer cFile.Close()
@ -470,10 +481,10 @@ loop:
sum := hc.EncodeSum(buf.B)
// Append to the node's hashes
node.hashes = append(node.hashes, sum.String())
node.hashes = append(node.hashes, sum)
// If already on disk, skip
has, err := st.statBlock(sum.StringPtr())
has, err := st.statBlock(sum)
if err != nil {
st.bufpool.Put(buf)
return err
@ -497,7 +508,7 @@ loop:
}()
// Write block to store at hash
err = st.writeBlock(sum.StringPtr(), buf.B[:n])
err = st.writeBlock(sum, buf.B[:n])
if err != nil {
onceErr.Store(err)
return
@ -564,7 +575,7 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error {
// Attempt to open RW file
file, err := open(bpath, defaultFileRWFlags)
if err != nil {
if err == ErrAlreadyExists {
if err == syscall.EEXIST {
err = nil /* race issue describe in struct NOTE */
}
return err
@ -626,8 +637,12 @@ func (st *BlockStorage) Remove(key string) error {
return ErrClosed
}
// Attempt to remove file
return os.Remove(kpath)
// Remove at path (we know this is file)
if err := unlink(kpath); err != nil {
return errSwapNotFound(err)
}
return nil
}
// Close implements Storage.Close()
@ -762,7 +777,7 @@ func (r *nodeReader) Read(b []byte) (int, error) {
// which is useful when calculated node file is being read from the store
type nodeWriter struct {
node *node
buf *bytes.Buffer
buf *byteutil.Buffer
}
func (w *nodeWriter) Write(b []byte) (int, error) {
@ -874,7 +889,7 @@ func newHashEncoder() *hashEncoder {
}
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes {
func (henc *hashEncoder) EncodeSum(src []byte) string {
henc.henc.EncodeSum(henc.ebuf, src)
return bytes.ToBytes(henc.ebuf)
return string(henc.ebuf)
}

View file

@ -318,8 +318,12 @@ func (st *DiskStorage) Remove(key string) error {
return ErrClosed
}
// Attempt to remove file
return os.Remove(kpath)
// Remove at path (we know this is file)
if err := unlink(kpath); err != nil {
return errSwapNotFound(err)
}
return nil
}
// Close implements Storage.Close()

View file

@ -1,52 +1,65 @@
package storage
import (
"fmt"
"errors"
"syscall"
)
// errorString is a minimal string-backed error type.
type errorString string

// Error implements the error interface by returning the underlying string.
func (e errorString) Error() string {
	return string(e)
}

// Extend returns a new errorString made of the receiver's text, a ": "
// separator, and the result of formatting s with the arguments a.
func (e errorString) Extend(s string, a ...interface{}) errorString {
	detail := fmt.Sprintf(s, a...)
	return e + ": " + errorString(detail)
}
// Sentinel errors for the storage package.
// NOTE(review): most names below appear twice, once built with errorString and
// once with errors.New — this listing looks like interleaved old/new lines of
// a diff; confirm which set is current before relying on it.
var (
// ErrClosed is returned on operations on a closed storage
ErrClosed = errorString("store/storage: closed")
ErrClosed = errors.New("store/storage: closed")
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errorString("store/storage: key not found")
ErrNotFound = errors.New("store/storage: key not found")
// ErrAlreadyExists is the error returned when a key already exists in storage
ErrAlreadyExists = errorString("store/storage: key already exists")
ErrAlreadyExists = errors.New("store/storage: key already exists")
// ErrInvalidKey is the error returned when an invalid key is passed to storage
ErrInvalidKey = errorString("store/storage: invalid key")
// errPathIsFile is returned when a path for a disk config is actually a file
errPathIsFile = errorString("store/storage: path is file")
// errNoHashesWritten is returned when no blocks are written for given input value
// NOTE(review): the message prefix reads "storage/storage", unlike the
// "store/storage" prefix used by every other error here — confirm intent.
errNoHashesWritten = errorString("storage/storage: no hashes written")
// errInvalidNode is returned when read on an invalid node in the store is attempted
errInvalidNode = errorString("store/storage: invalid node")
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
errCorruptNodes = errorString("store/storage: corrupted nodes")
ErrInvalidKey = errors.New("store/storage: invalid key")
// ErrAlreadyLocked is returned on fail opening a storage lockfile
ErrAlreadyLocked = errorString("store/storage: storage lock already open")
ErrAlreadyLocked = errors.New("store/storage: storage lock already open")
// errPathIsFile is returned when a path for a disk config is actually a file
errPathIsFile = errors.New("store/storage: path is file")
// errNoHashesWritten is returned when no blocks are written for given input value
// NOTE(review): the message prefix reads "storage/storage", unlike the
// "store/storage" prefix used by every other error here — confirm intent.
errNoHashesWritten = errors.New("storage/storage: no hashes written")
// errInvalidNode is returned when read on an invalid node in the store is attempted
errInvalidNode = errors.New("store/storage: invalid node")
// errCorruptNode is returned when a block fails to be opened / read during read of a node.
errCorruptNode = errors.New("store/storage: corrupted node")
)
// wrappedError pairs an outer (contextual) error with the inner error it was
// raised for, so that callers can match against either one.
type wrappedError struct {
	inner error
	outer error
}

// wrap will return a new wrapped error from given inner and outer errors.
// Note the argument order: the outer error is passed first.
func wrap(outer, inner error) *wrappedError {
	return &wrappedError{
		inner: inner,
		outer: outer,
	}
}

// Is reports whether target matches the outer or the inner error.
//
// Both sides are checked with errors.Is rather than a plain ==, so a target
// that sits further down the outer error's own chain is found as well. That
// chain is otherwise unreachable, because Unwrap only exposes the inner error.
// errors.Is also skips the direct comparison for non-comparable error values
// instead of panicking.
func (e *wrappedError) Is(target error) bool {
	return errors.Is(e.outer, target) || errors.Is(e.inner, target)
}

// Error returns the outer error's message followed by ": " and the inner
// error's message.
func (e *wrappedError) Error() string {
	return e.outer.Error() + ": " + e.inner.Error()
}

// Unwrap returns the inner error, letting errors.Is / errors.As continue
// down the inner error's chain.
func (e *wrappedError) Unwrap() error {
	return e.inner
}
// errSwapNoop performs no error swaps
func errSwapNoop(err error) error {
return err

View file

@ -9,8 +9,8 @@ import (
const (
// default file permission bits
defaultDirPerms = 0755
defaultFilePerms = 0644
defaultDirPerms = 0o755
defaultFilePerms = 0o644
// default file open flags
defaultFileROFlags = syscall.O_RDONLY
@ -22,7 +22,7 @@ const (
// These functions are for opening storage files,
// not necessarily for e.g. initial setup (OpenFile)
// open should not be called directly
// open should not be called directly.
func open(path string, flags int) (*os.File, error) {
var fd int
err := util.RetryOnEINTR(func() (err error) {
@ -35,7 +35,7 @@ func open(path string, flags int) (*os.File, error) {
return os.NewFile(uintptr(fd), path), nil
}
// stat checks for a file on disk
// stat checks for a file on disk.
func stat(path string) (bool, error) {
var stat syscall.Stat_t
err := util.RetryOnEINTR(func() error {
@ -49,3 +49,17 @@ func stat(path string) (bool, error) {
}
return true, nil
}
// unlink deletes the file at path using the unlink syscall, executed through
// util.RetryOnEINTR. Only for files, not directories.
func unlink(path string) error {
	op := func() error { return syscall.Unlink(path) }
	return util.RetryOnEINTR(op)
}
// rmdir deletes the directory at path using the rmdir syscall, executed
// through util.RetryOnEINTR. Only for directories, not files.
func rmdir(path string) error {
	op := func() error { return syscall.Rmdir(path) }
	return util.RetryOnEINTR(op)
}