| 
									
										
										
										
											2022-05-26 11:51:59 +02:00
										 |  |  | /* | 
					
						
							|  |  |  |    GoToSocial | 
					
						
							| 
									
										
										
										
											2023-01-05 12:43:00 +01:00
										 |  |  |    Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org | 
					
						
							| 
									
										
										
										
											2022-05-26 11:51:59 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |    This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  |    it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  |    the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  |    (at your option) any later version. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  |    but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  |    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  |    GNU Affero General Public License for more details. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  |    along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  | */ | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | package concurrency | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"path" | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 	"reflect" | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	"runtime" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 	"codeberg.org/gruf/go-kv" | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	"codeberg.org/gruf/go-runners" | 
					
						
							| 
									
										
										
										
											2022-07-19 09:47:55 +01:00
										 |  |  | 	"github.com/superseriousbusiness/gotosocial/internal/log" | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | // WorkerPool represents a proccessor for MsgType objects, using a worker pool to allocate resources. | 
					
						
							|  |  |  | type WorkerPool[MsgType any] struct { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	workers runners.WorkerPool | 
					
						
							|  |  |  | 	process func(context.Context, MsgType) error | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	nw, nq  int | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 	wtype   string // contains worker type for logging | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | // where the queue ratio is multiplied by no. workers to get queue size. If args < 1 | 
					
						
							|  |  |  | // then suitable defaults are determined from the runtime's GOMAXPROCS variable. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType] { | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	var zero MsgType | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if workers < 1 { | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 		// ensure sensible workers | 
					
						
							| 
									
										
										
										
											2022-06-11 11:01:34 +02:00
										 |  |  | 		workers = runtime.GOMAXPROCS(0) * 4 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	if queueRatio < 1 { | 
					
						
							|  |  |  | 		// ensure sensible ratio | 
					
						
							|  |  |  | 		queueRatio = 100 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 	// Calculate the short type string for the msg type | 
					
						
							|  |  |  | 	msgType := reflect.TypeOf(zero).String() | 
					
						
							|  |  |  | 	_, msgType = path.Split(msgType) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | 	w := &WorkerPool[MsgType]{ | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		process: nil, | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 		nw:      workers, | 
					
						
							|  |  |  | 		nq:      workers * queueRatio, | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 		wtype:   fmt.Sprintf("worker.Worker[%s]", msgType), | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 	// Log new worker creation with worker type prefix | 
					
						
							| 
									
										
										
										
											2023-02-17 12:02:29 +01:00
										 |  |  | 	log.Infof(nil, "%s created with workers=%d queue=%d", | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 		w.wtype, | 
					
						
							| 
									
										
										
										
											2022-05-07 16:36:01 +01:00
										 |  |  | 		workers, | 
					
						
							|  |  |  | 		workers*queueRatio, | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return w | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Start will attempt to start the underlying worker pool, or return error. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Start() error { | 
					
						
							| 
									
										
										
										
											2023-02-17 12:02:29 +01:00
										 |  |  | 	log.Infof(nil, "%s starting", w.wtype) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Check processor was set | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if w.process == nil { | 
					
						
							|  |  |  | 		return errors.New("nil Worker.process function") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Attempt to start pool | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	if !w.workers.Start(w.nw, w.nq) { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		return errors.New("failed to start Worker pool") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Stop will attempt to stop the underlying worker pool, or return error. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Stop() error { | 
					
						
							| 
									
										
										
										
											2023-02-17 12:02:29 +01:00
										 |  |  | 	log.Infof(nil, "%s stopping", w.wtype) | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Attempt to stop pool | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if !w.workers.Stop() { | 
					
						
							|  |  |  | 		return errors.New("failed to stop Worker pool") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-01 15:45:15 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SetProcessor will set the Worker's processor function, which is called for each queued message. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	if w.process != nil { | 
					
						
							| 
									
										
										
										
											2023-02-17 12:02:29 +01:00
										 |  |  | 		log.Panicf(nil, "%s Worker.process is already set", w.wtype) | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	w.process = fn | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Queue will queue provided message to be processed with there's a free worker. | 
					
						
							| 
									
										
										
										
											2022-05-15 10:16:43 +01:00
										 |  |  | func (w *WorkerPool[MsgType]) Queue(msg MsgType) { | 
					
						
							| 
									
										
										
										
											2023-02-17 12:02:29 +01:00
										 |  |  | 	log.Tracef(nil, "%s queueing message: %+v", w.wtype, msg) | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Create new process function for msg | 
					
						
							|  |  |  | 	process := func(ctx context.Context) { | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		if err := w.process(ctx, msg); err != nil { | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 			log.WithFields(kv.Fields{ | 
					
						
							|  |  |  | 				kv.Field{K: "type", V: w.wtype}, | 
					
						
							|  |  |  | 				kv.Field{K: "error", V: err}, | 
					
						
							|  |  |  | 			}...).Error("message processing error") | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-11-08 09:35:24 +00:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Attempt a fast-enqueue of process | 
					
						
							|  |  |  | 	if !w.workers.EnqueueNow(process) { | 
					
						
							|  |  |  | 		// No spot acquired, log warning | 
					
						
							|  |  |  | 		log.WithFields(kv.Fields{ | 
					
						
							|  |  |  | 			kv.Field{K: "type", V: w.wtype}, | 
					
						
							|  |  |  | 			kv.Field{K: "queue", V: w.workers.Queue()}, | 
					
						
							|  |  |  | 		}...).Warn("full worker queue") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Block on enqueuing process func | 
					
						
							|  |  |  | 		w.workers.Enqueue(process) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-04-28 13:23:11 +01:00
										 |  |  | } |