mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-29 19:52:24 -05:00 
			
		
		
		
	* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
		
	
			
		
			
				
	
	
		
			217 lines
		
	
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package runners
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| // Service provides a means of tracking a single long-running service, provided protected state
 | |
| // changes and preventing multiple instances running. Also providing service state information.
 | |
| type Service struct {
 | |
| 	state uint32        // 0=stopped, 1=running, 2=stopping
 | |
| 	mutex sync.Mutex    // mutex protects overall state changes
 | |
| 	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex'
 | |
| 	ctx   chan struct{} // ctx is the current context for running function (or nil if not running)
 | |
| }
 | |
| 
 | |
| // Run will run the supplied function until completion, using given context to propagate cancel.
 | |
| // Immediately returns false if the Service is already running, and true after completed run.
 | |
| func (svc *Service) Run(fn func(context.Context)) bool {
 | |
| 	// Attempt to start the svc
 | |
| 	ctx, ok := svc.doStart()
 | |
| 	if !ok {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		// unlock single wait
 | |
| 		svc.wait.Unlock()
 | |
| 
 | |
| 		// ensure stopped
 | |
| 		_ = svc.Stop()
 | |
| 	}()
 | |
| 
 | |
| 	// Run with context.
 | |
| 	fn(CancelCtx(ctx))
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // GoRun will run the supplied function until completion in a goroutine, using given context to
 | |
| // propagate cancel. Immediately returns boolean indicating success, or that service is already running.
 | |
| func (svc *Service) GoRun(fn func(context.Context)) bool {
 | |
| 	// Attempt to start the svc
 | |
| 	ctx, ok := svc.doStart()
 | |
| 	if !ok {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		defer func() {
 | |
| 			// unlock single wait
 | |
| 			svc.wait.Unlock()
 | |
| 
 | |
| 			// ensure stopped
 | |
| 			_ = svc.Stop()
 | |
| 		}()
 | |
| 
 | |
| 		// Run with context.
 | |
| 		fn(CancelCtx(ctx))
 | |
| 	}()
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
 | |
| func (svc *Service) RunWait(fn func(context.Context)) bool {
 | |
| 	// Attempt to start the svc
 | |
| 	ctx, ok := svc.doStart()
 | |
| 	if !ok {
 | |
| 		<-ctx // block
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		// unlock single wait
 | |
| 		svc.wait.Unlock()
 | |
| 
 | |
| 		// ensure stopped
 | |
| 		_ = svc.Stop()
 | |
| 	}()
 | |
| 
 | |
| 	// Run with context.
 | |
| 	fn(CancelCtx(ctx))
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // Stop will attempt to stop the service, cancelling the running function's context. Immediately
 | |
| // returns false if not running, and true only after Service is fully stopped.
 | |
| func (svc *Service) Stop() bool {
 | |
| 	// Attempt to stop the svc
 | |
| 	ctx, ok := svc.doStop()
 | |
| 	if !ok {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		// Get svc lock
 | |
| 		svc.mutex.Lock()
 | |
| 
 | |
| 		// Wait until stopped
 | |
| 		svc.wait.Lock()
 | |
| 		svc.wait.Unlock()
 | |
| 
 | |
| 		// Reset the svc
 | |
| 		svc.ctx = nil
 | |
| 		svc.state = 0
 | |
| 		svc.mutex.Unlock()
 | |
| 	}()
 | |
| 
 | |
| 	// Cancel ctx
 | |
| 	close(ctx)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // While allows you to execute given function guaranteed within current
 | |
| // service state. Please note that this will hold the underlying service
 | |
| // state change mutex open while executing the function.
 | |
| func (svc *Service) While(fn func()) {
 | |
| 	// Protect state change
 | |
| 	svc.mutex.Lock()
 | |
| 	defer svc.mutex.Unlock()
 | |
| 
 | |
| 	// Run
 | |
| 	fn()
 | |
| }
 | |
| 
 | |
| // doStart will safely set Service state to started, returning a ptr to this context insance.
 | |
| func (svc *Service) doStart() (chan struct{}, bool) {
 | |
| 	// Protect startup
 | |
| 	svc.mutex.Lock()
 | |
| 
 | |
| 	if svc.ctx == nil {
 | |
| 		// this will only have been allocated
 | |
| 		// if svc.Done() was already called.
 | |
| 		svc.ctx = make(chan struct{})
 | |
| 	}
 | |
| 
 | |
| 	// Take our own ptr
 | |
| 	ctx := svc.ctx
 | |
| 
 | |
| 	if svc.state != 0 {
 | |
| 		// State was not stopped.
 | |
| 		svc.mutex.Unlock()
 | |
| 		return ctx, false
 | |
| 	}
 | |
| 
 | |
| 	// Set started.
 | |
| 	svc.state = 1
 | |
| 
 | |
| 	// Start waiter.
 | |
| 	svc.wait.Lock()
 | |
| 
 | |
| 	// Unlock and return
 | |
| 	svc.mutex.Unlock()
 | |
| 	return ctx, true
 | |
| }
 | |
| 
 | |
| // doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
 | |
| func (svc *Service) doStop() (chan struct{}, bool) {
 | |
| 	// Protect stop
 | |
| 	svc.mutex.Lock()
 | |
| 
 | |
| 	if svc.state != 1 /* not started */ {
 | |
| 		svc.mutex.Unlock()
 | |
| 		return nil, false
 | |
| 	}
 | |
| 
 | |
| 	// state stopping
 | |
| 	svc.state = 2
 | |
| 
 | |
| 	// Take our own ptr
 | |
| 	// and unlock state
 | |
| 	ctx := svc.ctx
 | |
| 	svc.mutex.Unlock()
 | |
| 
 | |
| 	return ctx, true
 | |
| }
 | |
| 
 | |
| // Running returns if Service is running (i.e. state NOT stopped / stopping).
 | |
| func (svc *Service) Running() bool {
 | |
| 	svc.mutex.Lock()
 | |
| 	state := svc.state
 | |
| 	svc.mutex.Unlock()
 | |
| 	return (state == 1)
 | |
| }
 | |
| 
 | |
| // Done returns a channel that's closed when Service.Stop() is called. It is
 | |
| // the same channel provided to the currently running service function.
 | |
| func (svc *Service) Done() <-chan struct{} {
 | |
| 	var done <-chan struct{}
 | |
| 
 | |
| 	svc.mutex.Lock()
 | |
| 	switch svc.state {
 | |
| 	// stopped
 | |
| 	case 0:
 | |
| 		if svc.ctx == nil {
 | |
| 			// here we create a new context so that the
 | |
| 			// returned 'done' channel here will still
 | |
| 			// be valid for when Service is next started.
 | |
| 			svc.ctx = make(chan struct{})
 | |
| 		}
 | |
| 		done = svc.ctx
 | |
| 
 | |
| 	// started
 | |
| 	case 1:
 | |
| 		done = svc.ctx
 | |
| 
 | |
| 	// stopping
 | |
| 	case 2:
 | |
| 		done = svc.ctx
 | |
| 	}
 | |
| 	svc.mutex.Unlock()
 | |
| 
 | |
| 	return done
 | |
| }
 |