| 
									
										
										
										
											2021-02-28 15:17:18 +01:00
										 |  |  | /* | 
					
						
							| 
									
										
										
										
											2021-03-01 15:41:43 +01:00
										 |  |  |    GoToSocial | 
					
						
							| 
									
										
										
										
											2021-12-20 18:42:19 +01:00
										 |  |  |    Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org | 
					
						
							| 
									
										
										
										
											2021-02-28 15:17:18 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-01 15:41:43 +01:00
										 |  |  |    This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  |    it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  |    the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  |    (at your option) any later version. | 
					
						
							| 
									
										
										
										
											2021-02-28 15:17:18 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-01 15:41:43 +01:00
										 |  |  |    This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  |    but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  |    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  |    GNU Affero General Public License for more details. | 
					
						
							| 
									
										
										
										
											2021-02-28 15:17:18 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-01 15:41:43 +01:00
										 |  |  |    You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  |    along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							| 
									
										
										
										
											2021-02-28 15:17:18 +01:00
										 |  |  | */ | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-09 17:03:40 +01:00
										 |  |  | package media | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | import ( | 
					
						
							| 
									
										
										
										
											2021-05-17 19:06:58 +02:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2022-01-02 15:00:53 +01:00
										 |  |  | 	"runtime" | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 	"codeberg.org/gruf/go-runners" | 
					
						
							| 
									
										
										
										
											2021-11-13 12:29:08 +01:00
										 |  |  | 	"codeberg.org/gruf/go-store/kv" | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 	"github.com/sirupsen/logrus" | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/db" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-12-28 16:36:00 +01:00
										 |  |  | // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. | 
					
						
							|  |  |  | type Manager interface { | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	// ProcessMedia begins the process of decoding and storing the given data as an attachment. | 
					
						
							| 
									
										
										
										
											2022-01-08 17:17:01 +01:00
										 |  |  | 	// It will return a pointer to a Media struct upon which further actions can be performed, such as getting | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	// the finished media, thumbnail, attachment, etc. | 
					
						
							| 
									
										
										
										
											2022-01-08 17:17:01 +01:00
										 |  |  | 	// | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	// data should be a function that the media manager can call to return raw bytes of a piece of media. | 
					
						
							|  |  |  | 	// | 
					
						
							| 
									
										
										
										
											2022-01-08 17:17:01 +01:00
										 |  |  | 	// accountID should be the account that the media belongs to. | 
					
						
							|  |  |  | 	// | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) | 
					
						
							|  |  |  | 	ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	// NumWorkers returns the total number of workers available to this manager. | 
					
						
							|  |  |  | 	NumWorkers() int | 
					
						
							|  |  |  | 	// QueueSize returns the total capacity of the queue. | 
					
						
							|  |  |  | 	QueueSize() int | 
					
						
							|  |  |  | 	// JobsQueued returns the number of jobs currently in the task queue. | 
					
						
							|  |  |  | 	JobsQueued() int | 
					
						
							|  |  |  | 	// ActiveWorkers returns the number of workers currently performing jobs. | 
					
						
							|  |  |  | 	ActiveWorkers() int | 
					
						
							|  |  |  | 	// Stop stops the underlying worker pool of the manager. It should be called | 
					
						
							|  |  |  | 	// when closing GoToSocial in order to cleanly finish any in-progress jobs. | 
					
						
							|  |  |  | 	// It will block until workers are finished processing. | 
					
						
							|  |  |  | 	Stop() error | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-12-28 16:36:00 +01:00
										 |  |  | type manager struct { | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	db         db.DB | 
					
						
							|  |  |  | 	storage    *kv.KVStore | 
					
						
							|  |  |  | 	pool       runners.WorkerPool | 
					
						
							|  |  |  | 	numWorkers int | 
					
						
							|  |  |  | 	queueSize  int | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | // NewManager returns a media manager with the given db and underlying storage. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // A worker pool will also be initialized for the manager, to ensure that only | 
					
						
							|  |  |  | // a limited number of media will be processed in parallel. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The number of workers will be the number of CPUs available to the Go runtime, | 
					
						
							|  |  |  | // divided by 2 (rounding down, but always at least 1). | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // The length of the queue will be the number of workers multiplied by 10. | 
					
						
							|  |  |  | // | 
					
						
							|  |  |  | // So for an 8 core machine, the media manager will get 4 workers, and a queue of length 40. | 
					
						
							|  |  |  | // For a 4 core machine, this will be 2 workers, and a queue length of 20. | 
					
						
							|  |  |  | // For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10. | 
					
						
							|  |  |  | func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) { | 
					
						
							|  |  |  | 	numWorkers := runtime.NumCPU() / 2 | 
					
						
							|  |  |  | 	// make sure we always have at least 1 worker even on single-core machines | 
					
						
							|  |  |  | 	if numWorkers == 0 { | 
					
						
							|  |  |  | 		numWorkers = 1 | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	queueSize := numWorkers * 10 | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	m := &manager{ | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 		db:         database, | 
					
						
							|  |  |  | 		storage:    storage, | 
					
						
							|  |  |  | 		pool:       runners.NewWorkerPool(numWorkers, queueSize), | 
					
						
							|  |  |  | 		numWorkers: numWorkers, | 
					
						
							|  |  |  | 		queueSize:  queueSize, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if start := m.pool.Start(); !start { | 
					
						
							|  |  |  | 		return nil, errors.New("could not start worker pool") | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize) | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return m, nil | 
					
						
							| 
									
										
										
										
											2021-04-01 20:46:45 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { | 
					
						
							|  |  |  | 	processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai) | 
					
						
							| 
									
										
										
										
											2021-12-28 16:36:00 +01:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	logrus.Tracef("ProcessMedia: about to enqueue media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue()) | 
					
						
							|  |  |  | 	m.pool.Enqueue(func(innerCtx context.Context) { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-innerCtx.Done(): | 
					
						
							|  |  |  | 			// if the inner context is done that means the worker pool is closing, so we should just return | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			// start loading the media already for the caller's convenience | 
					
						
							|  |  |  | 			if _, err := processingMedia.LoadAttachment(innerCtx); err != nil { | 
					
						
							|  |  |  | 				logrus.Errorf("ProcessMedia: error processing media with attachmentID %s: %s", processingMedia.AttachmentID(), err) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	logrus.Tracef("ProcessMedia: succesfully queued media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue()) | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	return processingMedia, nil | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2022-01-03 17:37:38 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) { | 
					
						
							|  |  |  | 	processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, ai) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							| 
									
										
										
										
											2021-05-21 15:48:26 +02:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-01-08 13:45:42 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-11 17:49:14 +01:00
										 |  |  | 	logrus.Tracef("ProcessEmoji: about to enqueue emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue()) | 
					
						
							|  |  |  | 	m.pool.Enqueue(func(innerCtx context.Context) { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-innerCtx.Done(): | 
					
						
							|  |  |  | 			// if the inner context is done that means the worker pool is closing, so we should just return | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			// start loading the emoji already for the caller's convenience | 
					
						
							|  |  |  | 			if _, err := processingEmoji.LoadEmoji(innerCtx); err != nil { | 
					
						
							|  |  |  | 				logrus.Errorf("ProcessEmoji: error processing emoji with id %s: %s", processingEmoji.EmojiID(), err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	logrus.Tracef("ProcessEmoji: succesfully queued emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return processingEmoji, nil | 
					
						
							| 
									
										
										
										
											2022-01-08 17:17:01 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | func (m *manager) NumWorkers() int { | 
					
						
							|  |  |  | 	return m.numWorkers | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2022-01-09 18:41:22 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | func (m *manager) QueueSize() int { | 
					
						
							|  |  |  | 	return m.queueSize | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2022-01-09 18:41:22 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | func (m *manager) JobsQueued() int { | 
					
						
							|  |  |  | 	return m.pool.Queue() | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2022-01-09 18:41:22 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | func (m *manager) ActiveWorkers() int { | 
					
						
							|  |  |  | 	return m.pool.Workers() | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2022-01-09 18:41:22 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | func (m *manager) Stop() error { | 
					
						
							|  |  |  | 	logrus.Info("stopping media manager worker pool") | 
					
						
							| 
									
										
										
										
											2022-01-09 18:41:22 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	stopped := m.pool.Stop() | 
					
						
							|  |  |  | 	if !stopped { | 
					
						
							|  |  |  | 		return errors.New("could not stop media manager worker pool") | 
					
						
							| 
									
										
										
										
											2022-01-08 13:45:42 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-01-10 18:36:09 +01:00
										 |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2022-01-08 13:45:42 +01:00
										 |  |  | } |