mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 18:22:25 -06:00 
			
		
		
		
	
		
			
	
	
		
			161 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			161 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| 
								 | 
							
								package runners
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"context"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// WorkerFunc represents a function processable by a worker in WorkerPool. Note
							 | 
						||
| 
								 | 
							
								// that implementations absolutely MUST check whether passed context is Done()
							 | 
						||
| 
								 | 
							
								// otherwise stopping the pool may block for large periods of time.
							 | 
						||
| 
								 | 
							
								type WorkerFunc func(context.Context)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// WorkerPool provides a means of enqueuing asynchronous work.
							 | 
						||
| 
								 | 
							
								type WorkerPool struct {
							 | 
						||
| 
								 | 
							
									queue chan WorkerFunc
							 | 
						||
| 
								 | 
							
									free  chan struct{}
							 | 
						||
| 
								 | 
							
									wait  sync.WaitGroup
							 | 
						||
| 
								 | 
							
									svc   Service
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size.
							 | 
						||
| 
								 | 
							
								// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the
							 | 
						||
| 
								 | 
							
								// queue size represents the max number of WorkerFuncs that can be queued at any one time.
							 | 
						||
| 
								 | 
							
								func NewWorkerPool(workers int, queue int) WorkerPool {
							 | 
						||
| 
								 | 
							
									return WorkerPool{
							 | 
						||
| 
								 | 
							
										queue: make(chan WorkerFunc, queue),
							 | 
						||
| 
								 | 
							
										free:  make(chan struct{}, workers),
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Start will attempt to start the worker pool, asynchronously. Return is success state.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Start() bool {
							 | 
						||
| 
								 | 
							
									ok := true
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									done := make(chan struct{})
							 | 
						||
| 
								 | 
							
									go func() {
							 | 
						||
| 
								 | 
							
										ok = pool.svc.Run(func(ctx context.Context) {
							 | 
						||
| 
								 | 
							
											close(done)
							 | 
						||
| 
								 | 
							
											pool.process(ctx)
							 | 
						||
| 
								 | 
							
										})
							 | 
						||
| 
								 | 
							
										if !ok {
							 | 
						||
| 
								 | 
							
											close(done)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
									<-done
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									return ok
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Stop() bool {
							 | 
						||
| 
								 | 
							
									return pool.svc.Stop()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Running returns whether the worker pool is running.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Running() bool {
							 | 
						||
| 
								 | 
							
									return pool.svc.Running()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// execute will take a queued function and pass it to a free worker when available.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) {
							 | 
						||
| 
								 | 
							
									// Set as running
							 | 
						||
| 
								 | 
							
									pool.wait.Add(1)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									// Pool context cancelled
							 | 
						||
| 
								 | 
							
									case <-ctx.Done():
							 | 
						||
| 
								 | 
							
										pool.wait.Done()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Free worker acquired
							 | 
						||
| 
								 | 
							
									case pool.free <- struct{}{}:
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									go func() {
							 | 
						||
| 
								 | 
							
										defer func() {
							 | 
						||
| 
								 | 
							
											// defer in case panic
							 | 
						||
| 
								 | 
							
											<-pool.free
							 | 
						||
| 
								 | 
							
											pool.wait.Done()
							 | 
						||
| 
								 | 
							
										}()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// Run queued
							 | 
						||
| 
								 | 
							
										fn(ctx)
							 | 
						||
| 
								 | 
							
									}()
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// process is the background processing routine that passes queued functions to workers.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) process(ctx context.Context) {
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										// Pool context cancelled
							 | 
						||
| 
								 | 
							
										case <-ctx.Done():
							 | 
						||
| 
								 | 
							
											for {
							 | 
						||
| 
								 | 
							
												select {
							 | 
						||
| 
								 | 
							
												// Pop and execute queued
							 | 
						||
| 
								 | 
							
												case fn := <-pool.queue:
							 | 
						||
| 
								 | 
							
													fn(ctx) // ctx is closed
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
												// Empty, wait for workers
							 | 
						||
| 
								 | 
							
												default:
							 | 
						||
| 
								 | 
							
													pool.wait.Wait()
							 | 
						||
| 
								 | 
							
													return
							 | 
						||
| 
								 | 
							
												}
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										// Queued func received
							 | 
						||
| 
								 | 
							
										case fn := <-pool.queue:
							 | 
						||
| 
								 | 
							
											pool.execute(ctx, fn)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
							 | 
						||
| 
								 | 
							
								// Note that 'fn' will ALWAYS be executed, and the supplied context will specify whether this 'fn'
							 | 
						||
| 
								 | 
							
								// is being executed during normal pool execution, or if the pool has been stopped with <-ctx.Done().
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
							 | 
						||
| 
								 | 
							
									// Check valid fn
							 | 
						||
| 
								 | 
							
									if fn == nil {
							 | 
						||
| 
								 | 
							
										return
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									// Pool context cancelled
							 | 
						||
| 
								 | 
							
									case <-pool.svc.Done():
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Placed fn in queue
							 | 
						||
| 
								 | 
							
									case pool.queue <- fn:
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// EnqueueNoBlock performs Enqueue but returns false if queue size is at max. Else, true.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
							 | 
						||
| 
								 | 
							
									// Check valid fn
							 | 
						||
| 
								 | 
							
									if fn == nil {
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									select {
							 | 
						||
| 
								 | 
							
									// Pool context cancelled
							 | 
						||
| 
								 | 
							
									case <-pool.svc.Done():
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Placed fn in queue
							 | 
						||
| 
								 | 
							
									case pool.queue <- fn:
							 | 
						||
| 
								 | 
							
										return true
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									// Queue is full
							 | 
						||
| 
								 | 
							
									default:
							 | 
						||
| 
								 | 
							
										return false
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Queue returns the number of currently queued WorkerFuncs.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Queue() int {
							 | 
						||
| 
								 | 
							
									return len(pool.queue)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// Workers returns the number of currently active workers.
							 | 
						||
| 
								 | 
							
								func (pool *WorkerPool) Workers() int {
							 | 
						||
| 
								 | 
							
									return len(pool.free)
							 | 
						||
| 
								 | 
							
								}
							 |