mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-10-29 13:52:25 -05:00
[chore] pull in latest go-cache, go-runners versions (#1306)
Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
This commit is contained in:
parent
0dbe6c514f
commit
adbc87700a
23 changed files with 329 additions and 865 deletions
20
vendor/codeberg.org/gruf/go-runners/context.go
generated
vendored
20
vendor/codeberg.org/gruf/go-runners/context.go
generated
vendored
|
|
@ -12,6 +12,11 @@ var closedctx = func() context.Context {
|
|||
return ctx
|
||||
}()
|
||||
|
||||
// Closed returns an always closed context.
|
||||
func Closed() context.Context {
|
||||
return closedctx
|
||||
}
|
||||
|
||||
// ContextWithCancel returns a new context.Context impl with cancel.
|
||||
func ContextWithCancel() (context.Context, context.CancelFunc) {
|
||||
ctx := make(cancelctx)
|
||||
|
|
@ -41,3 +46,18 @@ func (ctx cancelctx) Err() error {
|
|||
func (cancelctx) Value(key interface{}) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ctx cancelctx) String() string {
|
||||
var state string
|
||||
select {
|
||||
case <-ctx:
|
||||
state = "closed"
|
||||
default:
|
||||
state = "open"
|
||||
}
|
||||
return "cancelctx{state:" + state + "}"
|
||||
}
|
||||
|
||||
func (ctx cancelctx) GoString() string {
|
||||
return "runners." + ctx.String()
|
||||
}
|
||||
|
|
|
|||
127
vendor/codeberg.org/gruf/go-runners/pool.go
generated
vendored
127
vendor/codeberg.org/gruf/go-runners/pool.go
generated
vendored
|
|
@ -2,8 +2,12 @@ package runners
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"codeberg.org/gruf/go-errors/v2"
|
||||
)
|
||||
|
||||
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
|
||||
|
|
@ -26,17 +30,22 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
if workers < 1 {
|
||||
// Use $GOMAXPROCS as default worker count
|
||||
if workers <= 0 {
|
||||
// Use $GOMAXPROCS as default.
|
||||
workers = runtime.GOMAXPROCS(0)
|
||||
}
|
||||
|
||||
if queue < 0 {
|
||||
// Set a reasonable queue default
|
||||
queue = workers * 2
|
||||
// Use reasonable queue default.
|
||||
queue = workers * 10
|
||||
}
|
||||
|
||||
// Allocate pool queue of given size
|
||||
// Allocate pool queue of given size.
|
||||
//
|
||||
// This MUST be set BEFORE we return and NOT in
|
||||
// the launched goroutine, or there is a risk that
|
||||
// the pool may appear as closed for a short time
|
||||
// until the main goroutine has been entered.
|
||||
fns := make(chan WorkerFunc, queue)
|
||||
pool.fns = fns
|
||||
|
||||
|
|
@ -53,50 +62,49 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
|
|||
|
||||
// Start goroutine worker functions
|
||||
for i := 0; i < workers; i++ {
|
||||
wait.Add(1)
|
||||
|
||||
go func() {
|
||||
// Trigger start / stop
|
||||
wait.Add(1)
|
||||
defer wait.Done()
|
||||
|
||||
// Keep workers running on panic
|
||||
for !workerstart(ctx, fns) {
|
||||
// Run worker function.
|
||||
for !worker_run(ctx, fns) {
|
||||
// retry on panic
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Set GC finalizer to stop pool on dealloc
|
||||
// Set GC finalizer to stop pool on dealloc.
|
||||
runtime.SetFinalizer(pool, func(pool *WorkerPool) {
|
||||
pool.svc.Stop()
|
||||
_ = pool.svc.Stop()
|
||||
})
|
||||
|
||||
// Wait on ctx
|
||||
<-ctx.Done()
|
||||
|
||||
// Stop all workers
|
||||
close(pool.fns)
|
||||
// Drain function queue.
|
||||
//
|
||||
// All functions in the queue MUST be
|
||||
// run, so we pass them a closed context.
|
||||
//
|
||||
// This mainly allows us to block until
|
||||
// the function queue is empty, as worker
|
||||
// functions will also continue draining in
|
||||
// the background with the (now) closed ctx.
|
||||
for !drain_queue(fns) {
|
||||
// retry on panic
|
||||
}
|
||||
|
||||
// Now the queue is empty, we can
|
||||
// safely close the channel signalling
|
||||
// all of the workers to return.
|
||||
close(fns)
|
||||
wait.Wait()
|
||||
}()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed.
|
||||
func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool {
|
||||
// Recover and drop any panic
|
||||
defer func() { recover() }()
|
||||
|
||||
for {
|
||||
// Wait on next func
|
||||
fn, ok := <-fns
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// Run with ctx
|
||||
fn(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop will stop the WorkerPool management loop, blocking until stopped.
|
||||
func (pool *WorkerPool) Stop() bool {
|
||||
return pool.svc.Stop()
|
||||
|
|
@ -124,22 +132,24 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
|
|||
|
||||
// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
|
||||
// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
|
||||
func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) {
|
||||
func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
|
||||
// Check valid fn
|
||||
if fn == nil {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
select {
|
||||
// Caller ctx cancelled
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
|
||||
// Pool ctx cancelled
|
||||
case <-pool.svc.Done():
|
||||
fn(closedctx)
|
||||
return false
|
||||
|
||||
// Placed fn in queue
|
||||
case pool.fns <- fn:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -167,5 +177,54 @@ func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
|
|||
|
||||
// Queue returns the number of currently queued WorkerFuncs.
|
||||
func (pool *WorkerPool) Queue() int {
|
||||
return len(pool.fns)
|
||||
var l int
|
||||
pool.svc.While(func() {
|
||||
l = len(pool.fns)
|
||||
})
|
||||
return l
|
||||
}
|
||||
|
||||
// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
|
||||
func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
|
||||
defer func() {
|
||||
// Recover and drop any panic
|
||||
if r := recover(); r != nil {
|
||||
const msg = "worker_run: recovered panic: %v\n\n%s\n"
|
||||
fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
// Wait on next func
|
||||
fn, ok := <-fns
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// Run with ctx
|
||||
fn(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// drain_queue will drain and run all functions in worker queue, passing in a closed context.
|
||||
func drain_queue(fns <-chan WorkerFunc) bool {
|
||||
defer func() {
|
||||
// Recover and drop any panic
|
||||
if r := recover(); r != nil {
|
||||
const msg = "drain_queue: recovered panic: %v\n\n%s\n"
|
||||
fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
// Run with closed ctx
|
||||
case fn := <-fns:
|
||||
fn(closedctx)
|
||||
|
||||
// Queue is empty
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
105
vendor/codeberg.org/gruf/go-runners/service.go
generated
vendored
105
vendor/codeberg.org/gruf/go-runners/service.go
generated
vendored
|
|
@ -8,11 +8,10 @@ import (
|
|||
// Service provides a means of tracking a single long-running service, provided protected state
|
||||
// changes and preventing multiple instances running. Also providing service state information.
|
||||
type Service struct {
|
||||
state uint32 // 0=stopped, 1=running, 2=stopping
|
||||
wait sync.Mutex // wait is the mutex used as a single-entity wait-group, i.e. just a "wait" :p
|
||||
cncl context.CancelFunc // cncl is the cancel function set for the current context
|
||||
ctx context.Context // ctx is the current context for running function (or nil if not running)
|
||||
mu sync.Mutex // mu protects state changes
|
||||
state uint32 // 0=stopped, 1=running, 2=stopping
|
||||
mutex sync.Mutex // mutext protects overall state changes
|
||||
wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
|
||||
ctx cancelctx // ctx is the current context for running function (or nil if not running)
|
||||
}
|
||||
|
||||
// Run will run the supplied function until completion, using given context to propagate cancel.
|
||||
|
|
@ -29,13 +28,12 @@ func (svc *Service) Run(fn func(context.Context)) bool {
|
|||
svc.wait.Unlock()
|
||||
|
||||
// ensure stopped
|
||||
svc.Stop()
|
||||
_ = svc.Stop()
|
||||
}()
|
||||
|
||||
// Run user func
|
||||
if fn != nil {
|
||||
fn(ctx)
|
||||
}
|
||||
// Run
|
||||
fn(ctx)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -54,13 +52,11 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {
|
|||
svc.wait.Unlock()
|
||||
|
||||
// ensure stopped
|
||||
svc.Stop()
|
||||
_ = svc.Stop()
|
||||
}()
|
||||
|
||||
// Run user func
|
||||
if fn != nil {
|
||||
fn(ctx)
|
||||
}
|
||||
// Run
|
||||
fn(ctx)
|
||||
}()
|
||||
|
||||
return true
|
||||
|
|
@ -70,14 +66,14 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {
|
|||
// returns false if not running, and true only after Service is fully stopped.
|
||||
func (svc *Service) Stop() bool {
|
||||
// Attempt to stop the svc
|
||||
cncl, ok := svc.doStop()
|
||||
ctx, ok := svc.doStop()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// Get svc lock
|
||||
svc.mu.Lock()
|
||||
svc.mutex.Lock()
|
||||
|
||||
// Wait until stopped
|
||||
svc.wait.Lock()
|
||||
|
|
@ -85,53 +81,65 @@ func (svc *Service) Stop() bool {
|
|||
|
||||
// Reset the svc
|
||||
svc.ctx = nil
|
||||
svc.cncl = nil
|
||||
svc.state = 0
|
||||
svc.mu.Unlock()
|
||||
svc.mutex.Unlock()
|
||||
}()
|
||||
|
||||
cncl() // cancel ctx
|
||||
// Cancel ctx
|
||||
close(ctx)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// While allows you to execute given function guaranteed within current
|
||||
// service state. Please note that this will hold the underlying service
|
||||
// state change mutex open while executing the function.
|
||||
func (svc *Service) While(fn func()) {
|
||||
// Protect state change
|
||||
svc.mutex.Lock()
|
||||
defer svc.mutex.Unlock()
|
||||
|
||||
// Run
|
||||
fn()
|
||||
}
|
||||
|
||||
// doStart will safely set Service state to started, returning a ptr to this context insance.
|
||||
func (svc *Service) doStart() (context.Context, bool) {
|
||||
func (svc *Service) doStart() (cancelctx, bool) {
|
||||
// Protect startup
|
||||
svc.mu.Lock()
|
||||
svc.mutex.Lock()
|
||||
|
||||
if svc.state != 0 /* not stopped */ {
|
||||
svc.mu.Unlock()
|
||||
svc.mutex.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// state started
|
||||
svc.state = 1
|
||||
|
||||
// Take our own ptr
|
||||
var ctx context.Context
|
||||
|
||||
if svc.ctx == nil {
|
||||
// Context required allocating
|
||||
svc.ctx, svc.cncl = ContextWithCancel()
|
||||
// this will only have been allocated
|
||||
// if svc.Done() was already called.
|
||||
svc.ctx = make(cancelctx)
|
||||
}
|
||||
|
||||
// Start the waiter
|
||||
svc.wait.Lock()
|
||||
|
||||
// Set our ptr + unlock
|
||||
ctx = svc.ctx
|
||||
svc.mu.Unlock()
|
||||
// Take our own ptr
|
||||
// and unlock state
|
||||
ctx := svc.ctx
|
||||
svc.mutex.Unlock()
|
||||
|
||||
return ctx, true
|
||||
}
|
||||
|
||||
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
|
||||
func (svc *Service) doStop() (context.CancelFunc, bool) {
|
||||
func (svc *Service) doStop() (cancelctx, bool) {
|
||||
// Protect stop
|
||||
svc.mu.Lock()
|
||||
svc.mutex.Lock()
|
||||
|
||||
if svc.state != 1 /* not started */ {
|
||||
svc.mu.Unlock()
|
||||
svc.mutex.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
|
|
@ -140,17 +148,17 @@ func (svc *Service) doStop() (context.CancelFunc, bool) {
|
|||
|
||||
// Take our own ptr
|
||||
// and unlock state
|
||||
cncl := svc.cncl
|
||||
svc.mu.Unlock()
|
||||
ctx := svc.ctx
|
||||
svc.mutex.Unlock()
|
||||
|
||||
return cncl, true
|
||||
return ctx, true
|
||||
}
|
||||
|
||||
// Running returns if Service is running (i.e. state NOT stopped / stopping).
|
||||
func (svc *Service) Running() bool {
|
||||
svc.mu.Lock()
|
||||
svc.mutex.Lock()
|
||||
state := svc.state
|
||||
svc.mu.Unlock()
|
||||
svc.mutex.Unlock()
|
||||
return (state == 1)
|
||||
}
|
||||
|
||||
|
|
@ -159,28 +167,27 @@ func (svc *Service) Running() bool {
|
|||
func (svc *Service) Done() <-chan struct{} {
|
||||
var done <-chan struct{}
|
||||
|
||||
svc.mu.Lock()
|
||||
svc.mutex.Lock()
|
||||
switch svc.state {
|
||||
// stopped
|
||||
// (here we create a new context so that the
|
||||
// returned 'done' channel here will still
|
||||
// be valid for when Service is next started)
|
||||
case 0:
|
||||
if svc.ctx == nil {
|
||||
// need to allocate new context
|
||||
svc.ctx, svc.cncl = ContextWithCancel()
|
||||
// here we create a new context so that the
|
||||
// returned 'done' channel here will still
|
||||
// be valid for when Service is next started.
|
||||
svc.ctx = make(cancelctx)
|
||||
}
|
||||
done = svc.ctx.Done()
|
||||
done = svc.ctx
|
||||
|
||||
// started
|
||||
case 1:
|
||||
done = svc.ctx.Done()
|
||||
done = svc.ctx
|
||||
|
||||
// stopping
|
||||
case 2:
|
||||
done = svc.ctx.Done()
|
||||
done = svc.ctx
|
||||
}
|
||||
svc.mu.Unlock()
|
||||
svc.mutex.Unlock()
|
||||
|
||||
return done
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue