[feature/performance] support uncaching remote emoji + scheduled cleanup functions (#1987)

This commit is contained in:
kim 2023-07-24 13:14:13 +01:00 committed by GitHub
commit 9eff0d46e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1287 additions and 219 deletions

View file

@ -51,12 +51,7 @@ type Manager struct {
state *state.State
}
// NewManager returns a media manager with the given db and underlying storage.
//
// A worker pool will also be initialized for the manager, to ensure that only
// a limited number of media will be processed in parallel. The numbers of workers
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
// See internal/concurrency.NewWorkerPool() documentation for further information.
// NewManager returns a media manager with given state.
func NewManager(state *state.State) *Manager {
m := &Manager{state: state}
return m
@ -159,7 +154,7 @@ func (m *Manager) PreProcessMedia(ctx context.Context, data DataFunc, accountID
return processingMedia, nil
}
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via cleaner pruning.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
func (m *Manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, attachmentID string) (*ProcessingMedia, error) {
@ -209,17 +204,18 @@ func (m *Manager) ProcessMedia(ctx context.Context, data DataFunc, accountID str
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
if err != nil {
return nil, gtserror.Newf("error fetching this instance account from the db: %s", err)
}
var (
newPathID string
emoji *gtsmodel.Emoji
now = time.Now()
)
// Fetch the local instance account for emoji path generation.
instanceAcc, err := m.state.DB.GetInstanceAccount(ctx, "")
if err != nil {
return nil, gtserror.Newf("error fetching instance account: %w", err)
}
if refresh {
// Look for existing emoji by given ID.
emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID)
@ -261,8 +257,8 @@ func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode
}
// store + serve static image at new path ID
emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng)
emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng)
emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAcc.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng)
emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAcc.ID, TypeEmoji, SizeStatic, newPathID, mimePng)
emoji.Shortcode = shortcode
emoji.URI = uri
@ -278,12 +274,12 @@ func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode
Domain: "", // assume our own domain unless told otherwise
ImageRemoteURL: "",
ImageStaticRemoteURL: "",
ImageURL: "", // we don't know yet
ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png
ImagePath: "", // we don't know yet
ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png
ImageContentType: "", // we don't know yet
ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png
ImageURL: "", // we don't know yet
ImageStaticURL: uris.GenerateURIForAttachment(instanceAcc.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png
ImagePath: "", // we don't know yet
ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAcc.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png
ImageContentType: "", // we don't know yet
ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png
ImageFileSize: 0,
ImageStaticFileSize: 0,
Disabled: &disabled,
@ -329,9 +325,8 @@ func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode
}
processingEmoji := &ProcessingEmoji{
instAccID: instanceAccount.ID,
emoji: emoji,
refresh: refresh,
existing: refresh,
newPathID: newPathID,
dataFn: data,
mgr: m,
@ -340,6 +335,26 @@ func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode
return processingEmoji, nil
}
// PreProcessEmojiRecache refetches, reprocesses, and recaches an existing emoji that has been uncached via cleaner pruning.
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asychronously processed.
func (m *Manager) PreProcessEmojiRecache(ctx context.Context, data DataFunc, emojiID string) (*ProcessingEmoji, error) {
// get the existing emoji from the database.
emoji, err := m.state.DB.GetEmojiByID(ctx, emojiID)
if err != nil {
return nil, err
}
processingEmoji := &ProcessingEmoji{
emoji: emoji,
dataFn: data,
existing: true, // inidcate recache
mgr: m,
}
return processingEmoji, nil
}
// ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
func (m *Manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// Create a new processing emoji object for this emoji request.

View file

@ -31,16 +31,16 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/regexes"
"github.com/superseriousbusiness/gotosocial/internal/uris"
)
// ProcessingEmoji represents an emoji currently processing. It exposes
// various functions for retrieving data from the process.
type ProcessingEmoji struct {
instAccID string // instance account ID
emoji *gtsmodel.Emoji // processing emoji details
refresh bool // whether this is an existing emoji being refreshed
newPathID string // new emoji path ID to use if refreshed
existing bool // indicates whether this is an existing emoji ID being refreshed / recached
newPathID string // new emoji path ID to use when being refreshed
dataFn DataFunc // load-data function, returns media stream
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
@ -121,24 +121,9 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro
return err
}
if p.refresh {
columns := []string{
"image_remote_url",
"image_static_remote_url",
"image_url",
"image_static_url",
"image_path",
"image_static_path",
"image_content_type",
"image_file_size",
"image_static_file_size",
"image_updated_at",
"shortcode",
"uri",
}
// Existing emoji we're refreshing, so only need to update.
err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji, columns...)
if p.existing {
// Existing emoji we're updating, so only update.
err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
return err
}
@ -217,7 +202,7 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
var pathID string
if p.refresh {
if p.newPathID != "" {
// This is a refreshed emoji with a new
// path ID that this will be stored under.
pathID = p.newPathID
@ -226,10 +211,13 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
pathID = p.emoji.ID
}
// Determine instance account ID from already generated image static path.
instanceAccID := regexes.FilePath.FindStringSubmatch(p.emoji.ImageStaticPath)[1]
// Calculate emoji file path.
p.emoji.ImagePath = fmt.Sprintf(
"%s/%s/%s/%s.%s",
p.instAccID,
instanceAccID,
TypeEmoji,
SizeOriginal,
pathID,
@ -258,12 +246,13 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil {
log.Errorf(ctx, "error removing too-large-emoji from storage: %v", err)
}
return gtserror.Newf("calculated emoji size %s greater than max allowed %s", size, maxSize)
}
// Fill in remaining attachment data now it's stored.
p.emoji.ImageURL = uris.GenerateURIForAttachment(
p.instAccID,
instanceAccID,
string(TypeEmoji),
string(SizeOriginal),
pathID,
@ -271,6 +260,10 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
)
p.emoji.ImageContentType = info.MIME.Value
p.emoji.ImageFileSize = int(sz)
p.emoji.Cached = func() *bool {
ok := true
return &ok
}()
return nil
}
@ -297,6 +290,7 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error {
// This shouldn't already exist, but we do a check as it's worth logging.
if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have {
log.Warnf(ctx, "static emoji already exists at storage path: %s", p.emoji.ImagePath)
// Attempt to remove static existing emoji at storage path (might be broken / out-of-date)
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil {
return gtserror.Newf("error removing static emoji from storage: %v", err)