When acquiring from a semaphore could proceed without contention, the
previous behavior was to always do so, even when the provided context was
done. This was the documented behavior, but it could lead to confusion. It
isn't much more expensive to check the context error, so cancel acquisition
if it's done.

Fixes golang/go#63615.

goos: linux
goarch: amd64
pkg: golang.org/x/sync/semaphore
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
                                         │  old.bench  │              new.bench              │
                                         │   sec/op    │   sec/op     vs base                │
AcquireSeq/Weighted-acquire-1-1-1-12       26.45n ± 2%   27.25n ± 3%  +3.04% (p=0.001 n=20)
AcquireSeq/Weighted-acquire-2-1-1-12       26.96n ± 1%   27.12n ± 1%       ~ (p=0.104 n=20)
AcquireSeq/Weighted-acquire-16-1-1-12      26.07n ± 3%   27.48n ± 1%  +5.45% (p=0.000 n=20)
AcquireSeq/Weighted-acquire-128-1-1-12     26.19n ± 2%   27.24n ± 1%  +4.01% (p=0.000 n=20)
AcquireSeq/Weighted-acquire-2-2-1-12       25.61n ± 1%   25.99n ± 2%       ~ (p=0.066 n=20)
AcquireSeq/Weighted-acquire-16-2-8-12      209.6n ± 2%   211.0n ± 3%       ~ (p=0.280 n=20)
AcquireSeq/Weighted-acquire-128-2-64-12    1.669µ ± 1%   1.721µ ± 2%  +3.09% (p=0.000 n=20)
AcquireSeq/Weighted-acquire-2-1-2-12       51.08n ± 1%   53.03n ± 2%  +3.82% (p=0.000 n=20)
AcquireSeq/Weighted-acquire-16-8-2-12      52.48n ± 2%   53.66n ± 2%  +2.26% (p=0.028 n=20)
AcquireSeq/Weighted-acquire-128-64-2-12    52.27n ± 1%   53.71n ± 2%  +2.75% (p=0.000 n=20)
geomean                                    60.06n        61.69n       +2.71%

Change-Id: I0ae1a0bb6c027461ac1a9ee71c51efd8427ab308
Reviewed-on: https://go-review.googlesource.com/c/sync/+/536275
Auto-Submit: Bryan Mills <bcmills@google.com>
Reviewed-by: Bryan Mills <bcmills@google.com>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
160 lines
4.3 KiB
Go
160 lines
4.3 KiB
Go
// Copyright 2017 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Package semaphore provides a weighted semaphore implementation.
|
|
package semaphore // import "golang.org/x/sync/semaphore"
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// waiter is the queue entry for one Acquire call that is blocked waiting for
// weight to become available.
type waiter struct {
	// n is the weight the blocked caller asked for.
	n int64
	// ready is closed once the semaphore has been acquired on behalf of the
	// blocked caller.
	ready chan<- struct{}
}
|
|
|
|
// NewWeighted creates a new weighted semaphore with the given
|
|
// maximum combined weight for concurrent access.
|
|
func NewWeighted(n int64) *Weighted {
|
|
w := &Weighted{size: n}
|
|
return w
|
|
}
|
|
|
|
// Weighted is a weighted semaphore: it bounds concurrent access to a
// resource, with each caller requesting access at a weight of its choosing.
type Weighted struct {
	// size is the total weight the semaphore can hand out; cur is the weight
	// currently handed out.
	size, cur int64
	// mu is held by every method while it reads or updates cur and waiters.
	mu sync.Mutex
	// waiters queues blocked Acquire calls as waiter values, oldest first.
	waiters list.List
}
|
|
|
|
// Acquire acquires the semaphore with a weight of n, blocking until resources
|
|
// are available or ctx is done. On success, returns nil. On failure, returns
|
|
// ctx.Err() and leaves the semaphore unchanged.
|
|
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
|
|
done := ctx.Done()
|
|
|
|
s.mu.Lock()
|
|
select {
|
|
case <-done:
|
|
// ctx becoming done has "happened before" acquiring the semaphore,
|
|
// whether it became done before the call began or while we were
|
|
// waiting for the mutex. We prefer to fail even if we could acquire
|
|
// the mutex without blocking.
|
|
s.mu.Unlock()
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
if s.size-s.cur >= n && s.waiters.Len() == 0 {
|
|
// Since we hold s.mu and haven't synchronized since checking done, if
|
|
// ctx becomes done before we return here, it becoming done must have
|
|
// "happened concurrently" with this call - it cannot "happen before"
|
|
// we return in this branch. So, we're ok to always acquire here.
|
|
s.cur += n
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
if n > s.size {
|
|
// Don't make other Acquire calls block on one that's doomed to fail.
|
|
s.mu.Unlock()
|
|
<-done
|
|
return ctx.Err()
|
|
}
|
|
|
|
ready := make(chan struct{})
|
|
w := waiter{n: n, ready: ready}
|
|
elem := s.waiters.PushBack(w)
|
|
s.mu.Unlock()
|
|
|
|
select {
|
|
case <-done:
|
|
s.mu.Lock()
|
|
select {
|
|
case <-ready:
|
|
// Acquired the semaphore after we were canceled.
|
|
// Pretend we didn't and put the tokens back.
|
|
s.cur -= n
|
|
s.notifyWaiters()
|
|
default:
|
|
isFront := s.waiters.Front() == elem
|
|
s.waiters.Remove(elem)
|
|
// If we're at the front and there're extra tokens left, notify other waiters.
|
|
if isFront && s.size > s.cur {
|
|
s.notifyWaiters()
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
return ctx.Err()
|
|
|
|
case <-ready:
|
|
// Acquired the semaphore. Check that ctx isn't already done.
|
|
// We check the done channel instead of calling ctx.Err because we
|
|
// already have the channel, and ctx.Err is O(n) with the nesting
|
|
// depth of ctx.
|
|
select {
|
|
case <-done:
|
|
s.Release(n)
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// TryAcquire acquires the semaphore with a weight of n without blocking.
|
|
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
|
|
func (s *Weighted) TryAcquire(n int64) bool {
|
|
s.mu.Lock()
|
|
success := s.size-s.cur >= n && s.waiters.Len() == 0
|
|
if success {
|
|
s.cur += n
|
|
}
|
|
s.mu.Unlock()
|
|
return success
|
|
}
|
|
|
|
// Release releases the semaphore with a weight of n.
|
|
func (s *Weighted) Release(n int64) {
|
|
s.mu.Lock()
|
|
s.cur -= n
|
|
if s.cur < 0 {
|
|
s.mu.Unlock()
|
|
panic("semaphore: released more than held")
|
|
}
|
|
s.notifyWaiters()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *Weighted) notifyWaiters() {
|
|
for {
|
|
next := s.waiters.Front()
|
|
if next == nil {
|
|
break // No more waiters blocked.
|
|
}
|
|
|
|
w := next.Value.(waiter)
|
|
if s.size-s.cur < w.n {
|
|
// Not enough tokens for the next waiter. We could keep going (to try to
|
|
// find a waiter with a smaller request), but under load that could cause
|
|
// starvation for large requests; instead, we leave all remaining waiters
|
|
// blocked.
|
|
//
|
|
// Consider a semaphore used as a read-write lock, with N tokens, N
|
|
// readers, and one writer. Each reader can Acquire(1) to obtain a read
|
|
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
|
|
// of the readers. If we allow the readers to jump ahead in the queue,
|
|
// the writer will starve — there is always one token available for every
|
|
// reader.
|
|
break
|
|
}
|
|
|
|
s.cur += w.n
|
|
s.waiters.Remove(next)
|
|
close(w.ready)
|
|
}
|
|
}
|