mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 11:42:24 -05:00
[chore] media and emoji refactoring (#3000)
* start updating media manager interface ready for storing attachments / emoji right away
* store emoji and media as uncached immediately, then (re-)cache on Processing{}.Load()
* remove now unused media workers
* fix tests and issues
* fix another test!
* fix emoji activitypub uri setting behaviour, fix remainder of test compilation issues
* fix more tests
* fix (most of) remaining tests, add debouncing to repeatedly failing media / emojis
* whoops, rebase issue
* remove kim's whacky experiments
* do some reshuffling, ensure emoji uri gets set
* ensure marked as not cached on cleanup
* tweaks to media / emoji processing to handle context canceled better
* ensure newly fetched emojis actually get set in returned slice
* use different varnames to be a bit more obvious
* move emoji refresh rate limiting to dereferencer
* add exported dereferencer functions for remote media, use these for recaching in processor
* add check for nil attachment in updateAttachment()
* remove unused emoji and media fields + columns
* see previous commit
* fix old migrations expecting image_updated_at to exists (from copies of old models)
* remove freshness checking code (seems to be broken...)
* fix error arg causing nil ptr exception
* finish documentating functions with comments, slight tweaks to media / emoji deref error logic
* remove some extra unneeded boolean checking
* finish writing documentation (code comments) for exported media manager methods
* undo changes to migration snapshot gtsmodels, updated failing migration to have its own snapshot
* move doesColumnExist() to util.go in migrations package
This commit is contained in:
parent
fa710057c8
commit
21bb324156
48 changed files with 2578 additions and 1926 deletions
|
|
@ -24,14 +24,16 @@ import (
|
|||
"slices"
|
||||
|
||||
"codeberg.org/gruf/go-bytesize"
|
||||
"codeberg.org/gruf/go-errors/v2"
|
||||
errorsv2 "codeberg.org/gruf/go-errors/v2"
|
||||
"codeberg.org/gruf/go-runners"
|
||||
"github.com/h2non/filetype"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/config"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/log"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/regexes"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/storage"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/uris"
|
||||
"github.com/superseriousbusiness/gotosocial/internal/util"
|
||||
)
|
||||
|
|
@ -40,7 +42,6 @@ import (
|
|||
// various functions for retrieving data from the process.
|
||||
type ProcessingEmoji struct {
|
||||
emoji *gtsmodel.Emoji // processing emoji details
|
||||
existing bool // indicates whether this is an existing emoji ID being refreshed / recached
|
||||
newPathID string // new emoji path ID to use when being refreshed
|
||||
dataFn DataFunc // load-data function, returns media stream
|
||||
done bool // done is set when process finishes with non ctx canceled type error
|
||||
|
|
@ -49,61 +50,72 @@ type ProcessingEmoji struct {
|
|||
mgr *Manager // mgr instance (access to db / storage)
|
||||
}
|
||||
|
||||
// EmojiID returns the ID of the underlying emoji without blocking processing.
|
||||
func (p *ProcessingEmoji) EmojiID() string {
|
||||
// ID returns the ID of the underlying emoji.
|
||||
func (p *ProcessingEmoji) ID() string {
|
||||
return p.emoji.ID // immutable, safe outside mutex.
|
||||
}
|
||||
|
||||
// LoadEmoji blocks until the static and fullsize image has been processed, and then returns the completed emoji.
|
||||
func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error) {
|
||||
// Attempt to load synchronously.
|
||||
func (p *ProcessingEmoji) Load(ctx context.Context) (*gtsmodel.Emoji, error) {
|
||||
emoji, done, err := p.load(ctx)
|
||||
if err == nil {
|
||||
// No issue, return media.
|
||||
return emoji, nil
|
||||
}
|
||||
|
||||
if !done {
|
||||
// Provided context was cancelled, e.g. request cancelled
|
||||
// early. Queue this item for asynchronous processing.
|
||||
log.Warnf(ctx, "reprocessing emoji %s after canceled ctx", p.emoji.ID)
|
||||
p.mgr.state.Workers.Media.Queue.Push(p.Process)
|
||||
// On a context-canceled error (marked as !done), requeue for loading.
|
||||
p.mgr.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
|
||||
if _, _, err := p.load(ctx); err != nil {
|
||||
log.Errorf(ctx, "error loading emoji: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil, err
|
||||
return emoji, err
|
||||
}
|
||||
|
||||
// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error.
|
||||
func (p *ProcessingEmoji) Process(ctx context.Context) {
|
||||
if _, _, err := p.load(ctx); err != nil {
|
||||
log.Errorf(ctx, "error processing emoji: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// load performs a concurrency-safe load of ProcessingEmoji, only marking itself as complete when returned error is NOT a context cancel.
|
||||
func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, error) {
|
||||
var (
|
||||
done bool
|
||||
err error
|
||||
)
|
||||
|
||||
// load is the package private form of load() that is wrapped to catch context canceled.
|
||||
func (p *ProcessingEmoji) load(ctx context.Context) (
|
||||
emoji *gtsmodel.Emoji,
|
||||
done bool,
|
||||
err error,
|
||||
) {
|
||||
err = p.proc.Process(func() error {
|
||||
if p.done {
|
||||
if done = p.done; done {
|
||||
// Already proc'd.
|
||||
return p.err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// This is only done when ctx NOT cancelled.
|
||||
done = err == nil || !errors.IsV2(err,
|
||||
done = (err == nil || !errorsv2.IsV2(err,
|
||||
context.Canceled,
|
||||
context.DeadlineExceeded,
|
||||
)
|
||||
))
|
||||
|
||||
if !done {
|
||||
return
|
||||
}
|
||||
|
||||
// Anything from here, we
|
||||
// need to ensure happens
|
||||
// (i.e. no ctx canceled).
|
||||
ctx = gtscontext.WithValues(
|
||||
context.Background(),
|
||||
ctx, // values
|
||||
)
|
||||
|
||||
// On error, clean
|
||||
// downloaded files.
|
||||
if err != nil {
|
||||
p.cleanup(ctx)
|
||||
}
|
||||
|
||||
if !done {
|
||||
return
|
||||
}
|
||||
|
||||
// Update with latest details, whatever happened.
|
||||
e := p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
|
||||
if e != nil {
|
||||
log.Errorf(ctx, "error updating emoji in db: %v", e)
|
||||
}
|
||||
|
||||
// Store final values.
|
||||
p.done = true
|
||||
p.err = err
|
||||
|
|
@ -111,39 +123,31 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro
|
|||
|
||||
// Attempt to store media and calculate
|
||||
// full-size media attachment details.
|
||||
//
|
||||
// This will update p.emoji as it goes.
|
||||
if err = p.store(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finish processing by reloading media into
|
||||
// memory to get dimension and generate a thumb.
|
||||
//
|
||||
// This will update p.emoji as it goes.
|
||||
if err = p.finish(ctx); err != nil {
|
||||
return err
|
||||
return err //nolint:revive
|
||||
}
|
||||
|
||||
if p.existing {
|
||||
// Existing emoji we're updating, so only update.
|
||||
err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
|
||||
return err
|
||||
}
|
||||
|
||||
// New emoji media, first time caching.
|
||||
err = p.mgr.state.DB.PutEmoji(ctx, p.emoji)
|
||||
return err
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, done, err
|
||||
}
|
||||
|
||||
return p.emoji, done, nil
|
||||
emoji = p.emoji
|
||||
return
|
||||
}
|
||||
|
||||
// store calls the data function attached to p if it hasn't been called yet,
|
||||
// and updates the underlying attachment fields as necessary. It will then stream
|
||||
// bytes from p's reader directly into storage so that it can be retrieved later.
|
||||
func (p *ProcessingEmoji) store(ctx context.Context) error {
|
||||
// Load media from provided data fn.
|
||||
// Load media from provided data fun
|
||||
rc, sz, err := p.dataFn(ctx)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error executing data function: %w", err)
|
||||
|
|
@ -168,8 +172,9 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
|
||||
// Check that provided size isn't beyond max. We check beforehand
|
||||
// so that we don't attempt to stream the emoji into storage if not needed.
|
||||
if size := bytesize.Size(sz); sz > 0 && size > maxSize {
|
||||
return gtserror.Newf("given emoji size %s greater than max allowed %s", size, maxSize)
|
||||
if sz > 0 && sz > int64(maxSize) {
|
||||
sz := bytesize.Size(sz) // improves log readability
|
||||
return gtserror.Newf("given emoji size %s greater than max allowed %s", sz, maxSize)
|
||||
}
|
||||
|
||||
// Prepare to read bytes from
|
||||
|
|
@ -196,14 +201,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
|
||||
// Initial file size was misreported, so we didn't read
|
||||
// fully into hdrBuf. Reslice it to the size we did read.
|
||||
log.Warnf(ctx,
|
||||
"recovered from misreported file size; reported %d; read %d",
|
||||
fileSize, n,
|
||||
)
|
||||
hdrBuf = hdrBuf[:n]
|
||||
fileSize = n
|
||||
p.emoji.ImageFileSize = fileSize
|
||||
}
|
||||
|
||||
// Parse file type info from header buffer.
|
||||
// This should only ever error if the buffer
|
||||
// is empty (ie., the attachment is 0 bytes).
|
||||
info, err := filetype.Match(hdrBuf)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error parsing file type: %w", err)
|
||||
|
|
@ -227,10 +232,13 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
pathID = p.emoji.ID
|
||||
}
|
||||
|
||||
// Determine instance account ID from already generated image static path.
|
||||
instanceAccID := regexes.FilePath.FindStringSubmatch(p.emoji.ImageStaticPath)[1]
|
||||
// Determine instance account ID from generated image static path.
|
||||
instanceAccID, ok := getInstanceAccountID(p.emoji.ImageStaticPath)
|
||||
if !ok {
|
||||
return gtserror.Newf("invalid emoji static path; no instance account id: %s", p.emoji.ImageStaticPath)
|
||||
}
|
||||
|
||||
// Calculate emoji file path.
|
||||
// Calculate final media attachment file path.
|
||||
p.emoji.ImagePath = uris.StoragePathForAttachment(
|
||||
instanceAccID,
|
||||
string(TypeEmoji),
|
||||
|
|
@ -239,32 +247,32 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
info.Extension,
|
||||
)
|
||||
|
||||
// This shouldn't already exist, but we do a check as it's worth logging.
|
||||
// File shouldn't already exist in storage at this point,
|
||||
// but we do a check as it's worth logging / cleaning up.
|
||||
if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImagePath); have {
|
||||
log.Warnf(ctx, "emoji already exists at storage path: %s", p.emoji.ImagePath)
|
||||
log.Warnf(ctx, "emoji already exists at: %s", p.emoji.ImagePath)
|
||||
|
||||
// Attempt to remove existing emoji at storage path (might be broken / out-of-date)
|
||||
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil {
|
||||
return gtserror.Newf("error removing emoji from storage: %v", err)
|
||||
return gtserror.Newf("error removing emoji %s from storage: %v", p.emoji.ImagePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Write the final image reader stream to our storage.
|
||||
wroteSize, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r)
|
||||
sz, err = p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error writing emoji to storage: %w", err)
|
||||
}
|
||||
|
||||
// Once again check size in case none was provided previously.
|
||||
if size := bytesize.Size(wroteSize); size > maxSize {
|
||||
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil {
|
||||
log.Errorf(ctx, "error removing too-large-emoji from storage: %v", err)
|
||||
}
|
||||
|
||||
return gtserror.Newf("calculated emoji size %s greater than max allowed %s", size, maxSize)
|
||||
// Perform final size check in case none was
|
||||
// given previously, or size was mis-reported.
|
||||
// (error here will later perform p.cleanup()).
|
||||
if sz > int64(maxSize) {
|
||||
sz := bytesize.Size(sz) // improves log readability
|
||||
return gtserror.Newf("written emoji size %s greater than max allowed %s", sz, maxSize)
|
||||
}
|
||||
|
||||
// Fill in remaining attachment data now it's stored.
|
||||
// Fill in remaining emoji data now it's stored.
|
||||
p.emoji.ImageURL = uris.URIForAttachment(
|
||||
instanceAccID,
|
||||
string(TypeEmoji),
|
||||
|
|
@ -273,14 +281,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
|
|||
info.Extension,
|
||||
)
|
||||
p.emoji.ImageContentType = info.MIME.Value
|
||||
p.emoji.ImageFileSize = int(wroteSize)
|
||||
p.emoji.ImageFileSize = int(sz)
|
||||
p.emoji.Cached = util.Ptr(true)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProcessingEmoji) finish(ctx context.Context) error {
|
||||
// Fetch a stream to the original file in storage.
|
||||
// Get a stream to the original file for further processing.
|
||||
rc, err := p.mgr.state.Storage.GetStream(ctx, p.emoji.ImagePath)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error loading file from storage: %w", err)
|
||||
|
|
@ -293,32 +301,69 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error {
|
|||
return gtserror.Newf("error decoding image: %w", err)
|
||||
}
|
||||
|
||||
// The image should be in-memory by now.
|
||||
// staticImg should be in-memory by
|
||||
// now so we're done with storage.
|
||||
if err := rc.Close(); err != nil {
|
||||
return gtserror.Newf("error closing file: %w", err)
|
||||
}
|
||||
|
||||
// This shouldn't already exist, but we do a check as it's worth logging.
|
||||
// Static img shouldn't exist in storage at this point,
|
||||
// but we do a check as it's worth logging / cleaning up.
|
||||
if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have {
|
||||
log.Warnf(ctx, "static emoji already exists at storage path: %s", p.emoji.ImagePath)
|
||||
log.Warnf(ctx, "static emoji already exists at: %s", p.emoji.ImageStaticPath)
|
||||
|
||||
// Attempt to remove static existing emoji at storage path (might be broken / out-of-date)
|
||||
// Attempt to remove existing thumbnail (might be broken / out-of-date).
|
||||
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil {
|
||||
return gtserror.Newf("error removing static emoji from storage: %v", err)
|
||||
return gtserror.Newf("error removing static emoji %s from storage: %v", p.emoji.ImageStaticPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create an emoji PNG encoder stream.
|
||||
// Create emoji PNG encoder stream.
|
||||
enc := staticImg.ToPNG()
|
||||
|
||||
// Stream-encode the PNG static image into storage.
|
||||
// Stream-encode the PNG static emoji image into our storage driver.
|
||||
sz, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImageStaticPath, enc)
|
||||
if err != nil {
|
||||
return gtserror.Newf("error stream-encoding static emoji to storage: %w", err)
|
||||
}
|
||||
|
||||
// Set written image size.
|
||||
// Set final written thumb size.
|
||||
p.emoji.ImageStaticFileSize = int(sz)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanup will remove any traces of processing emoji from storage,
|
||||
// and perform any other necessary cleanup steps after failure.
|
||||
func (p *ProcessingEmoji) cleanup(ctx context.Context) {
|
||||
var err error
|
||||
|
||||
if p.emoji.ImagePath != "" {
|
||||
// Ensure emoji file at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImagePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
if p.emoji.ImageStaticPath != "" {
|
||||
// Ensure emoji static file at path is deleted from storage.
|
||||
err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImageStaticPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure marked as not cached.
|
||||
p.emoji.Cached = util.Ptr(false)
|
||||
}
|
||||
|
||||
// getInstanceAccountID determines the instance account ID from
|
||||
// emoji static image storage path. returns false on failure.
|
||||
func getInstanceAccountID(staticPath string) (string, bool) {
|
||||
matches := regexes.FilePath.FindStringSubmatch(staticPath)
|
||||
if len(matches) < 2 {
|
||||
return "", false
|
||||
}
|
||||
return matches[1], true
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue