mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-02 20:02:25 -06:00 
			
		
		
		
	
		
			
				
	
	
		
			292 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			292 lines
		
	
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package runners
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"os"
 | 
						|
	"runtime"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"codeberg.org/gruf/go-errors/v2"
 | 
						|
)
 | 
						|
 | 
						|
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
 | 
						|
// that implementations absolutely MUST check whether passed context is <-ctx.Done()
 | 
						|
// otherwise stopping the pool may block indefinitely.
 | 
						|
type WorkerFunc func(context.Context)
 | 
						|
 | 
						|
// WorkerPool provides a means of enqueuing asynchronous work.
 | 
						|
type WorkerPool struct {
 | 
						|
	fns chan WorkerFunc
 | 
						|
	svc Service
 | 
						|
}
 | 
						|
 | 
						|
// Start will start the main WorkerPool management loop in a new goroutine, along
 | 
						|
// with requested number of child worker goroutines. Returns false if already running.
 | 
						|
func (pool *WorkerPool) Start(workers int, queue int) bool {
 | 
						|
	// Attempt to start the svc
 | 
						|
	ctx, ok := pool.svc.doStart()
 | 
						|
	if !ok {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	if workers <= 0 {
 | 
						|
		// Use $GOMAXPROCS as default.
 | 
						|
		workers = runtime.GOMAXPROCS(0)
 | 
						|
	}
 | 
						|
 | 
						|
	if queue < 0 {
 | 
						|
		// Use reasonable queue default.
 | 
						|
		queue = workers * 10
 | 
						|
	}
 | 
						|
 | 
						|
	// Allocate pool queue of given size.
 | 
						|
	//
 | 
						|
	// This MUST be set BEFORE we return and NOT in
 | 
						|
	// the launched goroutine, or there is a risk that
 | 
						|
	// the pool may appear as closed for a short time
 | 
						|
	// until the main goroutine has been entered.
 | 
						|
	fns := make(chan WorkerFunc, queue)
 | 
						|
	pool.fns = fns
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer func() {
 | 
						|
			// unlock single wait
 | 
						|
			pool.svc.wait.Unlock()
 | 
						|
 | 
						|
			// ensure stopped
 | 
						|
			pool.svc.Stop()
 | 
						|
		}()
 | 
						|
 | 
						|
		var wait sync.WaitGroup
 | 
						|
 | 
						|
		// Start goroutine worker functions
 | 
						|
		for i := 0; i < workers; i++ {
 | 
						|
			wait.Add(1)
 | 
						|
 | 
						|
			go func() {
 | 
						|
				defer wait.Done()
 | 
						|
 | 
						|
				// Run worker function (retry on panic)
 | 
						|
				for !worker_run(CancelCtx(ctx), fns) {
 | 
						|
				}
 | 
						|
			}()
 | 
						|
		}
 | 
						|
 | 
						|
		// Wait on ctx
 | 
						|
		<-ctx
 | 
						|
 | 
						|
		// Drain function queue.
 | 
						|
		//
 | 
						|
		// All functions in the queue MUST be
 | 
						|
		// run, so we pass them a closed context.
 | 
						|
		//
 | 
						|
		// This mainly allows us to block until
 | 
						|
		// the function queue is empty, as worker
 | 
						|
		// functions will also continue draining in
 | 
						|
		// the background with the (now) closed ctx.
 | 
						|
		for !drain_queue(fns) {
 | 
						|
			// retry on panic
 | 
						|
		}
 | 
						|
 | 
						|
		// Now the queue is empty, we can
 | 
						|
		// safely close the channel signalling
 | 
						|
		// all of the workers to return.
 | 
						|
		close(fns)
 | 
						|
		wait.Wait()
 | 
						|
	}()
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// Stop will stop the WorkerPool management loop, blocking until stopped.
 | 
						|
func (pool *WorkerPool) Stop() bool {
 | 
						|
	return pool.svc.Stop()
 | 
						|
}
 | 
						|
 | 
						|
// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
 | 
						|
func (pool *WorkerPool) Running() bool {
 | 
						|
	return pool.svc.Running()
 | 
						|
}
 | 
						|
 | 
						|
// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
 | 
						|
func (pool *WorkerPool) Done() <-chan struct{} {
 | 
						|
	return pool.svc.Done()
 | 
						|
}
 | 
						|
 | 
						|
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
 | 
						|
// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
 | 
						|
// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
 | 
						|
// WorkerFuncs MUST respect the passed context.
 | 
						|
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
 | 
						|
	// Check valid fn
 | 
						|
	if fn == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	// Pool ctx cancelled
 | 
						|
	case <-pool.svc.Done():
 | 
						|
		fn(closedctx)
 | 
						|
 | 
						|
	// Placed fn in queue
 | 
						|
	case pool.fns <- fn:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
 | 
						|
// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
 | 
						|
func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
 | 
						|
	// Check valid fn
 | 
						|
	if fn == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	// Caller ctx cancelled
 | 
						|
	case <-ctx.Done():
 | 
						|
		return false
 | 
						|
 | 
						|
	// Pool ctx cancelled
 | 
						|
	case <-pool.svc.Done():
 | 
						|
		return false
 | 
						|
 | 
						|
	// Placed fn in queue
 | 
						|
	case pool.fns <- fn:
 | 
						|
		return true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
 | 
						|
// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
 | 
						|
// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
 | 
						|
func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
 | 
						|
	// Check valid fn
 | 
						|
	if fn == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-ctx.Done():
 | 
						|
		// We failed to add this entry to the worker queue before the
 | 
						|
		// incoming context was cancelled. So to ensure processing
 | 
						|
		// we simply queue it asynchronously and return early to caller.
 | 
						|
		go pool.Enqueue(fn)
 | 
						|
		return false
 | 
						|
 | 
						|
	case <-pool.svc.Done():
 | 
						|
		// Pool ctx cancelled
 | 
						|
		fn(closedctx)
 | 
						|
		return false
 | 
						|
 | 
						|
	case pool.fns <- fn:
 | 
						|
		// Placed fn in queue
 | 
						|
		return true
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// EnqueueNow attempts Enqueue but returns false if not executed.
 | 
						|
func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
 | 
						|
	// Check valid fn
 | 
						|
	if fn == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	// Pool ctx cancelled
 | 
						|
	case <-pool.svc.Done():
 | 
						|
		return false
 | 
						|
 | 
						|
	// Placed fn in queue
 | 
						|
	case pool.fns <- fn:
 | 
						|
		return true
 | 
						|
 | 
						|
	// Queue is full
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Queue returns the number of currently queued WorkerFuncs.
 | 
						|
func (pool *WorkerPool) Queue() int {
 | 
						|
	var l int
 | 
						|
	pool.svc.While(func() {
 | 
						|
		l = len(pool.fns)
 | 
						|
	})
 | 
						|
	return l
 | 
						|
}
 | 
						|
 | 
						|
// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
 | 
						|
func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
 | 
						|
	defer func() {
 | 
						|
		// Recover and drop any panic
 | 
						|
		if r := recover(); r != nil {
 | 
						|
 | 
						|
			// Gather calling func frames.
 | 
						|
			pcs := make([]uintptr, 10)
 | 
						|
			n := runtime.Callers(3, pcs)
 | 
						|
			i := runtime.CallersFrames(pcs[:n])
 | 
						|
			c := gatherFrames(i, n)
 | 
						|
 | 
						|
			const msg = "worker_run: recovered panic: %v\n\n%s\n"
 | 
						|
			fmt.Fprintf(os.Stderr, msg, r, c.String())
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	for {
 | 
						|
		// Wait on next func
 | 
						|
		fn, ok := <-fns
 | 
						|
		if !ok {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
 | 
						|
		// Run with ctx
 | 
						|
		fn(ctx)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// drain_queue will drain and run all functions in worker queue, passing in a closed context.
 | 
						|
func drain_queue(fns <-chan WorkerFunc) bool {
 | 
						|
	defer func() {
 | 
						|
		// Recover and drop any panic
 | 
						|
		if r := recover(); r != nil {
 | 
						|
 | 
						|
			// Gather calling func frames.
 | 
						|
			pcs := make([]uintptr, 10)
 | 
						|
			n := runtime.Callers(3, pcs)
 | 
						|
			i := runtime.CallersFrames(pcs[:n])
 | 
						|
			c := gatherFrames(i, n)
 | 
						|
 | 
						|
			const msg = "worker_run: recovered panic: %v\n\n%s\n"
 | 
						|
			fmt.Fprintf(os.Stderr, msg, r, c.String())
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		// Run with closed ctx
 | 
						|
		case fn := <-fns:
 | 
						|
			fn(closedctx)
 | 
						|
 | 
						|
		// Queue is empty
 | 
						|
		default:
 | 
						|
			return true
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// gatherFrames collates runtime frames from a frame iterator.
 | 
						|
func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
 | 
						|
	if iter == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	frames := make([]runtime.Frame, 0, n)
 | 
						|
	for {
 | 
						|
		f, ok := iter.Next()
 | 
						|
		if !ok {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		frames = append(frames, f)
 | 
						|
	}
 | 
						|
	return frames
 | 
						|
}
 |