| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | package sched | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"sort" | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"sync/atomic" | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"codeberg.org/gruf/go-runners" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 // precision is the base unit of run-time precision the scheduler works to.
// The main loop derives its "don't bother sleeping" threshold from it: when
// the next job is due within precision/1e3 of the current time, no timer is set.
const precision time.Duration = time.Millisecond
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 // neverticks is a timer channel that never ticks (it's starved): it is
// unbuffered and nothing in this file sends on or closes it, so a receive
// from it blocks.
var neverticks = make(chan time.Time)

// alwaysticks is a timer channel that always ticks (it's closed): a receive
// from it completes immediately, yielding the zero time.Time.
var alwaysticks = func() chan time.Time {
	closed := make(chan time.Time)
	close(closed)
	return closed
}()
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Scheduler provides a means of running jobs at specific times and | 
					
						
							|  |  |  | // regular intervals, all while sharing a single underlying timer. | 
					
						
							|  |  |  | type Scheduler struct { | 
					
						
							|  |  |  | 	jobs []*Job           // jobs is a list of tracked Jobs to be executed | 
					
						
							|  |  |  | 	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs | 
					
						
							|  |  |  | 	svc  runners.Service  // svc manages the main scheduler routine | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	jid  atomic.Uint64    // jid is used to iteratively generate unique IDs for jobs | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | 	rgo  func(func())     // goroutine runner, allows using goroutine pool to launch jobs | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | func (sch *Scheduler) Start(gorun func(func())) bool { | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	var block sync.Mutex | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Use mutex to synchronize between started | 
					
						
							|  |  |  | 	// goroutine and ourselves, to ensure that | 
					
						
							|  |  |  | 	// we don't return before Scheduler init'd. | 
					
						
							|  |  |  | 	block.Lock() | 
					
						
							|  |  |  | 	defer block.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ok := sch.svc.GoRun(func(ctx context.Context) { | 
					
						
							|  |  |  | 		// Create Scheduler job channel | 
					
						
							|  |  |  | 		sch.jch = make(chan interface{}) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | 		// Set goroutine runner function | 
					
						
							|  |  |  | 		if sch.rgo = gorun; sch.rgo == nil { | 
					
						
							|  |  |  | 			sch.rgo = func(f func()) { go f() } | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | 		// Unlock start routine | 
					
						
							|  |  |  | 		block.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 		// Enter main loop | 
					
						
							|  |  |  | 		sch.run(ctx) | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if ok { | 
					
						
							|  |  |  | 		// Wait on goroutine | 
					
						
							|  |  |  | 		block.Lock() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return ok | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. | 
					
						
							|  |  |  | func (sch *Scheduler) Stop() bool { | 
					
						
							|  |  |  | 	return sch.svc.Stop() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | // Running will return whether Scheduler is running (i.e. NOT stopped / stopping). | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | func (sch *Scheduler) Running() bool { | 
					
						
							|  |  |  | 	return sch.svc.Running() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | // Done returns a channel that's closed when Scheduler.Stop() is called. | 
					
						
							|  |  |  | func (sch *Scheduler) Done() <-chan struct{} { | 
					
						
							|  |  |  | 	return sch.svc.Done() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | // Schedule will add provided Job to the Scheduler, returning a cancel function. | 
					
						
							|  |  |  | func (sch *Scheduler) Schedule(job *Job) (cancel func()) { | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	switch { | 
					
						
							|  |  |  | 	// Check a job was passed | 
					
						
							|  |  |  | 	case job == nil: | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		panic("nil job") | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Check we are running | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | 	case !sch.Running(): | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 		panic("scheduler not running") | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	// Calculate next job ID | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	last := sch.jid.Load() | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	next := sch.jid.Add(1) | 
					
						
							|  |  |  | 	if next < last { | 
					
						
							|  |  |  | 		panic("job id overflow") | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Pass job to scheduler | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	job.id = next | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	sch.jch <- job | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	// Take ptrs to current state chs | 
					
						
							|  |  |  | 	ctx := sch.svc.Done() | 
					
						
							|  |  |  | 	jch := sch.jch | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	// Return cancel function for job ID | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 	return func() { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		// Sched stopped | 
					
						
							|  |  |  | 		case <-ctx: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Cancel this job | 
					
						
							|  |  |  | 		case jch <- next: | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // run is the main scheduler run routine, which runs for as long as ctx is valid. | 
					
						
							|  |  |  | func (sch *Scheduler) run(ctx context.Context) { | 
					
						
							|  |  |  | 	var ( | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// now stores the current time, and will only be | 
					
						
							|  |  |  | 		// set when the timer channel is set to be the | 
					
						
							|  |  |  | 		// 'alwaysticks' channel. this allows minimizing | 
					
						
							|  |  |  | 		// the number of calls required to time.Now(). | 
					
						
							|  |  |  | 		now time.Time | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		// timerset represents whether timer was running | 
					
						
							|  |  |  | 		// for a particular run of the loop. false means | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// that tch == neverticks || tch == alwaysticks. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		timerset bool | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// timer tick channel (or always / never ticks). | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		tch <-chan time.Time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// timer notifies this main routine to wake when | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// the job queued needs to be checked for executions. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		timer *time.Timer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// stopdrain will stop and drain the timer | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// if it has been running (i.e. timerset == true). | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		stopdrain = func() { | 
					
						
							|  |  |  | 			if timerset && !timer.Stop() { | 
					
						
							|  |  |  | 				<-timer.C | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 	// Create a stopped timer. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 	timer = time.NewTimer(1) | 
					
						
							|  |  |  | 	<-timer.C | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		// Reset timer state. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		timerset = false | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if len(sch.jobs) > 0 { | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 			// Get now time. | 
					
						
							|  |  |  | 			now = time.Now() | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 			// Sort jobs by next occurring. | 
					
						
							|  |  |  | 			sort.Sort(byNext(sch.jobs)) | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 			// Get next job time. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 			next := sch.jobs[0].Next() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 			// If this job is _just_ about to be ready, we don't bother | 
					
						
							|  |  |  | 			// sleeping. It's wasted cycles only sleeping for some obscenely | 
					
						
							|  |  |  | 			// tiny amount of time we can't guarantee precision for. | 
					
						
							| 
									
										
										
										
											2022-07-22 11:43:34 +01:00
										 |  |  | 			if until := next.Sub(now); until <= precision/1e3 { | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 				// This job is behind, | 
					
						
							|  |  |  | 				// set to always tick. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 				tch = alwaysticks | 
					
						
							|  |  |  | 			} else { | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 				// Reset timer to period. | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 				timer.Reset(until) | 
					
						
							|  |  |  | 				tch = timer.C | 
					
						
							|  |  |  | 				timerset = true | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			// Unset timer | 
					
						
							|  |  |  | 			tch = neverticks | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		// Scheduler stopped | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			stopdrain() | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Timer ticked, run scheduled | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 		case t := <-tch: | 
					
						
							| 
									
										
										
										
											2022-07-22 11:43:34 +01:00
										 |  |  | 			if !timerset { | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 				// 'alwaysticks' returns zero | 
					
						
							|  |  |  | 				// times, BUT 'now' will have | 
					
						
							|  |  |  | 				// been set during above sort. | 
					
						
							|  |  |  | 				t = now | 
					
						
							| 
									
										
										
										
											2022-07-22 11:43:34 +01:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-02-13 18:40:48 +00:00
										 |  |  | 			sch.schedule(t) | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// Received update, handle job/id | 
					
						
							|  |  |  | 		case v := <-sch.jch: | 
					
						
							|  |  |  | 			sch.handle(v) | 
					
						
							|  |  |  | 			stopdrain() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // handle takes an interfaces received from Scheduler.jch and handles either: | 
					
						
							|  |  |  | // - Job --> new job to add. | 
					
						
							|  |  |  | // - uint64 --> job ID to remove. | 
					
						
							|  |  |  | func (sch *Scheduler) handle(v interface{}) { | 
					
						
							|  |  |  | 	switch v := v.(type) { | 
					
						
							|  |  |  | 	// New job added | 
					
						
							|  |  |  | 	case *Job: | 
					
						
							|  |  |  | 		// Get current time | 
					
						
							|  |  |  | 		now := time.Now() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Update the next call time | 
					
						
							|  |  |  | 		next := v.timing.Next(now) | 
					
						
							| 
									
										
										
										
											2024-09-26 19:23:41 +00:00
										 |  |  | 		storeTime(&v.next, next) | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// Append this job to queued | 
					
						
							|  |  |  | 		sch.jobs = append(sch.jobs, v) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Job removed | 
					
						
							|  |  |  | 	case uint64: | 
					
						
							|  |  |  | 		for i := 0; i < len(sch.jobs); i++ { | 
					
						
							|  |  |  | 			if sch.jobs[i].id == v { | 
					
						
							|  |  |  | 				// This is the job we're looking for! Drop this | 
					
						
							|  |  |  | 				sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. | 
					
						
							|  |  |  | func (sch *Scheduler) schedule(now time.Time) { | 
					
						
							|  |  |  | 	for i := 0; i < len(sch.jobs); { | 
					
						
							|  |  |  | 		// Scope our own var | 
					
						
							|  |  |  | 		job := sch.jobs[i] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// We know these jobs are ordered by .Next(), so as soon | 
					
						
							|  |  |  | 		// as we reach one with .Next() after now, we can return | 
					
						
							|  |  |  | 		if job.Next().After(now) { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-06 10:16:09 +00:00
										 |  |  | 		// Pass to runner | 
					
						
							|  |  |  | 		sch.rgo(func() { | 
					
						
							|  |  |  | 			job.Run(now) | 
					
						
							|  |  |  | 		}) | 
					
						
							| 
									
										
										
										
											2022-09-28 18:30:40 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 		// Update the next call time | 
					
						
							|  |  |  | 		next := job.timing.Next(now) | 
					
						
							| 
									
										
										
										
											2024-09-26 19:23:41 +00:00
										 |  |  | 		storeTime(&job.next, next) | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-22 11:43:34 +01:00
										 |  |  | 		if next.IsZero() { | 
					
						
							| 
									
										
										
										
											2022-07-10 16:18:21 +01:00
										 |  |  | 			// Zero time, this job is done and can be dropped | 
					
						
							|  |  |  | 			sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Iter | 
					
						
							|  |  |  | 		i++ | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. | 
					
						
							|  |  |  | type byNext []*Job | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (by byNext) Len() int { | 
					
						
							|  |  |  | 	return len(by) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (by byNext) Less(i int, j int) bool { | 
					
						
							|  |  |  | 	return by[i].Next().Before(by[j].Next()) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (by byNext) Swap(i int, j int) { | 
					
						
							|  |  |  | 	by[i], by[j] = by[j], by[i] | 
					
						
							|  |  |  | } |