Mirror of https://github.com/superseriousbusiness/gotosocial.git, synced 2025-10-30 22:02:25 -05:00
	[feature] Update media manager to use internal/worker package (#543)
* update media manager to use internal/worker package, update worker with better logging
  Signed-off-by: kim <grufwub@gmail.com>
* fix Queue() trace log message format operators
  Signed-off-by: kim <grufwub@gmail.com>
* update media manager comment to match updated worker implementation
  Signed-off-by: kim <grufwub@gmail.com>
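In outline, the change swaps the per-callsite runners.WorkerPool enqueue closures for the generic worker.Worker[T] wrapper diffed below. A minimal sketch of that worker lifecycle, assuming only the API visible in this commit (the job type and its ID value are hypothetical stand-ins):

package main

import (
	"context"
	"fmt"

	"github.com/superseriousbusiness/gotosocial/internal/worker"
)

// job stands in for a real message type such as *media.ProcessingMedia.
type job struct{ id string }

func main() {
	// workers=-1 falls back to GOMAXPROCS; queueRatio=10 gives a queue of workers*10.
	w := worker.New[*job](-1, 10)

	// The processor runs on a pool worker for each queued message.
	// It must be set before Start, otherwise Start returns an error.
	w.SetProcessor(func(ctx context.Context, j *job) error {
		if err := ctx.Err(); err != nil {
			return err // the pool is shutting down
		}
		fmt.Println("processing job", j.id)
		return nil
	})

	if err := w.Start(); err != nil {
		panic(err)
	}
	defer w.Stop() // blocks until in-progress jobs are finished

	w.Queue(&job{id: "01EXAMPLE"}) // hand the job to the pool
}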
parent 807a8e1cf6
commit a561ef3541

3 changed files with 84 additions and 130 deletions
internal/media/manager.go
@@ -20,18 +20,16 @@ package media
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"runtime"
 	"time"
 
-	"codeberg.org/gruf/go-runners"
 	"codeberg.org/gruf/go-store/kv"
 	"github.com/robfig/cron/v3"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
 	"github.com/superseriousbusiness/gotosocial/internal/config"
 	"github.com/superseriousbusiness/gotosocial/internal/db"
+	"github.com/superseriousbusiness/gotosocial/internal/worker"
 )
 
 // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
@@ -72,14 +70,6 @@ type Manager interface {
 	// 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size),
 	// and setting 'cached' to false on the associated attachment.
 	PruneRemote(ctx context.Context, olderThanDays int) (int, error)
-	// NumWorkers returns the total number of workers available to this manager.
-	NumWorkers() int
-	// QueueSize returns the total capacity of the queue.
-	QueueSize() int
-	// JobsQueued returns the number of jobs currently in the task queue.
-	JobsQueued() int
-	// ActiveWorkers returns the number of workers currently performing jobs.
-	ActiveWorkers() int
 	// Stop stops the underlying worker pool of the manager. It should be called
 	// when closing GoToSocial in order to cleanly finish any in-progress jobs.
 	// It will block until workers are finished processing.
@@ -89,47 +79,54 @@ type Manager interface {
 type manager struct {
 	db           db.DB
 	storage      *kv.KVStore
-	pool         runners.WorkerPool
+	emojiWorker  *worker.Worker[*ProcessingEmoji]
+	mediaWorker  *worker.Worker[*ProcessingMedia]
 	stopCronJobs func() error
-	numWorkers   int
-	queueSize    int
 }
 
 // NewManager returns a media manager with the given db and underlying storage.
 //
 // A worker pool will also be initialized for the manager, to ensure that only
-// a limited number of media will be processed in parallel.
-//
-// The number of workers will be the number of CPUs available to the Go runtime,
-// divided by 2 (rounding down, but always at least 1).
-//
-// The length of the queue will be the number of workers multiplied by 10.
-//
-// So for an 8 core machine, the media manager will get 4 workers, and a queue of length 40.
-// For a 4 core machine, this will be 2 workers, and a queue length of 20.
-// For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10.
+// a limited number of media will be processed in parallel. The number of workers
+// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
+// See internal/worker.New() documentation for further information.
 func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
-	// configure the worker pool
-	// make sure we always have at least 1 worker even on single-core machines
-	numWorkers := runtime.NumCPU() / 2
-	if numWorkers == 0 {
-		numWorkers = 1
-	}
-	queueSize := numWorkers * 10
-
 	m := &manager{
-		db:         database,
-		storage:    storage,
-		pool:       runners.NewWorkerPool(numWorkers, queueSize),
-		numWorkers: numWorkers,
-		queueSize:  queueSize,
+		db:      database,
+		storage: storage,
 	}
 
-	// start the worker pool
-	if start := m.pool.Start(); !start {
-		return nil, errors.New("could not start worker pool")
+	// Prepare the media worker pool
+	m.mediaWorker = worker.New[*ProcessingMedia](-1, 10)
+	m.mediaWorker.SetProcessor(func(ctx context.Context, media *ProcessingMedia) error {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		if _, err := media.LoadAttachment(ctx); err != nil {
+			return fmt.Errorf("error loading media %s: %v", media.AttachmentID(), err)
+		}
+		return nil
+	})
+
+	// Prepare the emoji worker pool
+	m.emojiWorker = worker.New[*ProcessingEmoji](-1, 10)
+	m.emojiWorker.SetProcessor(func(ctx context.Context, emoji *ProcessingEmoji) error {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		if _, err := emoji.LoadEmoji(ctx); err != nil {
+			return fmt.Errorf("error loading emoji %s: %v", emoji.EmojiID(), err)
+		}
+		return nil
+	})
+
+	// Start the worker pools
+	if err := m.mediaWorker.Start(); err != nil {
+		return nil, err
+	}
+	if err := m.emojiWorker.Start(); err != nil {
+		return nil, err
 	}
-	logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize)
 
 	// start remote cache cleanup cronjob if configured
 	cacheCleanupDays := viper.GetInt(config.Keys.MediaRemoteCacheDays)
@@ -188,22 +185,7 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData Post
 	if err != nil {
 		return nil, err
 	}
-
-	logrus.Tracef("ProcessMedia: about to enqueue media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
-	m.pool.Enqueue(func(innerCtx context.Context) {
-		select {
-		case <-innerCtx.Done():
-			// if the inner context is done that means the worker pool is closing, so we should just return
-			return
-		default:
-			// start loading the media already for the caller's convenience
-			if _, err := processingMedia.LoadAttachment(innerCtx); err != nil {
-				logrus.Errorf("ProcessMedia: error processing media with attachmentID %s: %s", processingMedia.AttachmentID(), err)
-			}
-		}
-	})
-	logrus.Tracef("ProcessMedia: succesfully queued media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
-
+	m.mediaWorker.Queue(processingMedia)
 	return processingMedia, nil
 }
 
@@ -212,22 +194,7 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
 	if err != nil {
 		return nil, err
 	}
-
-	logrus.Tracef("ProcessEmoji: about to enqueue emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
-	m.pool.Enqueue(func(innerCtx context.Context) {
-		select {
-		case <-innerCtx.Done():
-			// if the inner context is done that means the worker pool is closing, so we should just return
-			return
-		default:
-			// start loading the emoji already for the caller's convenience
-			if _, err := processingEmoji.LoadEmoji(innerCtx); err != nil {
-				logrus.Errorf("ProcessEmoji: error processing emoji with id %s: %s", processingEmoji.EmojiID(), err)
-			}
-		}
-	})
-	logrus.Tracef("ProcessEmoji: succesfully queued emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
-
+	m.emojiWorker.Queue(processingEmoji)
 	return processingEmoji, nil
 }
 
@@ -236,51 +203,26 @@ func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData Post
 	if err != nil {
 		return nil, err
 	}
-
-	logrus.Tracef("RecacheMedia: about to enqueue recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
-	m.pool.Enqueue(func(innerCtx context.Context) {
-		select {
-		case <-innerCtx.Done():
-			// if the inner context is done that means the worker pool is closing, so we should just return
-			return
-		default:
-			// start loading the media already for the caller's convenience
-			if _, err := processingRecache.LoadAttachment(innerCtx); err != nil {
-				logrus.Errorf("RecacheMedia: error processing recache with attachmentID %s: %s", processingRecache.AttachmentID(), err)
-			}
-		}
-	})
-	logrus.Tracef("RecacheMedia: succesfully queued recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
-
+	m.mediaWorker.Queue(processingRecache)
 	return processingRecache, nil
 }
 
-func (m *manager) NumWorkers() int {
-	return m.numWorkers
-}
-
-func (m *manager) QueueSize() int {
-	return m.queueSize
-}
-
-func (m *manager) JobsQueued() int {
-	return m.pool.Queue()
-}
-
-func (m *manager) ActiveWorkers() int {
-	return m.pool.Workers()
-}
-
 func (m *manager) Stop() error {
-	logrus.Info("stopping media manager worker pool")
-	if !m.pool.Stop() {
-		return errors.New("could not stop media manager worker pool")
+	// Stop media and emoji worker pools
+	mediaErr := m.mediaWorker.Stop()
+	emojiErr := m.emojiWorker.Stop()
+
+	var cronErr error
+
+	if m.stopCronJobs != nil {
+		// only set if cache prune age > 0
+		cronErr = m.stopCronJobs()
 	}
 
-	if m.stopCronJobs != nil { // only defined if cron jobs are actually running
-		logrus.Info("stopping media manager cache cleanup jobs")
-		return m.stopCronJobs()
+	if mediaErr != nil {
+		return mediaErr
+	} else if emojiErr != nil {
+		return emojiErr
 	}
 
-	return nil
+	return cronErr
 }
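A note on sizing, since the defaults changed with the worker swap: per the doc comment removed above, the old pool ran NumCPU/2 workers (minimum 1) with a queue of workers×10, while worker.New(-1, 10) resolves the worker count from GOMAXPROCS and multiplies it by the queue ratio. A rough before/after sketch of the arithmetic (assuming GOMAXPROCS is left at its default, the machine's CPU count):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Old scheme (removed above): half the CPUs, minimum 1, queue = workers * 10.
	oldWorkers := runtime.NumCPU() / 2
	if oldWorkers == 0 {
		oldWorkers = 1
	}
	fmt.Printf("old: %d workers, queue %d\n", oldWorkers, oldWorkers*10)

	// New scheme: worker.New(-1, 10) falls back to GOMAXPROCS workers,
	// with queue = workers * queueRatio. An 8-core machine thus moves from
	// 4 workers / queue 40 to 8 workers / queue 80 per pool (and the
	// manager now runs two pools, one for media and one for emojis).
	newWorkers := runtime.GOMAXPROCS(0)
	fmt.Printf("new: %d workers, queue %d\n", newWorkers, newWorkers*10)
}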
internal/media/manager_test.go
@@ -443,8 +443,6 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() {
 	}
 
 	for _, processingMedia := range inProcess {
-		fmt.Printf("\n\n\nactive workers: %d, queue length: %d\n\n\n", suite.manager.ActiveWorkers(), suite.manager.JobsQueued())
-
 		// fetch the attachment id from the processing media
 		attachmentID := processingMedia.AttachmentID()
 
internal/worker/worker.go
@@ -3,6 +3,8 @@ package worker
 import (
 	"context"
 	"errors"
+	"fmt"
+	"path"
 	"reflect"
 	"runtime"
 
@@ -17,34 +19,44 @@ type Worker[MsgType any] struct {
 	prefix  string // contains type prefix for logging
 }
 
-// New returns a new Worker[MsgType] with given number of workers and queue size
-// (see runners.WorkerPool for more information on args). If args < 1 then suitable
-// defaults are determined from the runtime's GOMAXPROCS variable.
-func New[MsgType any](workers int, queue int) *Worker[MsgType] {
+// New returns a new Worker[MsgType] with given number of workers and queue ratio,
+// where the queue ratio is multiplied by no. workers to get queue size. If args < 1
+// then suitable defaults are determined from the runtime's GOMAXPROCS variable.
+func New[MsgType any](workers int, queueRatio int) *Worker[MsgType] {
+	var zero MsgType
+
 	if workers < 1 {
 		// ensure sensible workers
 		workers = runtime.GOMAXPROCS(0)
 	}
-	if queue < 1 {
-		// ensure sensible queue
-		queue = workers * 100
+	if queueRatio < 1 {
+		// ensure sensible ratio
+		queueRatio = 100
 	}
 
+	// Calculate the short type string for the msg type
+	msgType := reflect.TypeOf(zero).String()
+	_, msgType = path.Split(msgType)
+
 	w := &Worker[MsgType]{
-		workers: runners.NewWorkerPool(workers, queue),
+		workers: runners.NewWorkerPool(workers, workers*queueRatio),
 		process: nil,
-		prefix:  reflect.TypeOf(Worker[MsgType]{}).String(), //nolint
+		prefix:  fmt.Sprintf("worker.Worker[%s]", msgType),
 	}
 
 	// Log new worker creation with type prefix
-	logrus.Infof("%s created with workers=%d queue=%d", w.prefix, workers, queue)
+	logrus.Infof("%s created with workers=%d queue=%d",
+		w.prefix,
+		workers,
+		workers*queueRatio,
+	)
 
 	return w
 }
 
 // Start will attempt to start the underlying worker pool, or return error.
 func (w *Worker[MsgType]) Start() error {
-	logrus.Info(w.prefix, "starting")
+	logrus.Infof("%s starting", w.prefix)
 
 	// Check processor was set
 	if w.process == nil {
@@ -61,7 +73,7 @@ func (w *Worker[MsgType]) Start() error {
 
 // Stop will attempt to stop the underlying worker pool, or return error.
 func (w *Worker[MsgType]) Stop() error {
-	logrus.Info(w.prefix, "stopping")
+	logrus.Infof("%s stopping", w.prefix)
 
 	// Attempt to stop pool
 	if !w.workers.Stop() {
@@ -74,17 +86,19 @@
 // SetProcessor will set the Worker's processor function, which is called for each queued message.
 func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {
 	if w.process != nil {
-		logrus.Panic(w.prefix, "Worker.process is already set")
+		logrus.Panicf("%s Worker.process is already set", w.prefix)
 	}
 	w.process = fn
 }
 
 // Queue will queue provided message to be processed when there's a free worker.
 func (w *Worker[MsgType]) Queue(msg MsgType) {
-	logrus.Tracef("%s queueing message: %+v", w.prefix, msg)
+	logrus.Tracef("%s queueing message (workers=%d queue=%d): %+v",
+		w.prefix, w.workers.Workers(), w.workers.Queue(), msg,
+	)
 	w.workers.Enqueue(func(ctx context.Context) {
 		if err := w.process(ctx, msg); err != nil {
-			logrus.Error(err)
+			logrus.Errorf("%s %v", w.prefix, err)
 		}
 	})
 }
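One more detail from the worker changes above: the log prefix. The old reflect.TypeOf(Worker[MsgType]{}).String() spelling of a generic instantiation can embed the type argument's full import path, so New now builds the label from the message type itself. A small illustration of that computation (a sketch, assuming a pointer message type as both manager pools use; path.Split leaves the string untouched when it contains no slash):

package main

import (
	"fmt"
	"path"
	"reflect"
)

type ProcessingMedia struct{}

// prefixFor mirrors the prefix computation performed in New above.
func prefixFor[MsgType any]() string {
	var zero MsgType
	// For MsgType = *ProcessingMedia this yields "*main.ProcessingMedia".
	msgType := reflect.TypeOf(zero).String()
	// Trim leading import-path segments, should the representation include any.
	_, msgType = path.Split(msgType)
	return fmt.Sprintf("worker.Worker[%s]", msgType)
}

func main() {
	fmt.Println(prefixFor[*ProcessingMedia]()) // worker.Worker[*main.ProcessingMedia]
}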