mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 17:32:27 -06:00 
			
		
		
		
	* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
		
	
			
		
			
				
	
	
		
			217 lines
		
	
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package runners
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
// Service provides a means of tracking a single long-running service, provided protected state
 | 
						|
// changes and preventing multiple instances running. Also providing service state information.
 | 
						|
type Service struct {
 | 
						|
	state uint32        // 0=stopped, 1=running, 2=stopping
 | 
						|
	mutex sync.Mutex    // mutex protects overall state changes
 | 
						|
	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex'
 | 
						|
	ctx   chan struct{} // ctx is the current context for running function (or nil if not running)
 | 
						|
}
 | 
						|
 | 
						|
// Run will run the supplied function until completion, using given context to propagate cancel.
 | 
						|
// Immediately returns false if the Service is already running, and true after completed run.
 | 
						|
func (svc *Service) Run(fn func(context.Context)) bool {
 | 
						|
	// Attempt to start the svc
 | 
						|
	ctx, ok := svc.doStart()
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		// unlock single wait
 | 
						|
		svc.wait.Unlock()
 | 
						|
 | 
						|
		// ensure stopped
 | 
						|
		_ = svc.Stop()
 | 
						|
	}()
 | 
						|
 | 
						|
	// Run with context.
 | 
						|
	fn(CancelCtx(ctx))
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// GoRun will run the supplied function until completion in a goroutine, using given context to
 | 
						|
// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
 | 
						|
func (svc *Service) GoRun(fn func(context.Context)) bool {
 | 
						|
	// Attempt to start the svc
 | 
						|
	ctx, ok := svc.doStart()
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer func() {
 | 
						|
			// unlock single wait
 | 
						|
			svc.wait.Unlock()
 | 
						|
 | 
						|
			// ensure stopped
 | 
						|
			_ = svc.Stop()
 | 
						|
		}()
 | 
						|
 | 
						|
		// Run with context.
 | 
						|
		fn(CancelCtx(ctx))
 | 
						|
	}()
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
 | 
						|
func (svc *Service) RunWait(fn func(context.Context)) bool {
 | 
						|
	// Attempt to start the svc
 | 
						|
	ctx, ok := svc.doStart()
 | 
						|
	if !ok {
 | 
						|
		<-ctx // block
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		// unlock single wait
 | 
						|
		svc.wait.Unlock()
 | 
						|
 | 
						|
		// ensure stopped
 | 
						|
		_ = svc.Stop()
 | 
						|
	}()
 | 
						|
 | 
						|
	// Run with context.
 | 
						|
	fn(CancelCtx(ctx))
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
 | 
						|
// returns false if not running, and true only after Service is fully stopped.
 | 
						|
func (svc *Service) Stop() bool {
 | 
						|
	// Attempt to stop the svc
 | 
						|
	ctx, ok := svc.doStop()
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		// Get svc lock
 | 
						|
		svc.mutex.Lock()
 | 
						|
 | 
						|
		// Wait until stopped
 | 
						|
		svc.wait.Lock()
 | 
						|
		svc.wait.Unlock()
 | 
						|
 | 
						|
		// Reset the svc
 | 
						|
		svc.ctx = nil
 | 
						|
		svc.state = 0
 | 
						|
		svc.mutex.Unlock()
 | 
						|
	}()
 | 
						|
 | 
						|
	// Cancel ctx
 | 
						|
	close(ctx)
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// While allows you to execute given function guaranteed within current
 | 
						|
// service state. Please note that this will hold the underlying service
 | 
						|
// state change mutex open while executing the function.
 | 
						|
func (svc *Service) While(fn func()) {
 | 
						|
	// Protect state change
 | 
						|
	svc.mutex.Lock()
 | 
						|
	defer svc.mutex.Unlock()
 | 
						|
 | 
						|
	// Run
 | 
						|
	fn()
 | 
						|
}
 | 
						|
 | 
						|
// doStart will safely set Service state to started, returning a ptr to this context insance.
 | 
						|
func (svc *Service) doStart() (chan struct{}, bool) {
 | 
						|
	// Protect startup
 | 
						|
	svc.mutex.Lock()
 | 
						|
 | 
						|
	if svc.ctx == nil {
 | 
						|
		// this will only have been allocated
 | 
						|
		// if svc.Done() was already called.
 | 
						|
		svc.ctx = make(chan struct{})
 | 
						|
	}
 | 
						|
 | 
						|
	// Take our own ptr
 | 
						|
	ctx := svc.ctx
 | 
						|
 | 
						|
	if svc.state != 0 {
 | 
						|
		// State was not stopped.
 | 
						|
		svc.mutex.Unlock()
 | 
						|
		return ctx, false
 | 
						|
	}
 | 
						|
 | 
						|
	// Set started.
 | 
						|
	svc.state = 1
 | 
						|
 | 
						|
	// Start waiter.
 | 
						|
	svc.wait.Lock()
 | 
						|
 | 
						|
	// Unlock and return
 | 
						|
	svc.mutex.Unlock()
 | 
						|
	return ctx, true
 | 
						|
}
 | 
						|
 | 
						|
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
 | 
						|
func (svc *Service) doStop() (chan struct{}, bool) {
 | 
						|
	// Protect stop
 | 
						|
	svc.mutex.Lock()
 | 
						|
 | 
						|
	if svc.state != 1 /* not started */ {
 | 
						|
		svc.mutex.Unlock()
 | 
						|
		return nil, false
 | 
						|
	}
 | 
						|
 | 
						|
	// state stopping
 | 
						|
	svc.state = 2
 | 
						|
 | 
						|
	// Take our own ptr
 | 
						|
	// and unlock state
 | 
						|
	ctx := svc.ctx
 | 
						|
	svc.mutex.Unlock()
 | 
						|
 | 
						|
	return ctx, true
 | 
						|
}
 | 
						|
 | 
						|
// Running returns if Service is running (i.e. state NOT stopped / stopping).
 | 
						|
func (svc *Service) Running() bool {
 | 
						|
	svc.mutex.Lock()
 | 
						|
	state := svc.state
 | 
						|
	svc.mutex.Unlock()
 | 
						|
	return (state == 1)
 | 
						|
}
 | 
						|
 | 
						|
// Done returns a channel that's closed when Service.Stop() is called. It is
 | 
						|
// the same channel provided to the currently running service function.
 | 
						|
func (svc *Service) Done() <-chan struct{} {
 | 
						|
	var done <-chan struct{}
 | 
						|
 | 
						|
	svc.mutex.Lock()
 | 
						|
	switch svc.state {
 | 
						|
	// stopped
 | 
						|
	case 0:
 | 
						|
		if svc.ctx == nil {
 | 
						|
			// here we create a new context so that the
 | 
						|
			// returned 'done' channel here will still
 | 
						|
			// be valid for when Service is next started.
 | 
						|
			svc.ctx = make(chan struct{})
 | 
						|
		}
 | 
						|
		done = svc.ctx
 | 
						|
 | 
						|
	// started
 | 
						|
	case 1:
 | 
						|
		done = svc.ctx
 | 
						|
 | 
						|
	// stopping
 | 
						|
	case 2:
 | 
						|
		done = svc.ctx
 | 
						|
	}
 | 
						|
	svc.mutex.Unlock()
 | 
						|
 | 
						|
	return done
 | 
						|
}
 |