| 
									
										
										
										
											2022-05-26 11:51:59 +02:00
										 |  |  | /* | 
					
						
							|  |  |  |    GoToSocial | 
					
						
							|  |  |  |    Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  |    it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  |    the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  |    (at your option) any later version. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  |    but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  |    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  |    GNU Affero General Public License for more details. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  |    along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | */ | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | package concurrency | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"path" | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 	"reflect" | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	"runtime" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"codeberg.org/gruf/go-runners" | 
					
						
							|  |  |  | 	"github.com/sirupsen/logrus" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | // WorkerPool represents a proccessor for MsgType objects, using a worker pool to allocate resources. | 
					
						
							|  |  |  | type WorkerPool[MsgType any] struct { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	workers runners.WorkerPool | 
					
						
							|  |  |  | 	process func(context.Context, MsgType) error | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 	prefix  string // contains type prefix for logging | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | // where the queue ratio is multiplied by no. workers to get queue size. If args < 1 | 
					
						
							|  |  |  | // then suitable defaults are determined from the runtime's GOMAXPROCS variable. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType] { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	var zero MsgType | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if workers < 1 { | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 		// ensure sensible workers | 
					
						
							| 
									
										
										
										
											2022-05-26 11:51:59 +02:00
										 |  |  | 		workers = runtime.GOMAXPROCS(0) * 2 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	if queueRatio < 1 { | 
					
						
							|  |  |  | 		// ensure sensible ratio | 
					
						
							|  |  |  | 		queueRatio = 100 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	// Calculate the short type string for the msg type | 
					
						
							|  |  |  | 	msgType := reflect.TypeOf(zero).String() | 
					
						
							|  |  |  | 	_, msgType = path.Split(msgType) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | 	w := &WorkerPool[MsgType]{ | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 		workers: runners.NewWorkerPool(workers, workers*queueRatio), | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		process: nil, | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 		prefix:  fmt.Sprintf("worker.Worker[%s]", msgType), | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Log new worker creation with type prefix | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	logrus.Infof("%s created with workers=%d queue=%d", | 
					
						
							|  |  |  | 		w.prefix, | 
					
						
							|  |  |  | 		workers, | 
					
						
							|  |  |  | 		workers*queueRatio, | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return w | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Start will attempt to start the underlying worker pool, or return error. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Start() error { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	logrus.Infof("%s starting", w.prefix) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Check processor was set | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if w.process == nil { | 
					
						
							|  |  |  | 		return errors.New("nil Worker.process function") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Attempt to start pool | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if !w.workers.Start() { | 
					
						
							|  |  |  | 		return errors.New("failed to start Worker pool") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Stop will attempt to stop the underlying worker pool, or return error. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Stop() error { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	logrus.Infof("%s stopping", w.prefix) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Attempt to stop pool | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if !w.workers.Stop() { | 
					
						
							|  |  |  | 		return errors.New("failed to stop Worker pool") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SetProcessor will set the Worker's processor function, which is called for each queued message. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if w.process != nil { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 		logrus.Panicf("%s Worker.process is already set", w.prefix) | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	w.process = fn | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Queue will queue provided message to be processed with there's a free worker. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Queue(msg MsgType) { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	logrus.Tracef("%s queueing message (workers=%d queue=%d): %+v", | 
					
						
							|  |  |  | 		w.prefix, w.workers.Workers(), w.workers.Queue(), msg, | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	w.workers.Enqueue(func(ctx context.Context) { | 
					
						
							|  |  |  | 		if err := w.process(ctx, msg); err != nil { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 			logrus.Errorf("%s %v", w.prefix, err) | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | } |