mirror of
				https://github.com/superseriousbusiness/gotosocial.git
				synced 2025-11-03 18:22:25 -06:00 
			
		
		
		
	* update go-cache library to fix critical bug regarding cache sweep scheduling Signed-off-by: kim <grufwub@gmail.com> * update go-sched Signed-off-by: kim <grufwub@gmail.com>
		
			
				
	
	
		
			250 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package sched
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"sort"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"codeberg.org/gruf/go-atomics"
 | 
						|
	"codeberg.org/gruf/go-runners"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	// neverticks is a timer channel that never ticks (it's starved).
 | 
						|
	neverticks = make(chan time.Time)
 | 
						|
 | 
						|
	// alwaysticks is a timer channel that always ticks (it's closed).
 | 
						|
	alwaysticks = func() chan time.Time {
 | 
						|
		ch := make(chan time.Time)
 | 
						|
		close(ch)
 | 
						|
		return ch
 | 
						|
	}()
 | 
						|
)
 | 
						|
 | 
						|
// Scheduler provides a means of running jobs at specific times and
 | 
						|
// regular intervals, all while sharing a single underlying timer.
 | 
						|
type Scheduler struct {
 | 
						|
	jobs []*Job           // jobs is a list of tracked Jobs to be executed
 | 
						|
	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
 | 
						|
	svc  runners.Service  // svc manages the main scheduler routine
 | 
						|
	jid  atomics.Uint64   // jid is used to iteratively generate unique IDs for jobs
 | 
						|
}
 | 
						|
 | 
						|
// New returns a new Scheduler instance with given job change queue size.
 | 
						|
func NewScheduler(queue int) Scheduler {
 | 
						|
	if queue < 0 {
 | 
						|
		queue = 10
 | 
						|
	}
 | 
						|
	return Scheduler{jch: make(chan interface{}, queue)}
 | 
						|
}
 | 
						|
 | 
						|
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
 | 
						|
func (sch *Scheduler) Start() bool {
 | 
						|
	return sch.svc.Run(sch.run)
 | 
						|
}
 | 
						|
 | 
						|
// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
 | 
						|
func (sch *Scheduler) Stop() bool {
 | 
						|
	return sch.svc.Stop()
 | 
						|
}
 | 
						|
 | 
						|
// Running will return whether Scheduler is running.
 | 
						|
func (sch *Scheduler) Running() bool {
 | 
						|
	return sch.svc.Running()
 | 
						|
}
 | 
						|
 | 
						|
// Schedule will add provided Job to the Scheduler, returning a cancel function.
 | 
						|
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
 | 
						|
	if job == nil {
 | 
						|
		// Ensure there's a job!
 | 
						|
		panic("nil job")
 | 
						|
	}
 | 
						|
 | 
						|
	// Get last known job ID
 | 
						|
	last := sch.jid.Load()
 | 
						|
 | 
						|
	// Give this job an ID and check overflow
 | 
						|
	if job.id = sch.jid.Add(1); job.id < last {
 | 
						|
		panic("scheduler job id overflow")
 | 
						|
	}
 | 
						|
 | 
						|
	// Pass job to scheduler
 | 
						|
	sch.jch <- job
 | 
						|
 | 
						|
	// Return cancel function for job ID
 | 
						|
	return func() { sch.jch <- job.id }
 | 
						|
}
 | 
						|
 | 
						|
// run is the main scheduler run routine, which runs for as long as ctx is valid.
 | 
						|
func (sch *Scheduler) run(ctx context.Context) {
 | 
						|
	var (
 | 
						|
		// timerset represents whether timer was running
 | 
						|
		// for a particular run of the loop. false means
 | 
						|
		// that tch == neverticks || tch == alwaysticks
 | 
						|
		timerset bool
 | 
						|
 | 
						|
		// timer tick channel (or a never-tick channel)
 | 
						|
		tch <-chan time.Time
 | 
						|
 | 
						|
		// timer notifies this main routine to wake when
 | 
						|
		// the job queued needs to be checked for executions
 | 
						|
		timer *time.Timer
 | 
						|
 | 
						|
		// stopdrain will stop and drain the timer
 | 
						|
		// if it has been running (i.e. timerset == true)
 | 
						|
		stopdrain = func() {
 | 
						|
			if timerset && !timer.Stop() {
 | 
						|
				<-timer.C
 | 
						|
			}
 | 
						|
		}
 | 
						|
	)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		// Handle received job/id
 | 
						|
		case v := <-sch.jch:
 | 
						|
			sch.handle(v)
 | 
						|
			continue
 | 
						|
 | 
						|
		// No more
 | 
						|
		default:
 | 
						|
		}
 | 
						|
 | 
						|
		// Done
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	// Create a stopped timer
 | 
						|
	timer = time.NewTimer(1)
 | 
						|
	<-timer.C
 | 
						|
 | 
						|
	for {
 | 
						|
		// Reset timer state
 | 
						|
		timerset = false
 | 
						|
 | 
						|
		if len(sch.jobs) > 0 {
 | 
						|
			// Sort jobs by next occurring
 | 
						|
			sort.Sort(byNext(sch.jobs))
 | 
						|
 | 
						|
			// Get execution time
 | 
						|
			now := time.Now()
 | 
						|
 | 
						|
			// Get next job time
 | 
						|
			next := sch.jobs[0].Next()
 | 
						|
 | 
						|
			// If this job is _just_ about to be ready, we
 | 
						|
			// don't bother sleeping. It's wasted cycles only
 | 
						|
			// sleeping for some obscenely tiny amount of time
 | 
						|
			// we can't guarantee precision for.
 | 
						|
			const precision = time.Millisecond
 | 
						|
 | 
						|
			if until := next.Sub(now); until <= precision/1e3 {
 | 
						|
				// This job is behind schedule,
 | 
						|
				// set timer to always tick
 | 
						|
				tch = alwaysticks
 | 
						|
			} else {
 | 
						|
				// Reset timer to period
 | 
						|
				timer.Reset(until)
 | 
						|
				tch = timer.C
 | 
						|
				timerset = true
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// Unset timer
 | 
						|
			tch = neverticks
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		// Scheduler stopped
 | 
						|
		case <-ctx.Done():
 | 
						|
			stopdrain()
 | 
						|
			return
 | 
						|
 | 
						|
		// Timer ticked, run scheduled
 | 
						|
		case now := <-tch:
 | 
						|
			if !timerset {
 | 
						|
				// alwaysticks returns zero times
 | 
						|
				now = time.Now()
 | 
						|
			}
 | 
						|
			sch.schedule(now)
 | 
						|
 | 
						|
		// Received update, handle job/id
 | 
						|
		case v := <-sch.jch:
 | 
						|
			sch.handle(v)
 | 
						|
			stopdrain()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// handle takes an interfaces received from Scheduler.jch and handles either:
 | 
						|
// - Job --> new job to add.
 | 
						|
// - uint64 --> job ID to remove.
 | 
						|
func (sch *Scheduler) handle(v interface{}) {
 | 
						|
	switch v := v.(type) {
 | 
						|
	// New job added
 | 
						|
	case *Job:
 | 
						|
		// Get current time
 | 
						|
		now := time.Now()
 | 
						|
 | 
						|
		// Update the next call time
 | 
						|
		next := v.timing.Next(now)
 | 
						|
		v.next.Store(next)
 | 
						|
 | 
						|
		// Append this job to queued
 | 
						|
		sch.jobs = append(sch.jobs, v)
 | 
						|
 | 
						|
	// Job removed
 | 
						|
	case uint64:
 | 
						|
		for i := 0; i < len(sch.jobs); i++ {
 | 
						|
			if sch.jobs[i].id == v {
 | 
						|
				// This is the job we're looking for! Drop this
 | 
						|
				sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
 | 
						|
func (sch *Scheduler) schedule(now time.Time) {
 | 
						|
	for i := 0; i < len(sch.jobs); {
 | 
						|
		// Scope our own var
 | 
						|
		job := sch.jobs[i]
 | 
						|
 | 
						|
		// We know these jobs are ordered by .Next(), so as soon
 | 
						|
		// as we reach one with .Next() after now, we can return
 | 
						|
		if job.Next().After(now) {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// Update the next call time
 | 
						|
		next := job.timing.Next(now)
 | 
						|
		job.next.Store(next)
 | 
						|
 | 
						|
		// Run this job async!
 | 
						|
		go job.Run(now)
 | 
						|
 | 
						|
		if next.IsZero() {
 | 
						|
			// Zero time, this job is done and can be dropped
 | 
						|
			sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// Iter
 | 
						|
		i++
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
 | 
						|
type byNext []*Job
 | 
						|
 | 
						|
func (by byNext) Len() int {
 | 
						|
	return len(by)
 | 
						|
}
 | 
						|
 | 
						|
func (by byNext) Less(i int, j int) bool {
 | 
						|
	return by[i].Next().Before(by[j].Next())
 | 
						|
}
 | 
						|
 | 
						|
func (by byNext) Swap(i int, j int) {
 | 
						|
	by[i], by[j] = by[j], by[i]
 | 
						|
}
 |