mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-10-31 11:52:24 -05:00 
			
		
		
		
	Add logging to the new generic worker package (#516)
* add logging to generic worker type
This commit is contained in:
		
					parent
					
						
							
								f5e006892e
							
						
					
				
			
			
				commit
				
					
						faae2505c0
					
				
			
		
					 1 changed files with 24 additions and 3 deletions
				
			
		|  | @ -3,6 +3,7 @@ package worker | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"reflect" | ||||||
| 	"runtime" | 	"runtime" | ||||||
| 
 | 
 | ||||||
| 	"codeberg.org/gruf/go-runners" | 	"codeberg.org/gruf/go-runners" | ||||||
|  | @ -13,6 +14,7 @@ import ( | ||||||
| type Worker[MsgType any] struct { | type Worker[MsgType any] struct { | ||||||
| 	workers runners.WorkerPool | 	workers runners.WorkerPool | ||||||
| 	process func(context.Context, MsgType) error | 	process func(context.Context, MsgType) error | ||||||
|  | 	prefix  string // contains type prefix for logging | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // New returns a new Worker[MsgType] with given number of workers and queue size | // New returns a new Worker[MsgType] with given number of workers and queue size | ||||||
|  | @ -20,47 +22,66 @@ type Worker[MsgType any] struct { | ||||||
| // defaults are determined from the runtime's GOMAXPROCS variable. | // defaults are determined from the runtime's GOMAXPROCS variable. | ||||||
| func New[MsgType any](workers int, queue int) *Worker[MsgType] { | func New[MsgType any](workers int, queue int) *Worker[MsgType] { | ||||||
| 	if workers < 1 { | 	if workers < 1 { | ||||||
|  | 		// ensure sensible workers | ||||||
| 		workers = runtime.GOMAXPROCS(0) | 		workers = runtime.GOMAXPROCS(0) | ||||||
| 	} | 	} | ||||||
| 	if queue < 1 { | 	if queue < 1 { | ||||||
|  | 		// ensure sensible queue | ||||||
| 		queue = workers * 100 | 		queue = workers * 100 | ||||||
| 	} | 	} | ||||||
| 	return &Worker[MsgType]{ | 
 | ||||||
|  | 	w := &Worker[MsgType]{ | ||||||
| 		workers: runners.NewWorkerPool(workers, queue), | 		workers: runners.NewWorkerPool(workers, queue), | ||||||
| 		process: nil, | 		process: nil, | ||||||
|  | 		prefix:  reflect.TypeOf(Worker[MsgType]{}).String(), //nolint | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	// Log new worker creation with type prefix | ||||||
|  | 	logrus.Infof("%s created with workers=%d queue=%d", w.prefix, workers, queue) | ||||||
|  | 
 | ||||||
|  | 	return w | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Start will attempt to start the underlying worker pool, or return error. | // Start will attempt to start the underlying worker pool, or return error. | ||||||
| func (w *Worker[MsgType]) Start() error { | func (w *Worker[MsgType]) Start() error { | ||||||
|  | 	logrus.Info(w.prefix, "starting") | ||||||
|  | 
 | ||||||
|  | 	// Check processor was set | ||||||
| 	if w.process == nil { | 	if w.process == nil { | ||||||
| 		return errors.New("nil Worker.process function") | 		return errors.New("nil Worker.process function") | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	// Attempt to start pool | ||||||
| 	if !w.workers.Start() { | 	if !w.workers.Start() { | ||||||
| 		return errors.New("failed to start Worker pool") | 		return errors.New("failed to start Worker pool") | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Stop will attempt to stop the underlying worker pool, or return error. | // Stop will attempt to stop the underlying worker pool, or return error. | ||||||
| func (w *Worker[MsgType]) Stop() error { | func (w *Worker[MsgType]) Stop() error { | ||||||
|  | 	logrus.Info(w.prefix, "stopping") | ||||||
|  | 
 | ||||||
|  | 	// Attempt to stop pool | ||||||
| 	if !w.workers.Stop() { | 	if !w.workers.Stop() { | ||||||
| 		return errors.New("failed to stop Worker pool") | 		return errors.New("failed to stop Worker pool") | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // SetProcessor will set the Worker's processor function, which is called for each queued message. | // SetProcessor will set the Worker's processor function, which is called for each queued message. | ||||||
| func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { | func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { | ||||||
| 	if w.process != nil { | 	if w.process != nil { | ||||||
| 		logrus.Panic("Worker.process is already set") | 		logrus.Panic(w.prefix, "Worker.process is already set") | ||||||
| 	} | 	} | ||||||
| 	w.process = fn | 	w.process = fn | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Queue will queue provided message to be processed with there's a free worker. | // Queue will queue provided message to be processed with there's a free worker. | ||||||
| func (w *Worker[MsgType]) Queue(msg MsgType) { | func (w *Worker[MsgType]) Queue(msg MsgType) { | ||||||
| 	logrus.Tracef("queueing %[1]T message; %+[1]v", msg) | 	logrus.Tracef("%s queueing message: %+v", w.prefix, msg) | ||||||
| 	w.workers.Enqueue(func(ctx context.Context) { | 	w.workers.Enqueue(func(ctx context.Context) { | ||||||
| 		if err := w.process(ctx, msg); err != nil { | 		if err := w.process(ctx, msg); err != nil { | ||||||
| 			logrus.Error(err) | 			logrus.Error(err) | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue