pass a function into the manager, start work on emoji

This commit is contained in:
tsmethurst 2022-01-11 17:49:14 +01:00
commit 113f9d9ab4
20 changed files with 936 additions and 299 deletions

View file

@ -21,9 +21,7 @@ package media
import (
"context"
"errors"
"fmt"
"runtime"
"strings"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-store/kv"
@ -33,15 +31,17 @@ import (
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
// ProcessMedia begins the process of decoding and storing the given data as a piece of media (aka an attachment).
// ProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a Media struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return raw bytes of a piece of media.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
ProcessMedia(ctx context.Context, data []byte, accountID string, ai *AdditionalInfo) (*Processing, error)
ProcessEmoji(ctx context.Context, data []byte, accountID string) (*Processing, error)
ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
// NumWorkers returns the total number of workers available to this manager.
NumWorkers() int
// QueueSize returns the total capacity of the queue.
@ -101,49 +101,52 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
return m, nil
}
func (m *manager) ProcessMedia(ctx context.Context, data []byte, accountID string, ai *AdditionalInfo) (*Processing, error) {
contentType, err := parseContentType(data)
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai)
if err != nil {
return nil, err
}
split := strings.Split(contentType, "/")
if len(split) != 2 {
return nil, fmt.Errorf("content type %s malformed", contentType)
}
mainType := split[0]
switch mainType {
case mimeImage:
media, err := m.preProcessImage(ctx, data, contentType, accountID, ai)
if err != nil {
return nil, err
}
logrus.Tracef("ProcessMedia: about to enqueue media with attachmentID %s, queue length is %d", media.AttachmentID(), m.pool.Queue())
m.pool.Enqueue(func(innerCtx context.Context) {
select {
case <-innerCtx.Done():
// if the inner context is done that means the worker pool is closing, so we should just return
return
default:
// start loading the media already for the caller's convenience
if _, err := media.Load(innerCtx); err != nil {
logrus.Errorf("ProcessMedia: error processing media with attachmentID %s: %s", media.AttachmentID(), err)
}
logrus.Tracef("ProcessMedia: about to enqueue media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
m.pool.Enqueue(func(innerCtx context.Context) {
select {
case <-innerCtx.Done():
// if the inner context is done that means the worker pool is closing, so we should just return
return
default:
// start loading the media already for the caller's convenience
if _, err := processingMedia.LoadAttachment(innerCtx); err != nil {
logrus.Errorf("ProcessMedia: error processing media with attachmentID %s: %s", processingMedia.AttachmentID(), err)
}
})
logrus.Tracef("ProcessMedia: succesfully queued media with attachmentID %s, queue length is %d", media.AttachmentID(), m.pool.Queue())
}
})
logrus.Tracef("ProcessMedia: succesfully queued media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
return media, nil
default:
return nil, fmt.Errorf("content type %s not (yet) supported", contentType)
}
return processingMedia, nil
}
func (m *manager) ProcessEmoji(ctx context.Context, data []byte, accountID string) (*Processing, error) {
return nil, nil
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, ai)
if err != nil {
return nil, err
}
logrus.Tracef("ProcessEmoji: about to enqueue emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
m.pool.Enqueue(func(innerCtx context.Context) {
select {
case <-innerCtx.Done():
// if the inner context is done that means the worker pool is closing, so we should just return
return
default:
// start loading the emoji already for the caller's convenience
if _, err := processingEmoji.LoadEmoji(innerCtx); err != nil {
logrus.Errorf("ProcessEmoji: error processing emoji with id %s: %s", processingEmoji.EmojiID(), err)
}
}
})
logrus.Tracef("ProcessEmoji: succesfully queued emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
return processingEmoji, nil
}
func (m *manager) NumWorkers() int {