[feature] Clean up/uncache remote media (#407)

* Add whereNotEmptyAndNotNull

* Add GetRemoteOlderThanDays

* Add GetRemoteOlderThanDays

* Add PruneRemote to Manager interface

* Start implementing PruneRemote

* add new attachment + status to tests

* fix up and test GetRemoteOlderThan

* fix bad import

* PruneRemote: return number pruned

* add Cached column to mediaattachment

* update + test pruneRemote

* update mediaTest

* use Cached column

* upstep bun to latest version

* embed structs in mediaAttachment

* migrate mediaAttachment to new format

* don't default cached to true

* select only remote media

* update db dependencies

* step bun back to last working version

* update pruneRemote to use Cached field

* fix storage path of test attachments

* add recache logic to manager

* fix trimmed aspect ratio

* test prune and recache

* return errwithcode

* tidy up different paths for emoji vs attachment

* fix incorrect thumbnail type being stored

* expose TransportController to media processor

* implement tee-ing recached content

* add thoughts of dog to test fedi attachments

* test get remote files

* add comment on PruneRemote

* add postData cleanup to recache

* test thumbnail fetching

* add incredible diagram

* go mod tidy

* buffer pipes for recache streaming

* test for client stops reading after 1kb

* add media-remote-cache-days to config

* add cron package

* wrap logrus so it's available to cron

* start and stop cron jobs gracefully
This commit is contained in:
tobi 2022-03-07 11:08:26 +01:00 committed by GitHub
commit 07727753b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
424 changed files with 637100 additions and 176498 deletions

View file

@ -21,11 +21,16 @@ package media
import (
"context"
"errors"
"fmt"
"runtime"
"time"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-store/kv"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
)
@ -61,6 +66,12 @@ type Manager interface {
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
// RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
// PruneRemote prunes all remote media cached on this instance that's older than the given amount of days.
// 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size),
// and setting 'cached' to false on the associated attachment.
PruneRemote(ctx context.Context, olderThanDays int) (int, error)
// NumWorkers returns the total number of workers available to this manager.
NumWorkers() int
// QueueSize returns the total capacity of the queue.
@ -76,11 +87,12 @@ type Manager interface {
}
type manager struct {
db db.DB
storage *kv.KVStore
pool runners.WorkerPool
numWorkers int
queueSize int
db db.DB
storage *kv.KVStore
pool runners.WorkerPool
stopCronJobs func() error
numWorkers int
queueSize int
}
// NewManager returns a media manager with the given db and underlying storage.
@ -97,8 +109,10 @@ type manager struct {
// For a 4 core machine, this will be 2 workers, and a queue length of 20.
// For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10.
func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
numWorkers := runtime.NumCPU() / 2
// configure the worker pool
// make sure we always have at least 1 worker even on single-core machines
numWorkers := runtime.NumCPU() / 2
if numWorkers == 0 {
numWorkers = 1
}
@ -112,11 +126,61 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
queueSize: queueSize,
}
// start the worker pool
if start := m.pool.Start(); !start {
return nil, errors.New("could not start worker pool")
}
logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize)
// start remote cache cleanup cronjob if configured
cacheCleanupDays := viper.GetInt(config.Keys.MediaRemoteCacheDays)
if cacheCleanupDays != 0 {
// we need a way of cancelling running jobs if the media manager is told to stop
pruneCtx, pruneCancel := context.WithCancel(context.Background())
// create a new cron instance and add a function to it
c := cron.New(cron.WithLogger(&logrusWrapper{}))
pruneFunc := func() {
begin := time.Now()
pruned, err := m.PruneRemote(pruneCtx, cacheCleanupDays)
if err != nil {
logrus.Errorf("media manager: error pruning remote cache: %s", err)
return
}
logrus.Infof("media manager: pruned %d remote cache entries in %s", pruned, time.Since(begin))
}
// run every night
entryID, err := c.AddFunc("@midnight", pruneFunc)
if err != nil {
pruneCancel()
return nil, fmt.Errorf("error starting media manager remote cache cleanup job: %s", err)
}
// since we're running a cron job, we should define how the manager should stop them
m.stopCronJobs = func() error {
// try to stop any jobs gracefully by waiting til they're finished
cronCtx := c.Stop()
select {
case <-cronCtx.Done():
logrus.Infof("media manager: cron finished jobs and stopped gracefully")
case <-time.After(1 * time.Minute):
logrus.Infof("media manager: cron didn't stop after 60 seconds, will force close")
break
}
// whether the job is finished neatly or we had to wait a minute, cancel the context on the prune job
pruneCancel()
return nil
}
// now start all the cron stuff we've lined up
c.Start()
logrus.Infof("started media manager remote cache cleanup job: will run next at %s", c.Entry(entryID).Next)
}
return m, nil
}
@ -168,6 +232,30 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
return processingEmoji, nil
}
func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID)
if err != nil {
return nil, err
}
logrus.Tracef("RecacheMedia: about to enqueue recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
m.pool.Enqueue(func(innerCtx context.Context) {
select {
case <-innerCtx.Done():
// if the inner context is done that means the worker pool is closing, so we should just return
return
default:
// start loading the media already for the caller's convenience
if _, err := processingRecache.LoadAttachment(innerCtx); err != nil {
logrus.Errorf("RecacheMedia: error processing recache with attachmentID %s: %s", processingRecache.AttachmentID(), err)
}
}
})
logrus.Tracef("RecacheMedia: succesfully queued recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
return processingRecache, nil
}
func (m *manager) NumWorkers() int {
return m.numWorkers
}
@ -186,10 +274,15 @@ func (m *manager) ActiveWorkers() int {
func (m *manager) Stop() error {
logrus.Info("stopping media manager worker pool")
stopped := m.pool.Stop()
if !stopped {
return errors.New("could not stop media manager worker pool")
}
if m.stopCronJobs != nil { // only defined if cron jobs are actually running
logrus.Info("stopping media manager cache cleanup jobs")
return m.stopCronJobs()
}
return nil
}

View file

@ -31,7 +31,7 @@ import (
"codeberg.org/gruf/go-store/kv"
"codeberg.org/gruf/go-store/storage"
"github.com/stretchr/testify/suite"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/db/bundb/migrations/20211113114307_init"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
)

View file

@ -22,6 +22,7 @@ import (
"codeberg.org/gruf/go-store/kv"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/db"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@ -29,9 +30,10 @@ import (
type MediaStandardTestSuite struct {
suite.Suite
db db.DB
storage *kv.KVStore
manager media.Manager
db db.DB
storage *kv.KVStore
manager media.Manager
testAttachments map[string]*gtsmodel.MediaAttachment
}
func (suite *MediaStandardTestSuite) SetupSuite() {
@ -45,6 +47,7 @@ func (suite *MediaStandardTestSuite) SetupSuite() {
func (suite *MediaStandardTestSuite) SetupTest() {
testrig.StandardStorageSetup(suite.storage, "../../testrig/media")
testrig.StandardDBSetup(suite.db, nil)
suite.testAttachments = testrig.NewTestAttachments()
suite.manager = testrig.NewTestMediaManager(suite.db, suite.storage)
}

View file

@ -66,6 +66,9 @@ type ProcessingMedia struct {
// track whether this media has already been put in the databse
insertedInDB bool
// true if this is a recache, false if it's brand new media
recache bool
}
// AttachmentID returns the ID of the underlying media attachment without blocking processing.
@ -93,8 +96,16 @@ func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAt
// store the result in the database before returning it
if !p.insertedInDB {
if err := p.database.Put(ctx, p.attachment); err != nil {
return nil, err
if p.recache {
// if it's a recache we should only need to update
if err := p.database.UpdateByPrimaryKey(ctx, p.attachment); err != nil {
return nil, err
}
} else {
// otherwise we need to really PUT it
if err := p.database.Put(ctx, p.attachment); err != nil {
return nil, err
}
}
p.insertedInDB = true
}
@ -305,6 +316,7 @@ func (p *ProcessingMedia) store(ctx context.Context) error {
if err := p.storage.PutStream(p.attachment.File.Path, clean); err != nil {
return fmt.Errorf("store: error storing stream: %s", err)
}
p.attachment.Cached = true
// if the original reader is a readcloser, close it since we're done with it now
if rc, ok := reader.(io.ReadCloser); ok {
@ -360,6 +372,7 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData P
Thumbnail: thumbnail,
Avatar: false,
Header: false,
Cached: false,
}
// check if we have additional info to add to the attachment,
@ -418,3 +431,24 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData P
return processingMedia, nil
}
func (m *manager) preProcessRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
// get the existing attachment
attachment, err := m.db.GetAttachmentByID(ctx, attachmentID)
if err != nil {
return nil, err
}
processingMedia := &ProcessingMedia{
attachment: attachment,
data: data,
postData: postData,
thumbState: int32(received),
fullSizeState: int32(received),
database: m.db,
storage: m.storage,
recache: true, // indicate it's a recache
}
return processingMedia, nil
}

View file

@ -0,0 +1,96 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package media
import (
"context"
"fmt"
"time"
"codeberg.org/gruf/go-store/storage"
"github.com/sirupsen/logrus"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
)
// amount of media attachments to select at a time from the db when pruning
const selectPruneLimit = 20
func (m *manager) PruneRemote(ctx context.Context, olderThanDays int) (int, error) {
var totalPruned int
// convert days into a duration string
olderThanHoursString := fmt.Sprintf("%dh", olderThanDays*24)
// parse the duration string into a duration
olderThanHours, err := time.ParseDuration(olderThanHoursString)
if err != nil {
return totalPruned, fmt.Errorf("PruneRemote: %d", err)
}
// 'subtract' that from the time now to give our threshold
olderThan := time.Now().Add(-olderThanHours)
logrus.Infof("PruneRemote: pruning media older than %s", olderThan)
// select 20 attachments at a time and prune them
for attachments, err := m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit) {
// use the age of the oldest attachment (the last one in the slice) as the next 'older than' value
l := len(attachments)
logrus.Tracef("PruneRemote: got %d attachments older than %s", l, olderThan)
olderThan = attachments[l-1].CreatedAt
// prune each attachment
for _, attachment := range attachments {
if err := m.PruneOne(ctx, attachment); err != nil {
return totalPruned, err
}
totalPruned++
}
}
// make sure we don't have a real error when we leave the loop
if err != nil && err != db.ErrNoEntries {
return totalPruned, err
}
logrus.Infof("PruneRemote: finished pruning remote media: pruned %d entries", totalPruned)
return totalPruned, nil
}
func (m *manager) PruneOne(ctx context.Context, attachment *gtsmodel.MediaAttachment) error {
if attachment.File.Path != "" {
// delete the full size attachment from storage
logrus.Tracef("PruneOne: deleting %s", attachment.File.Path)
if err := m.storage.Delete(attachment.File.Path); err != nil && err != storage.ErrNotFound {
return err
}
attachment.Cached = false
}
if attachment.Thumbnail.Path != "" {
// delete the thumbnail from storage
logrus.Tracef("PruneOne: deleting %s", attachment.Thumbnail.Path)
if err := m.storage.Delete(attachment.Thumbnail.Path); err != nil && err != storage.ErrNotFound {
return err
}
attachment.Cached = false
}
// update the attachment to reflect that we no longer have it cached
return m.db.UpdateByPrimaryKey(ctx, attachment)
}

View file

@ -0,0 +1,111 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package media_test
import (
"bytes"
"context"
"io"
"os"
"testing"
"codeberg.org/gruf/go-store/storage"
"github.com/stretchr/testify/suite"
)
type PruneRemoteTestSuite struct {
MediaStandardTestSuite
}
func (suite *PruneRemoteTestSuite) TestPruneRemote() {
testAttachment := suite.testAttachments["remote_account_1_status_1_attachment_1"]
suite.True(testAttachment.Cached)
totalPruned, err := suite.manager.PruneRemote(context.Background(), 1)
suite.NoError(err)
suite.Equal(1, totalPruned)
prunedAttachment, err := suite.db.GetAttachmentByID(context.Background(), testAttachment.ID)
suite.NoError(err)
// the media should no longer be cached
suite.False(prunedAttachment.Cached)
}
func (suite *PruneRemoteTestSuite) TestPruneRemoteTwice() {
totalPruned, err := suite.manager.PruneRemote(context.Background(), 1)
suite.NoError(err)
suite.Equal(1, totalPruned)
// final prune should prune nothing, since the first prune already happened
totalPrunedAgain, err := suite.manager.PruneRemote(context.Background(), 1)
suite.NoError(err)
suite.Equal(0, totalPrunedAgain)
}
func (suite *PruneRemoteTestSuite) TestPruneAndRecache() {
ctx := context.Background()
testAttachment := suite.testAttachments["remote_account_1_status_1_attachment_1"]
totalPruned, err := suite.manager.PruneRemote(ctx, 1)
suite.NoError(err)
suite.Equal(1, totalPruned)
// media should no longer be stored
_, err = suite.storage.Get(testAttachment.File.Path)
suite.Error(err)
suite.ErrorIs(err, storage.ErrNotFound)
_, err = suite.storage.Get(testAttachment.Thumbnail.Path)
suite.Error(err)
suite.ErrorIs(err, storage.ErrNotFound)
// now recache the image....
data := func(_ context.Context) (io.Reader, int, error) {
// load bytes from a test image
b, err := os.ReadFile("../../testrig/media/thoughtsofdog-original.jpeg")
if err != nil {
panic(err)
}
return bytes.NewBuffer(b), len(b), nil
}
processingRecache, err := suite.manager.RecacheMedia(ctx, data, nil, testAttachment.ID)
suite.NoError(err)
// synchronously load the recached attachment
recachedAttachment, err := processingRecache.LoadAttachment(ctx)
suite.NoError(err)
suite.NotNil(recachedAttachment)
// recachedAttachment should be basically the same as the old attachment
suite.True(recachedAttachment.Cached)
suite.Equal(testAttachment.ID, recachedAttachment.ID)
suite.Equal(testAttachment.File.Path, recachedAttachment.File.Path) // file should be stored in the same place
suite.Equal(testAttachment.Thumbnail.Path, recachedAttachment.Thumbnail.Path) // as should the thumbnail
suite.EqualValues(testAttachment.FileMeta, recachedAttachment.FileMeta) // and the filemeta should be the same
// recached files should be back in storage
_, err = suite.storage.Get(recachedAttachment.File.Path)
suite.NoError(err)
_, err = suite.storage.Get(recachedAttachment.Thumbnail.Path)
suite.NoError(err)
}
func TestPruneRemoteTestSuite(t *testing.T) {
suite.Run(t, &PruneRemoteTestSuite{})
}

View file

@ -23,6 +23,7 @@ import (
"fmt"
"github.com/h2non/filetype"
"github.com/sirupsen/logrus"
)
// parseContentType parses the MIME content type from a file, returning it as a string in the form (eg., "image/jpeg").
@ -103,3 +104,17 @@ func ParseMediaSize(s string) (Size, error) {
}
return "", fmt.Errorf("%s not a recognized MediaSize", s)
}
// logrusWrapper is just a util for passing the logrus logger into the cron logging system.
type logrusWrapper struct {
}
// Info logs routine messages about cron's operation.
func (l *logrusWrapper) Info(msg string, keysAndValues ...interface{}) {
logrus.Info("media manager cron logger: ", msg, keysAndValues)
}
// Error logs an error condition.
func (l *logrusWrapper) Error(err error, msg string, keysAndValues ...interface{}) {
logrus.Error("media manager cron logger: ", err, msg, keysAndValues)
}