[chore] tidy up media manager, add calling func to errors, build-script improvements (#1835)

* media manager tidy-up: de-interface and remove unused PostDataFunc

Signed-off-by: kim <grufwub@gmail.com>

* remove last traces of media.Manager being an interface

Signed-off-by: kim <grufwub@gmail.com>

* update error to provide caller, allow tuneable via build tags

Signed-off-by: kim <grufwub@gmail.com>

* remove kim-specific build script changes

Signed-off-by: kim <grufwub@gmail.com>

* fix merge conflicts

Signed-off-by: kim <grufwub@gmail.com>

* update build-script to support externally setting build variables

Signed-off-by: kim <grufwub@gmail.com>

---------

Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
kim 2023-05-28 13:08:35 +01:00 committed by GitHub
commit 5faeb4de20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 444 additions and 392 deletions

View file

@ -21,8 +21,10 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"codeberg.org/gruf/go-iotools"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-sched"
"codeberg.org/gruf/go-store/v2/storage"
@ -47,112 +49,7 @@ var SupportedEmojiMIMETypes = []string{
mimeImagePng,
}
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
/*
PROCESSING FUNCTIONS
*/
// PreProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the media data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed.
PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
// ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue.
ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
// PreProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
// ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
/*
PRUNING/UNCACHING FUNCTIONS
*/
// PruneAll runs all of the below pruning/uncacheing functions, and then cleans up any resulting
// empty directories from the storage driver. It can be called as a shortcut for calling the below
// pruning functions one by one.
//
// If blocking is true, then any errors encountered during the prune will be combined + returned to
// the caller. If blocking is false, the prune is run in the background and errors are just logged
// instead.
PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error
// UncacheRemote uncaches all remote media attachments older than the given amount of days.
//
// In this context, uncacheing means deleting media files from storage and marking the attachment
// as cached=false in the database.
//
// If 'dry' is true, then only a dry run will be performed: nothing will actually be changed.
//
// The returned int is the amount of media that was/would be uncached by this function.
UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error)
// PruneUnusedRemote prunes unused/out of date headers and avatars cached on this instance.
//
// The returned int is the amount of media that was pruned by this function.
PruneUnusedRemote(ctx context.Context, dry bool) (int, error)
// PruneUnusedLocal prunes unused media attachments that were uploaded by
// a user on this instance, but never actually attached to a status, or attached but
// later detached.
//
// The returned int is the amount of media that was pruned by this function.
PruneUnusedLocal(ctx context.Context, dry bool) (int, error)
// PruneOrphaned prunes files that exist in storage but which do not have a corresponding
// entry in the database.
//
// If dry is true, then nothing will be changed, only the amount that *would* be removed
// is returned to the caller.
PruneOrphaned(ctx context.Context, dry bool) (int, error)
/*
REFETCHING FUNCTIONS
Useful when data loss has occurred.
*/
// RefetchEmojis iterates through remote emojis (for the given domain, or all if domain is empty string).
//
// For each emoji, the manager will check whether both the full size and static images are present in storage.
// If not, the manager will refetch and reprocess full size and static images for the emoji.
//
// The provided DereferenceMedia function will be used when it's necessary to refetch something this way.
RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error)
}
type manager struct {
type Manager struct {
state *state.State
}
@ -162,13 +59,24 @@ type manager struct {
// a limited number of media will be processed in parallel. The numbers of workers
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
// See internal/concurrency.NewWorkerPool() documentation for further information.
func NewManager(state *state.State) Manager {
m := &manager{state: state}
func NewManager(state *state.State) *Manager {
m := &Manager{state: state}
scheduleCleanupJobs(m)
return m
}
func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// PreProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the media data.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed.
func (m *Manager) PreProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
id, err := id.NewRandomULID()
if err != nil {
return nil, err
@ -248,14 +156,16 @@ func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData P
processingMedia := &ProcessingMedia{
media: attachment,
dataFn: data,
postFn: postData,
mgr: m,
}
return processingMedia, nil
}
func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
// PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
//
// Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
func (m *Manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, attachmentID string) (*ProcessingMedia, error) {
// get the existing attachment from database.
attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID)
if err != nil {
@ -265,7 +175,6 @@ func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, pos
processingMedia := &ProcessingMedia{
media: attachment,
dataFn: data,
postFn: postData,
recache: true, // indicate it's a recache
mgr: m,
}
@ -273,9 +182,10 @@ func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, pos
return processingMedia, nil
}
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue.
func (m *Manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
// Create a new processing media object for this media request.
media, err := m.PreProcessMedia(ctx, data, postData, accountID, ai)
media, err := m.PreProcessMedia(ctx, data, accountID, ai)
if err != nil {
return nil, err
}
@ -286,7 +196,22 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData Post
return media, nil
}
func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// PreProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
//
// Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err)
@ -299,36 +224,38 @@ func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData P
)
if refresh {
// Look for existing emoji by given ID.
emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID)
if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err)
}
// if this is a refresh, we will end up with new images
// stored for this emoji, so we can use the postData function
// stored for this emoji, so we can use an io.Closer callback
// to perform clean up of the old images from storage
originalPostData := postData
originalData := data
originalImagePath := emoji.ImagePath
originalImageStaticPath := emoji.ImageStaticPath
postData = func(innerCtx context.Context) error {
// trigger the original postData function if it was provided
if originalPostData != nil {
if err := originalPostData(innerCtx); err != nil {
return err
data = func(ctx context.Context) (io.ReadCloser, int64, error) {
// Call original data func.
rc, sz, err := originalData(ctx)
if err != nil {
return nil, 0, err
}
// Wrap closer to cleanup old data.
c := iotools.CloserCallback(rc, func() {
if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
log.Errorf(ctx, "error removing old emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
}
l := log.WithContext(ctx).
WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain)
l.Debug("postData: cleaning up old emoji files for refreshed emoji")
if err := m.state.Storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err)
}
if err := m.state.Storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err)
}
if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
log.Errorf(ctx, "error removing old static emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
})
return nil
// Return newly wrapped readcloser and size.
return iotools.ReadCloser(rc, c), sz, nil
}
newPathID, err = id.NewRandomULID()
@ -410,16 +337,16 @@ func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData P
refresh: refresh,
newPathID: newPathID,
dataFn: data,
postFn: postData,
mgr: m,
}
return processingEmoji, nil
}
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
func (m *Manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
// Create a new processing emoji object for this emoji request.
emoji, err := m.PreProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh)
emoji, err := m.PreProcessEmoji(ctx, data, shortcode, id, uri, ai, refresh)
if err != nil {
return nil, err
}
@ -430,7 +357,7 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
return emoji, nil
}
func scheduleCleanupJobs(m *manager) {
func scheduleCleanupJobs(m *Manager) {
const day = time.Hour * 24
// Calculate closest midnight.

View file

@ -55,7 +55,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlocking() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "rainbow_test", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "rainbow_test", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -125,7 +125,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingRefresh() {
emojiID := emojiToUpdate.ID
emojiURI := emojiToUpdate.URI
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "yell", emojiID, emojiURI, &media.AdditionalEmojiInfo{
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "yell", emojiID, emojiURI, &media.AdditionalEmojiInfo{
CreatedAt: &emojiToUpdate.CreatedAt,
Domain: &emojiToUpdate.Domain,
ImageRemoteURL: &newImageRemoteURL,
@ -209,7 +209,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLarge() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "big_panda", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "big_panda", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -233,7 +233,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLargeNoSizeGiven() {
emojiID := "01GDQ9G782X42BAMFASKP64343"
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "big_panda", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "big_panda", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -258,7 +258,7 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingNoFileSizeGiven() {
emojiURI := "http://localhost:8080/emoji/01GDQ9G782X42BAMFASKP64343"
// process the media with no additional info provided
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, nil, "rainbow_test", emojiID, emojiURI, nil, false)
processingEmoji, err := suite.manager.ProcessEmoji(ctx, data, "rainbow_test", emojiID, emojiURI, nil, false)
suite.NoError(err)
// do a blocking call to fetch the emoji
@ -319,7 +319,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -391,7 +391,7 @@ func (suite *ManagerTestSuite) TestSlothVineProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -467,7 +467,7 @@ func (suite *ManagerTestSuite) TestLongerMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -543,7 +543,7 @@ func (suite *ManagerTestSuite) TestBirdnestMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -621,7 +621,7 @@ func (suite *ManagerTestSuite) TestNotAnMp4ProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// pre processing should go fine but...
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// we should get an error while loading
@ -646,7 +646,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingNoContentLengthGiven
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -719,7 +719,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingReadCloser() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -791,7 +791,7 @@ func (suite *ManagerTestSuite) TestPngNoAlphaChannelProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -863,7 +863,7 @@ func (suite *ManagerTestSuite) TestPngAlphaChannelProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -932,18 +932,10 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
return io.NopCloser(bytes.NewBuffer(b)), int64(len(b)), nil
}
// test the callback function by setting a simple boolean
var calledPostData bool
postData := func(_ context.Context) error {
calledPostData = true
return nil
}
suite.False(calledPostData) // not called yet (obvs)
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, postData, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
@ -953,9 +945,6 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
suite.NoError(err)
suite.NotNil(attachment)
// the post data callback should have been called
suite.True(calledPostData)
// make sure it's got the stuff set on it that we expect
// the attachment ID and accountID we expect
suite.Equal(attachmentID, attachment.ID)
@ -1019,7 +1008,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
@ -1101,7 +1090,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() {
inProcess := []*media.ProcessingMedia{}
for i := 0; i < spam; i++ {
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
inProcess = append(inProcess, processingMedia)
}
@ -1202,7 +1191,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
suite.manager = diskManager
// process the media with no additional info provided
processingMedia, err := diskManager.ProcessMedia(ctx, data, nil, accountID, nil)
processingMedia, err := diskManager.ProcessMedia(ctx, data, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()

View file

@ -35,7 +35,7 @@ type MediaStandardTestSuite struct {
db db.DB
storage *storage.Driver
state state.State
manager media.Manager
manager *media.Manager
transportController transport.Controller
testAttachments map[string]*gtsmodel.MediaAttachment
testAccounts map[string]*gtsmodel.Account

View file

@ -36,16 +36,15 @@ import (
// ProcessingEmoji represents an emoji currently processing. It exposes
// various functions for retrieving data from the process.
type ProcessingEmoji struct {
instAccID string // instance account ID
emoji *gtsmodel.Emoji // processing emoji details
refresh bool // whether this is an existing emoji being refreshed
newPathID string // new emoji path ID to use if refreshed
dataFn DataFunc // load-data function, returns media stream
postFn PostDataCallbackFunc // post data callback function
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *manager // mgr instance (access to db / storage)
instAccID string // instance account ID
emoji *gtsmodel.Emoji // processing emoji details
refresh bool // whether this is an existing emoji being refreshed
newPathID string // new emoji path ID to use if refreshed
dataFn DataFunc // load-data function, returns media stream
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *Manager // mgr instance (access to db / storage)
}
// EmojiID returns the ID of the underlying emoji without blocking processing.
@ -158,17 +157,6 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro
// and updates the underlying attachment fields as necessary. It will then stream
// bytes from p's reader directly into storage so that it can be retrieved later.
func (p *ProcessingEmoji) store(ctx context.Context) error {
defer func() {
if p.postFn == nil {
return
}
// Ensure post callback gets called.
if err := p.postFn(ctx); err != nil {
log.Errorf(ctx, "error executing postdata function: %v", err)
}
}()
// Load media from provided data fn.
rc, sz, err := p.dataFn(ctx)
if err != nil {

View file

@ -40,12 +40,11 @@ import (
type ProcessingMedia struct {
media *gtsmodel.MediaAttachment // processing media attachment details
dataFn DataFunc // load-data function, returns media stream
postFn PostDataCallbackFunc // post data callback function
recache bool // recaching existing (uncached) media
done bool // done is set when process finishes with non ctx canceled type error
proc runners.Processor // proc helps synchronize only a singular running processing instance
err error // error stores permanent error value when done
mgr *manager // mgr instance (access to db / storage)
mgr *Manager // mgr instance (access to db / storage)
}
// AttachmentID returns the ID of the underlying media attachment without blocking processing.
@ -143,17 +142,6 @@ func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment,
// and updates the underlying attachment fields as necessary. It will then stream
// bytes from p's reader directly into storage so that it can be retrieved later.
func (p *ProcessingMedia) store(ctx context.Context) error {
defer func() {
if p.postFn == nil {
return
}
// ensure post callback gets called.
if err := p.postFn(ctx); err != nil {
log.Errorf(ctx, "error executing postdata function: %v", err)
}
}()
// Load media from provided data fun
rc, sz, err := p.dataFn(ctx)
if err != nil {

View file

@ -37,7 +37,14 @@ const (
unusedLocalAttachmentDays = 3 // Number of days to keep local media in storage if not attached to a status.
)
func (m *manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error {
// PruneAll runs all of the below pruning/uncacheing functions, and then cleans up any resulting
// empty directories from the storage driver. It can be called as a shortcut for calling the below
// pruning functions one by one.
//
// If blocking is true, then any errors encountered during the prune will be combined + returned to
// the caller. If blocking is false, the prune is run in the background and errors are just logged
// instead.
func (m *Manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocking bool) error {
const dry = false
f := func(innerCtx context.Context) error {
@ -93,7 +100,10 @@ func (m *manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocki
return nil
}
func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) {
// PruneUnusedRemote prunes unused/out of date headers and avatars cached on this instance.
//
// The returned int is the amount of media that was pruned by this function.
func (m *Manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) {
var (
totalPruned int
maxID string
@ -152,7 +162,12 @@ func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error)
return totalPruned, nil
}
func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
// PruneOrphaned prunes files that exist in storage but which do not have a corresponding
// entry in the database.
//
// If dry is true, then nothing will be changed, only the amount that *would* be removed
// is returned to the caller.
func (m *Manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
// Emojis are stored under the instance account, so we
// need the ID of the instance account for the next part.
instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
@ -200,7 +215,7 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) {
return m.removeFiles(ctx, orphanedKeys...)
}
func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
func (m *Manager) orphaned(ctx context.Context, key string, instanceAccountID string) (bool, error) {
pathParts := regexes.FilePath.FindStringSubmatch(key)
if len(pathParts) != 6 {
// This doesn't match our expectations so
@ -239,7 +254,15 @@ func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID st
return orphaned, nil
}
func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error) {
// UncacheRemote uncaches all remote media attachments older than the given amount of days.
//
// In this context, uncacheing means deleting media files from storage and marking the attachment
// as cached=false in the database.
//
// If 'dry' is true, then only a dry run will be performed: nothing will actually be changed.
//
// The returned int is the amount of media that was/would be uncached by this function.
func (m *Manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool) (int, error) {
if olderThanDays < 0 {
return 0, nil
}
@ -276,7 +299,12 @@ func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool
return totalPruned, nil
}
func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
// PruneUnusedLocal prunes unused media attachments that were uploaded by
// a user on this instance, but never actually attached to a status, or attached but
// later detached.
//
// The returned int is the amount of media that was pruned by this function.
func (m *Manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
olderThan := time.Now().Add(-time.Hour * 24 * time.Duration(unusedLocalAttachmentDays))
if dry {
@ -313,7 +341,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) {
Handy little helpers
*/
func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
func (m *Manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
return err
}
@ -322,7 +350,7 @@ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.Med
return m.state.DB.DeleteAttachment(ctx, attachment.ID)
}
func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
func (m *Manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
if _, err := m.removeFiles(ctx, attachment.File.Path, attachment.Thumbnail.Path); err != nil {
return err
}
@ -332,7 +360,7 @@ func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.Me
return m.state.DB.UpdateAttachment(ctx, attachment, "cached")
}
func (m *manager) removeFiles(ctx context.Context, keys ...string) (int, error) {
func (m *Manager) removeFiles(ctx context.Context, keys ...string) (int, error) {
errs := make(gtserror.MultiError, 0, len(keys))
for _, key := range keys {

View file

@ -312,7 +312,7 @@ func (suite *PruneTestSuite) TestUncacheAndRecache() {
testStatusAttachment,
testHeader,
} {
processingRecache, err := suite.manager.PreProcessMediaRecache(ctx, data, nil, original.ID)
processingRecache, err := suite.manager.PreProcessMediaRecache(ctx, data, original.ID)
suite.NoError(err)
// synchronously load the recached attachment

View file

@ -32,7 +32,13 @@ import (
type DereferenceMedia func(ctx context.Context, iri *url.URL) (io.ReadCloser, int64, error)
func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error) {
// RefetchEmojis iterates through remote emojis (for the given domain, or all if domain is empty string).
//
// For each emoji, the manager will check whether both the full size and static images are present in storage.
// If not, the manager will refetch and reprocess full size and static images for the emoji.
//
// The provided DereferenceMedia function will be used when it's necessary to refetch something this way.
func (m *Manager) RefetchEmojis(ctx context.Context, domain string, dereferenceMedia DereferenceMedia) (int, error) {
// normalize domain
if domain == "" {
domain = db.EmojiAllDomains
@ -107,7 +113,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM
return dereferenceMedia(ctx, emojiImageIRI)
}
processingEmoji, err := m.PreProcessEmoji(ctx, dataFunc, nil, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{
processingEmoji, err := m.PreProcessEmoji(ctx, dataFunc, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{
Domain: &emoji.Domain,
ImageRemoteURL: &emoji.ImageRemoteURL,
ImageStaticRemoteURL: &emoji.ImageStaticRemoteURL,
@ -131,7 +137,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM
return totalRefetched, nil
}
func (m *manager) emojiRequiresRefetch(ctx context.Context, emoji *gtsmodel.Emoji) (bool, error) {
func (m *Manager) emojiRequiresRefetch(ctx context.Context, emoji *gtsmodel.Emoji) (bool, error) {
if has, err := m.state.Storage.Has(ctx, emoji.ImagePath); err != nil {
return false, err
} else if !has {

View file

@ -110,9 +110,3 @@ type AdditionalEmojiInfo struct {
// DataFunc represents a function used to retrieve the raw bytes of a piece of media.
type DataFunc func(ctx context.Context) (reader io.ReadCloser, fileSize int64, err error)
// PostDataCallbackFunc represents a function executed after the DataFunc has been executed,
// and the returned reader has been read. It can be used to clean up any remaining resources.
//
// This can be set to nil, and will then not be executed.
type PostDataCallbackFunc func(ctx context.Context) error