Compare commits

10 commits: fe3591bd8a ... 86842326fb

| SHA1 |
|---|
| 86842326fb |
| 04914c200c |
| 7fad2c9213 |
| 8a14946fb0 |
| 1869c690bf |
| d1ac909e84 |
| 506c70f973 |
| 396f3a06ea |
| b637f27e40 |
| 960bf1fb13 |
21 changed files with 65 additions and 1962 deletions
@@ -1,26 +0,0 @@
# Contributing to Go

Go is an open source project.

It is the work of hundreds of contributors. We appreciate your help!

## Filing issues

When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions:

1. What version of Go are you using (`go version`)?
2. What operating system and processor architecture are you using?
3. What did you do?
4. What did you expect to see?
5. What did you see instead?

General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker.
The gophers there will answer or ask you to file an issue if you've tripped over a bug.

## Contributing code

Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html)
before sending patches.

Unless otherwise noted, the Go source files are distributed under
the BSD-style license found in the LICENSE file.
PATENTS (22 changed lines)

@@ -1,22 +0,0 @@
Additional IP Rights Grant (Patents)

"This implementation" means the copyrightable works distributed by
Google as part of the Go project.

Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
README.md (17 changed lines)

@@ -1,17 +0,0 @@
# Go Sync

[](https://pkg.go.dev/golang.org/x/sync)

This repository provides Go concurrency primitives in addition to the
ones provided by the language and "sync" and "sync/atomic" packages.

## Report Issues / Send Patches

This repository uses Gerrit for code changes. To learn how to submit changes to
this repository, see https://go.dev/doc/contribute.

The git repository is https://go.googlesource.com/sync.

The main issue tracker for the sync repository is located at
https://go.dev/issues. Prefix your issue with "x/sync:" in the
subject line, so it is easy to find.
@@ -1 +0,0 @@
issuerepo: golang/go
@@ -18,7 +18,7 @@ import (
 type token struct{}
 
 // A Group is a collection of goroutines working on subtasks that are part of
-// the same overall task.
+// the same overall task. A Group should not be reused for different tasks.
 //
 // A zero Group is valid, has no limit on the number of active goroutines,
 // and does not cancel on error.
@@ -46,7 +46,7 @@ func (g *Group) done() {
 // returns a non-nil error or the first time Wait returns, whichever occurs
 // first.
 func WithContext(ctx context.Context) (*Group, context.Context) {
-	ctx, cancel := withCancelCause(ctx)
+	ctx, cancel := context.WithCancelCause(ctx)
	return &Group{cancel: cancel}, ctx
 }
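The change above drops the `withCancelCause` shim (previously selected by build tags in go120.go and pre_go120.go, both deleted below) and calls `context.WithCancelCause` directly, which requires Go 1.20 or newer. A minimal sketch of what that standard-library call provides; the cause set at cancellation time is later retrievable with `context.Cause`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	// Cancel with a specific cause rather than the generic context.Canceled.
	cause := errors.New("worker failed")
	cancel(cause)

	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // worker failed
}
```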
@@ -61,11 +61,14 @@ func (g *Group) Wait() error {
 }
 
 // Go calls the given function in a new goroutine.
-// It blocks until the new goroutine can be added without the number of
-// active goroutines in the group exceeding the configured limit.
 //
-// The first call to return a non-nil error cancels the group's context, if the
-// group was created by calling WithContext. The error will be returned by Wait.
+// The first call to Go must happen before a Wait.
+// It blocks until the new goroutine can be added without the number of
+// goroutines in the group exceeding the configured limit.
+//
+// The first goroutine in the group that returns a non-nil error will
+// cancel the associated Context, if any. The error will be returned
+// by Wait.
 func (g *Group) Go(f func() error) {
 	if g.sem != nil {
 		g.sem <- token{}
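The rewritten doc comment describes how Go interacts with the configured limit and with the context created by WithContext. A minimal usage sketch, assuming the Group's `SetLimit` method from upstream x/sync is present in this fork (the "configured limit" mentioned in the comment implies it):

```go
package main

import (
	"context"
	"fmt"

	"codeberg.org/danjones000/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(3) // assumption: SetLimit is available, as in upstream x/sync

	for _, name := range []string{"a", "b", "c", "d", "e"} {
		name := name // explicit copy; redundant under Go 1.22+ loop semantics
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err() // another task already failed; stop early
			default:
				fmt.Println("processing", name)
				return nil
			}
		})
	}

	// Wait returns the first non-nil error (if any) from the tasks.
	if err := g.Wait(); err != nil {
		fmt.Println("first error:", err)
	}
}
```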
@@ -75,6 +78,18 @@ func (g *Group) Go(f func() error) {
 	go func() {
 		defer g.done()
 
+		// It is tempting to propagate panics from f()
+		// up to the goroutine that calls Wait, but
+		// it creates more problems than it solves:
+		// - it delays panics arbitrarily,
+		// making bugs harder to detect;
+		// - it turns f's panic stack into a mere value,
+		// hiding it from crash-monitoring tools;
+		// - it risks deadlocks that hide the panic entirely,
+		// if f's panic leaves the program in a state
+		// that prevents the Wait call from being reached.
+		// See #53757, #74275, #74304, #74306.
+
 		if err := f(); err != nil {
 			g.errOnce.Do(func() {
 				g.err = err
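The new comment explains why errgroup deliberately does not forward panics from f to the goroutine calling Wait. If a caller does want panics surfaced as ordinary errors, one hedged pattern (not part of this package; `safeGo` is a hypothetical helper) is to recover inside the function passed to Go. Note that this trades the original panic stack for an error value, which is exactly the drawback the comment warns about:

```go
package main

import (
	"fmt"

	"codeberg.org/danjones000/errgroup"
)

// safeGo is a hypothetical helper that converts a panic inside f into an
// ordinary error, so it is reported via g.Wait() instead of crashing the
// program from inside the group's goroutine.
func safeGo(g *errgroup.Group, f func() error) {
	g.Go(func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("task panicked: %v", r)
			}
		}()
		return f()
	})
}

func main() {
	var g errgroup.Group
	safeGo(&g, func() error { panic("boom") })
	fmt.Println(g.Wait()) // task panicked: boom
}
```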
@ -1,13 +0,0 @@
|
||||||
// Copyright 2023 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
//go:build go1.20
|
|
||||||
|
|
||||||
package errgroup
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
|
||||||
return context.WithCancelCause(parent)
|
|
||||||
}
|
|
||||||
|
|
@@ -1,54 +0,0 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.20

package errgroup_test

import (
	"context"
	"errors"
	"testing"

	"golang.org/x/sync/errgroup"
)

func TestCancelCause(t *testing.T) {
	errDoom := errors.New("group_test: doomed")

	cases := []struct {
		errs []error
		want error
	}{
		{want: nil},
		{errs: []error{nil}, want: nil},
		{errs: []error{errDoom}, want: errDoom},
		{errs: []error{errDoom, nil}, want: errDoom},
	}

	for _, tc := range cases {
		g, ctx := errgroup.WithContext(context.Background())

		for _, err := range tc.errs {
			err := err
			g.TryGo(func() error { return err })
		}

		if err := g.Wait(); err != tc.want {
			t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+
				"g.Wait() = %v; want %v",
				g, tc.errs, err, tc.want)
		}

		if tc.want == nil {
			tc.want = context.Canceled
		}

		if err := context.Cause(ctx); err != tc.want {
			t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+
				"context.Cause(ctx) = %v; tc.want %v",
				g, tc.errs, err, tc.want)
		}
	}
}
@ -1,14 +0,0 @@
|
||||||
// Copyright 2023 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
//go:build !go1.20
|
|
||||||
|
|
||||||
package errgroup
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
|
||||||
ctx, cancel := context.WithCancel(parent)
|
|
||||||
return ctx, func(error) { cancel() }
|
|
||||||
}
|
|
||||||
|
|
@@ -8,12 +8,11 @@ import (
 	"context"
 	"crypto/md5"
 	"fmt"
-	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
 
-	"golang.org/x/sync/errgroup"
+	"codeberg.org/danjones000/errgroup"
 )
 
 // Pipeline demonstrates the use of a Group to implement a multi-stage
@@ -69,7 +68,7 @@ func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error)
 	for i := 0; i < numDigesters; i++ {
 		g.Go(func() error {
 			for path := range paths {
-				data, err := ioutil.ReadFile(path)
+				data, err := os.ReadFile(path)
 				if err != nil {
 					return err
 				}
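The only functional change in this hunk is replacing the deprecated `ioutil.ReadFile` with `os.ReadFile`, which has had an identical signature since Go 1.16. A tiny self-contained sketch of the replacement call (the file name is just an example):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// os.ReadFile(name string) ([]byte, error) is the Go 1.16+ replacement
	// for ioutil.ReadFile; no other code changes are needed.
	data, err := os.ReadFile("go.mod")
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	fmt.Printf("read %d bytes\n", len(data))
}
```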
@@ -14,7 +14,7 @@ import (
 	"testing"
 	"time"
 
-	"golang.org/x/sync/errgroup"
+	"codeberg.org/danjones000/errgroup"
 )
 
 var (
@@ -250,6 +250,45 @@ func TestGoLimit(t *testing.T) {
 	}
 }
 
+func TestCancelCause(t *testing.T) {
+	errDoom := errors.New("group_test: doomed")
+
+	cases := []struct {
+		errs []error
+		want error
+	}{
+		{want: nil},
+		{errs: []error{nil}, want: nil},
+		{errs: []error{errDoom}, want: errDoom},
+		{errs: []error{errDoom, nil}, want: errDoom},
+	}
+
+	for _, tc := range cases {
+		g, ctx := errgroup.WithContext(context.Background())
+
+		for _, err := range tc.errs {
+			err := err
+			g.TryGo(func() error { return err })
+		}
+
+		if err := g.Wait(); err != tc.want {
+			t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+
+				"g.Wait() = %v; want %v",
+				g, tc.errs, err, tc.want)
+		}
+
+		if tc.want == nil {
+			tc.want = context.Canceled
+		}
+
+		if err := context.Cause(ctx); err != tc.want {
+			t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+
+				"context.Cause(ctx) = %v; tc.want %v",
+				g, tc.errs, err, tc.want)
+		}
+	}
+}
+
 func BenchmarkGo(b *testing.B) {
 	fn := func() {}
 	g := &errgroup.Group{}
go.mod (4 changed lines)

@@ -1,3 +1,3 @@
-module golang.org/x/sync
+module codeberg.org/danjones000/errgroup

-go 1.18
+go 1.25.0
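With the module path renamed, downstream code pulls this fork instead of golang.org/x/sync. A minimal consumer sketch; the module name `example.com/demo` and the version string are illustrative assumptions, only the import path comes from the go.mod change above:

```go
// go.mod of a hypothetical consumer:
//
//	module example.com/demo
//
//	go 1.25.0
//
//	require codeberg.org/danjones000/errgroup v0.0.0 // version is illustrative
package main

import (
	"fmt"

	"codeberg.org/danjones000/errgroup"
)

func main() {
	var g errgroup.Group
	g.Go(func() error { return nil })
	fmt.Println(g.Wait()) // <nil>
}
```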
@@ -1,160 +0,0 @@
// Copyright 2017 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
// Package semaphore provides a weighted semaphore implementation.
|
|
||||||
package semaphore // import "golang.org/x/sync/semaphore"
|
|
||||||
|
|
||||||
import (
|
|
||||||
"container/list"
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type waiter struct {
|
|
||||||
n int64
|
|
||||||
ready chan<- struct{} // Closed when semaphore acquired.
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWeighted creates a new weighted semaphore with the given
|
|
||||||
// maximum combined weight for concurrent access.
|
|
||||||
func NewWeighted(n int64) *Weighted {
|
|
||||||
w := &Weighted{size: n}
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
// Weighted provides a way to bound concurrent access to a resource.
|
|
||||||
// The callers can request access with a given weight.
|
|
||||||
type Weighted struct {
|
|
||||||
size int64
|
|
||||||
cur int64
|
|
||||||
mu sync.Mutex
|
|
||||||
waiters list.List
|
|
||||||
}
|
|
||||||
|
|
||||||
// Acquire acquires the semaphore with a weight of n, blocking until resources
|
|
||||||
// are available or ctx is done. On success, returns nil. On failure, returns
|
|
||||||
// ctx.Err() and leaves the semaphore unchanged.
|
|
||||||
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
|
|
||||||
done := ctx.Done()
|
|
||||||
|
|
||||||
s.mu.Lock()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// ctx becoming done has "happened before" acquiring the semaphore,
|
|
||||||
// whether it became done before the call began or while we were
|
|
||||||
// waiting for the mutex. We prefer to fail even if we could acquire
|
|
||||||
// the mutex without blocking.
|
|
||||||
s.mu.Unlock()
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
if s.size-s.cur >= n && s.waiters.Len() == 0 {
|
|
||||||
// Since we hold s.mu and haven't synchronized since checking done, if
|
|
||||||
// ctx becomes done before we return here, it becoming done must have
|
|
||||||
// "happened concurrently" with this call - it cannot "happen before"
|
|
||||||
// we return in this branch. So, we're ok to always acquire here.
|
|
||||||
s.cur += n
|
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if n > s.size {
|
|
||||||
// Don't make other Acquire calls block on one that's doomed to fail.
|
|
||||||
s.mu.Unlock()
|
|
||||||
<-done
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
ready := make(chan struct{})
|
|
||||||
w := waiter{n: n, ready: ready}
|
|
||||||
elem := s.waiters.PushBack(w)
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
s.mu.Lock()
|
|
||||||
select {
|
|
||||||
case <-ready:
|
|
||||||
// Acquired the semaphore after we were canceled.
|
|
||||||
// Pretend we didn't and put the tokens back.
|
|
||||||
s.cur -= n
|
|
||||||
s.notifyWaiters()
|
|
||||||
default:
|
|
||||||
isFront := s.waiters.Front() == elem
|
|
||||||
s.waiters.Remove(elem)
|
|
||||||
// If we're at the front and there're extra tokens left, notify other waiters.
|
|
||||||
if isFront && s.size > s.cur {
|
|
||||||
s.notifyWaiters()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
return ctx.Err()
|
|
||||||
|
|
||||||
case <-ready:
|
|
||||||
// Acquired the semaphore. Check that ctx isn't already done.
|
|
||||||
// We check the done channel instead of calling ctx.Err because we
|
|
||||||
// already have the channel, and ctx.Err is O(n) with the nesting
|
|
||||||
// depth of ctx.
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
s.Release(n)
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TryAcquire acquires the semaphore with a weight of n without blocking.
|
|
||||||
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
|
|
||||||
func (s *Weighted) TryAcquire(n int64) bool {
|
|
||||||
s.mu.Lock()
|
|
||||||
success := s.size-s.cur >= n && s.waiters.Len() == 0
|
|
||||||
if success {
|
|
||||||
s.cur += n
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
return success
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release releases the semaphore with a weight of n.
|
|
||||||
func (s *Weighted) Release(n int64) {
|
|
||||||
s.mu.Lock()
|
|
||||||
s.cur -= n
|
|
||||||
if s.cur < 0 {
|
|
||||||
s.mu.Unlock()
|
|
||||||
panic("semaphore: released more than held")
|
|
||||||
}
|
|
||||||
s.notifyWaiters()
|
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Weighted) notifyWaiters() {
|
|
||||||
for {
|
|
||||||
next := s.waiters.Front()
|
|
||||||
if next == nil {
|
|
||||||
break // No more waiters blocked.
|
|
||||||
}
|
|
||||||
|
|
||||||
w := next.Value.(waiter)
|
|
||||||
if s.size-s.cur < w.n {
|
|
||||||
// Not enough tokens for the next waiter. We could keep going (to try to
|
|
||||||
// find a waiter with a smaller request), but under load that could cause
|
|
||||||
// starvation for large requests; instead, we leave all remaining waiters
|
|
||||||
// blocked.
|
|
||||||
//
|
|
||||||
// Consider a semaphore used as a read-write lock, with N tokens, N
|
|
||||||
// readers, and one writer. Each reader can Acquire(1) to obtain a read
|
|
||||||
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
|
|
||||||
// of the readers. If we allow the readers to jump ahead in the queue,
|
|
||||||
// the writer will starve — there is always one token available for every
|
|
||||||
// reader.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
s.cur += w.n
|
|
||||||
s.waiters.Remove(next)
|
|
||||||
close(w.ready)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
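This commit removes the semaphore package from the fork. For readers of the deleted semaphore.go above, a minimal hedged sketch of how the upstream golang.org/x/sync/semaphore package is typically used, following the Acquire/Release contract documented in the removed file:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(3) // at most 3 units held at once

	results := make(chan int, 10)
	for i := 0; i < 10; i++ {
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Println("acquire failed:", err)
			break
		}
		go func(i int) {
			defer sem.Release(1)
			results <- i * i
		}(i)
	}

	// Acquiring the full weight waits for all outstanding holders to release.
	if err := sem.Acquire(ctx, 3); err != nil {
		fmt.Println("acquire failed:", err)
	}
	close(results)
	for r := range results {
		fmt.Println(r)
	}
}
```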
@@ -1,129 +0,0 @@
// Copyright 2017 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package semaphore_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"golang.org/x/sync/semaphore"
|
|
||||||
)
|
|
||||||
|
|
||||||
// weighted is an interface matching a subset of *Weighted. It allows
|
|
||||||
// alternate implementations for testing and benchmarking.
|
|
||||||
type weighted interface {
|
|
||||||
Acquire(context.Context, int64) error
|
|
||||||
TryAcquire(int64) bool
|
|
||||||
Release(int64)
|
|
||||||
}
|
|
||||||
|
|
||||||
// semChan implements Weighted using a channel for
|
|
||||||
// comparing against the condition variable-based implementation.
|
|
||||||
type semChan chan struct{}
|
|
||||||
|
|
||||||
func newSemChan(n int64) semChan {
|
|
||||||
return semChan(make(chan struct{}, n))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s semChan) Acquire(_ context.Context, n int64) error {
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
s <- struct{}{}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s semChan) TryAcquire(n int64) bool {
|
|
||||||
if int64(len(s))+n > int64(cap(s)) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
s <- struct{}{}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s semChan) Release(n int64) {
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
<-s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times.
|
|
||||||
func acquireN(b *testing.B, sem weighted, size int64, N int) {
|
|
||||||
b.ResetTimer()
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
for j := 0; j < N; j++ {
|
|
||||||
sem.Acquire(context.Background(), size)
|
|
||||||
}
|
|
||||||
for j := 0; j < N; j++ {
|
|
||||||
sem.Release(size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times.
|
|
||||||
func tryAcquireN(b *testing.B, sem weighted, size int64, N int) {
|
|
||||||
b.ResetTimer()
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
for j := 0; j < N; j++ {
|
|
||||||
if !sem.TryAcquire(size) {
|
|
||||||
b.Fatalf("TryAcquire(%v) = false, want true", size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for j := 0; j < N; j++ {
|
|
||||||
sem.Release(size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkNewSeq(b *testing.B) {
|
|
||||||
for _, cap := range []int64{1, 128} {
|
|
||||||
b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) {
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
_ = semaphore.NewWeighted(cap)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) {
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
_ = newSemChan(cap)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkAcquireSeq(b *testing.B) {
|
|
||||||
for _, c := range []struct {
|
|
||||||
cap, size int64
|
|
||||||
N int
|
|
||||||
}{
|
|
||||||
{1, 1, 1},
|
|
||||||
{2, 1, 1},
|
|
||||||
{16, 1, 1},
|
|
||||||
{128, 1, 1},
|
|
||||||
{2, 2, 1},
|
|
||||||
{16, 2, 8},
|
|
||||||
{128, 2, 64},
|
|
||||||
{2, 1, 2},
|
|
||||||
{16, 8, 2},
|
|
||||||
{128, 64, 2},
|
|
||||||
} {
|
|
||||||
for _, w := range []struct {
|
|
||||||
name string
|
|
||||||
w weighted
|
|
||||||
}{
|
|
||||||
{"Weighted", semaphore.NewWeighted(c.cap)},
|
|
||||||
{"semChan", newSemChan(c.cap)},
|
|
||||||
} {
|
|
||||||
b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
|
|
||||||
acquireN(b, w.w, c.size, c.N)
|
|
||||||
})
|
|
||||||
b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
|
|
||||||
tryAcquireN(b, w.w, c.size, c.N)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@@ -1,84 +0,0 @@
// Copyright 2017 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package semaphore_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"golang.org/x/sync/semaphore"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Example_workerPool demonstrates how to use a semaphore to limit the number of
|
|
||||||
// goroutines working on parallel tasks.
|
|
||||||
//
|
|
||||||
// This use of a semaphore mimics a typical “worker pool” pattern, but without
|
|
||||||
// the need to explicitly shut down idle workers when the work is done.
|
|
||||||
func Example_workerPool() {
|
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
var (
|
|
||||||
maxWorkers = runtime.GOMAXPROCS(0)
|
|
||||||
sem = semaphore.NewWeighted(int64(maxWorkers))
|
|
||||||
out = make([]int, 32)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Compute the output using up to maxWorkers goroutines at a time.
|
|
||||||
for i := range out {
|
|
||||||
// When maxWorkers goroutines are in flight, Acquire blocks until one of the
|
|
||||||
// workers finishes.
|
|
||||||
if err := sem.Acquire(ctx, 1); err != nil {
|
|
||||||
log.Printf("Failed to acquire semaphore: %v", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(i int) {
|
|
||||||
defer sem.Release(1)
|
|
||||||
out[i] = collatzSteps(i + 1)
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Acquire all of the tokens to wait for any remaining workers to finish.
|
|
||||||
//
|
|
||||||
// If you are already waiting for the workers by some other means (such as an
|
|
||||||
// errgroup.Group), you can omit this final Acquire call.
|
|
||||||
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
|
|
||||||
log.Printf("Failed to acquire semaphore: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(out)
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// [0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5]
|
|
||||||
}
|
|
||||||
|
|
||||||
// collatzSteps computes the number of steps to reach 1 under the Collatz
|
|
||||||
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
|
|
||||||
func collatzSteps(n int) (steps int) {
|
|
||||||
if n <= 0 {
|
|
||||||
panic("nonpositive input")
|
|
||||||
}
|
|
||||||
|
|
||||||
for ; n > 1; steps++ {
|
|
||||||
if steps < 0 {
|
|
||||||
panic("too many steps")
|
|
||||||
}
|
|
||||||
|
|
||||||
if n%2 == 0 {
|
|
||||||
n /= 2
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
const maxInt = int(^uint(0) >> 1)
|
|
||||||
if n > (maxInt-1)/3 {
|
|
||||||
panic("overflow")
|
|
||||||
}
|
|
||||||
n = 3*n + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
return steps
|
|
||||||
}
|
|
||||||
|
|
@@ -1,237 +0,0 @@
// Copyright 2017 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package semaphore_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"math/rand"
|
|
||||||
"runtime"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
"golang.org/x/sync/semaphore"
|
|
||||||
)
|
|
||||||
|
|
||||||
const maxSleep = 1 * time.Millisecond
|
|
||||||
|
|
||||||
func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
|
|
||||||
for i := 0; i < loops; i++ {
|
|
||||||
sem.Acquire(context.Background(), n)
|
|
||||||
time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
|
|
||||||
sem.Release(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeighted(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
n := runtime.GOMAXPROCS(0)
|
|
||||||
loops := 10000 / n
|
|
||||||
sem := semaphore.NewWeighted(int64(n))
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
i := i
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
HammerWeighted(sem, int64(i), loops)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeightedPanic(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
if recover() == nil {
|
|
||||||
t.Fatal("release of an unacquired weighted semaphore did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
w := semaphore.NewWeighted(1)
|
|
||||||
w.Release(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeightedTryAcquire(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
sem := semaphore.NewWeighted(2)
|
|
||||||
tries := []bool{}
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
tries = append(tries, sem.TryAcquire(1))
|
|
||||||
tries = append(tries, sem.TryAcquire(1))
|
|
||||||
|
|
||||||
sem.Release(2)
|
|
||||||
|
|
||||||
tries = append(tries, sem.TryAcquire(1))
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
tries = append(tries, sem.TryAcquire(1))
|
|
||||||
|
|
||||||
want := []bool{true, false, true, false}
|
|
||||||
for i := range tries {
|
|
||||||
if tries[i] != want[i] {
|
|
||||||
t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeightedAcquire(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
sem := semaphore.NewWeighted(2)
|
|
||||||
tryAcquire := func(n int64) bool {
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
|
||||||
defer cancel()
|
|
||||||
return sem.Acquire(ctx, n) == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tries := []bool{}
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
tries = append(tries, tryAcquire(1))
|
|
||||||
tries = append(tries, tryAcquire(1))
|
|
||||||
|
|
||||||
sem.Release(2)
|
|
||||||
|
|
||||||
tries = append(tries, tryAcquire(1))
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
tries = append(tries, tryAcquire(1))
|
|
||||||
|
|
||||||
want := []bool{true, false, true, false}
|
|
||||||
for i := range tries {
|
|
||||||
if tries[i] != want[i] {
|
|
||||||
t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
const n = 2
|
|
||||||
sem := semaphore.NewWeighted(n)
|
|
||||||
{
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
go sem.Acquire(ctx, n+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
g, ctx := errgroup.WithContext(context.Background())
|
|
||||||
for i := n * 3; i > 0; i-- {
|
|
||||||
g.Go(func() error {
|
|
||||||
err := sem.Acquire(ctx, 1)
|
|
||||||
if err == nil {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
sem.Release(1)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err := g.Wait(); err != nil {
|
|
||||||
t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves.
|
|
||||||
// Merely returning from the test function indicates success.
|
|
||||||
func TestLargeAcquireDoesntStarve(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
n := int64(runtime.GOMAXPROCS(0))
|
|
||||||
sem := semaphore.NewWeighted(n)
|
|
||||||
running := true
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(int(n))
|
|
||||||
for i := n; i > 0; i-- {
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
go func() {
|
|
||||||
defer func() {
|
|
||||||
sem.Release(1)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
for running {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
sem.Release(1)
|
|
||||||
sem.Acquire(ctx, 1)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
sem.Acquire(ctx, n)
|
|
||||||
running = false
|
|
||||||
sem.Release(n)
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43
|
|
||||||
func TestAllocCancelDoesntStarve(t *testing.T) {
|
|
||||||
sem := semaphore.NewWeighted(10)
|
|
||||||
|
|
||||||
// Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed.
|
|
||||||
sem.Acquire(context.Background(), 1)
|
|
||||||
|
|
||||||
// In the background, Acquire(_, 10).
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
go func() {
|
|
||||||
sem.Acquire(ctx, 10)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait until the Acquire(_, 10) call blocks.
|
|
||||||
for sem.TryAcquire(1) {
|
|
||||||
sem.Release(1)
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call.
|
|
||||||
// Both Acquire calls should unblock and return, in either order.
|
|
||||||
go cancel()
|
|
||||||
|
|
||||||
err := sem.Acquire(context.Background(), 1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
|
|
||||||
}
|
|
||||||
sem.Release(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeightedAcquireCanceled(t *testing.T) {
|
|
||||||
// https://go.dev/issue/63615
|
|
||||||
sem := semaphore.NewWeighted(2)
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
sem.Acquire(context.Background(), 1)
|
|
||||||
ch := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
// Synchronize with the Acquire(2) below.
|
|
||||||
for sem.TryAcquire(1) {
|
|
||||||
sem.Release(1)
|
|
||||||
}
|
|
||||||
// Now cancel ctx, and then release the token.
|
|
||||||
cancel()
|
|
||||||
sem.Release(1)
|
|
||||||
close(ch)
|
|
||||||
}()
|
|
||||||
// Since the context closing happens before enough tokens become available,
|
|
||||||
// this Acquire must fail.
|
|
||||||
if err := sem.Acquire(ctx, 2); err != context.Canceled {
|
|
||||||
t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
|
|
||||||
}
|
|
||||||
// There must always be two tokens in the semaphore after the other
|
|
||||||
// goroutine releases the one we held at the start.
|
|
||||||
<-ch
|
|
||||||
if !sem.TryAcquire(2) {
|
|
||||||
t.Fatal("TryAcquire after canceled Acquire failed")
|
|
||||||
}
|
|
||||||
// Additionally verify that we don't acquire with a done context even when
|
|
||||||
// we wouldn't need to block to do so.
|
|
||||||
sem.Release(2)
|
|
||||||
if err := sem.Acquire(ctx, 1); err != context.Canceled {
|
|
||||||
t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@@ -1,214 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
// Package singleflight provides a duplicate function call suppression
|
|
||||||
// mechanism.
|
|
||||||
package singleflight // import "golang.org/x/sync/singleflight"
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"runtime"
|
|
||||||
"runtime/debug"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// errGoexit indicates the runtime.Goexit was called in
|
|
||||||
// the user given function.
|
|
||||||
var errGoexit = errors.New("runtime.Goexit was called")
|
|
||||||
|
|
||||||
// A panicError is an arbitrary value recovered from a panic
|
|
||||||
// with the stack trace during the execution of given function.
|
|
||||||
type panicError struct {
|
|
||||||
value interface{}
|
|
||||||
stack []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// Error implements error interface.
|
|
||||||
func (p *panicError) Error() string {
|
|
||||||
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *panicError) Unwrap() error {
|
|
||||||
err, ok := p.value.(error)
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPanicError(v interface{}) error {
|
|
||||||
stack := debug.Stack()
|
|
||||||
|
|
||||||
// The first line of the stack trace is of the form "goroutine N [status]:"
|
|
||||||
// but by the time the panic reaches Do the goroutine may no longer exist
|
|
||||||
// and its status will have changed. Trim out the misleading line.
|
|
||||||
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
|
|
||||||
stack = stack[line+1:]
|
|
||||||
}
|
|
||||||
return &panicError{value: v, stack: stack}
|
|
||||||
}
|
|
||||||
|
|
||||||
// call is an in-flight or completed singleflight.Do call
|
|
||||||
type call struct {
|
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
// These fields are written once before the WaitGroup is done
|
|
||||||
// and are only read after the WaitGroup is done.
|
|
||||||
val interface{}
|
|
||||||
err error
|
|
||||||
|
|
||||||
// These fields are read and written with the singleflight
|
|
||||||
// mutex held before the WaitGroup is done, and are read but
|
|
||||||
// not written after the WaitGroup is done.
|
|
||||||
dups int
|
|
||||||
chans []chan<- Result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Group represents a class of work and forms a namespace in
|
|
||||||
// which units of work can be executed with duplicate suppression.
|
|
||||||
type Group struct {
|
|
||||||
mu sync.Mutex // protects m
|
|
||||||
m map[string]*call // lazily initialized
|
|
||||||
}
|
|
||||||
|
|
||||||
// Result holds the results of Do, so they can be passed
|
|
||||||
// on a channel.
|
|
||||||
type Result struct {
|
|
||||||
Val interface{}
|
|
||||||
Err error
|
|
||||||
Shared bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do executes and returns the results of the given function, making
|
|
||||||
// sure that only one execution is in-flight for a given key at a
|
|
||||||
// time. If a duplicate comes in, the duplicate caller waits for the
|
|
||||||
// original to complete and receives the same results.
|
|
||||||
// The return value shared indicates whether v was given to multiple callers.
|
|
||||||
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
|
|
||||||
g.mu.Lock()
|
|
||||||
if g.m == nil {
|
|
||||||
g.m = make(map[string]*call)
|
|
||||||
}
|
|
||||||
if c, ok := g.m[key]; ok {
|
|
||||||
c.dups++
|
|
||||||
g.mu.Unlock()
|
|
||||||
c.wg.Wait()
|
|
||||||
|
|
||||||
if e, ok := c.err.(*panicError); ok {
|
|
||||||
panic(e)
|
|
||||||
} else if c.err == errGoexit {
|
|
||||||
runtime.Goexit()
|
|
||||||
}
|
|
||||||
return c.val, c.err, true
|
|
||||||
}
|
|
||||||
c := new(call)
|
|
||||||
c.wg.Add(1)
|
|
||||||
g.m[key] = c
|
|
||||||
g.mu.Unlock()
|
|
||||||
|
|
||||||
g.doCall(c, key, fn)
|
|
||||||
return c.val, c.err, c.dups > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// DoChan is like Do but returns a channel that will receive the
|
|
||||||
// results when they are ready.
|
|
||||||
//
|
|
||||||
// The returned channel will not be closed.
|
|
||||||
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
|
|
||||||
ch := make(chan Result, 1)
|
|
||||||
g.mu.Lock()
|
|
||||||
if g.m == nil {
|
|
||||||
g.m = make(map[string]*call)
|
|
||||||
}
|
|
||||||
if c, ok := g.m[key]; ok {
|
|
||||||
c.dups++
|
|
||||||
c.chans = append(c.chans, ch)
|
|
||||||
g.mu.Unlock()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
c := &call{chans: []chan<- Result{ch}}
|
|
||||||
c.wg.Add(1)
|
|
||||||
g.m[key] = c
|
|
||||||
g.mu.Unlock()
|
|
||||||
|
|
||||||
go g.doCall(c, key, fn)
|
|
||||||
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// doCall handles the single call for a key.
|
|
||||||
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
|
|
||||||
normalReturn := false
|
|
||||||
recovered := false
|
|
||||||
|
|
||||||
// use double-defer to distinguish panic from runtime.Goexit,
|
|
||||||
// more details see https://golang.org/cl/134395
|
|
||||||
defer func() {
|
|
||||||
// the given function invoked runtime.Goexit
|
|
||||||
if !normalReturn && !recovered {
|
|
||||||
c.err = errGoexit
|
|
||||||
}
|
|
||||||
|
|
||||||
g.mu.Lock()
|
|
||||||
defer g.mu.Unlock()
|
|
||||||
c.wg.Done()
|
|
||||||
if g.m[key] == c {
|
|
||||||
delete(g.m, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, ok := c.err.(*panicError); ok {
|
|
||||||
// In order to prevent the waiting channels from being blocked forever,
|
|
||||||
// needs to ensure that this panic cannot be recovered.
|
|
||||||
if len(c.chans) > 0 {
|
|
||||||
go panic(e)
|
|
||||||
select {} // Keep this goroutine around so that it will appear in the crash dump.
|
|
||||||
} else {
|
|
||||||
panic(e)
|
|
||||||
}
|
|
||||||
} else if c.err == errGoexit {
|
|
||||||
// Already in the process of goexit, no need to call again
|
|
||||||
} else {
|
|
||||||
// Normal return
|
|
||||||
for _, ch := range c.chans {
|
|
||||||
ch <- Result{c.val, c.err, c.dups > 0}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
func() {
|
|
||||||
defer func() {
|
|
||||||
if !normalReturn {
|
|
||||||
// Ideally, we would wait to take a stack trace until we've determined
|
|
||||||
// whether this is a panic or a runtime.Goexit.
|
|
||||||
//
|
|
||||||
// Unfortunately, the only way we can distinguish the two is to see
|
|
||||||
// whether the recover stopped the goroutine from terminating, and by
|
|
||||||
// the time we know that, the part of the stack trace relevant to the
|
|
||||||
// panic has been discarded.
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
c.err = newPanicError(r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
c.val, c.err = fn()
|
|
||||||
normalReturn = true
|
|
||||||
}()
|
|
||||||
|
|
||||||
if !normalReturn {
|
|
||||||
recovered = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forget tells the singleflight to forget about a key. Future calls
|
|
||||||
// to Do for this key will call the function rather than waiting for
|
|
||||||
// an earlier call to complete.
|
|
||||||
func (g *Group) Forget(key string) {
|
|
||||||
g.mu.Lock()
|
|
||||||
delete(g.m, key)
|
|
||||||
g.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
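The singleflight package is also dropped from the fork. As context for the removed Do/DoChan documentation above, a minimal hedged sketch of duplicate suppression with the upstream golang.org/x/sync/singleflight package:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var calls int32

	fetch := func() (interface{}, error) {
		atomic.AddInt32(&calls, 1) // count how many times the work really runs
		return "expensive result", nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent callers with the same key share a single execution.
			v, _, shared := g.Do("config", fetch)
			fmt.Println(v, "shared:", shared)
		}()
	}
	wg.Wait()
	fmt.Println("executions:", atomic.LoadInt32(&calls)) // usually 1, at most 5
}
```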
@@ -1,422 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package singleflight
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"runtime"
|
|
||||||
"runtime/debug"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type errValue struct{}
|
|
||||||
|
|
||||||
func (err *errValue) Error() string {
|
|
||||||
return "error value"
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPanicErrorUnwrap(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
panicValue interface{}
|
|
||||||
wrappedErrorType bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "panicError wraps non-error type",
|
|
||||||
panicValue: &panicError{value: "string value"},
|
|
||||||
wrappedErrorType: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "panicError wraps error type",
|
|
||||||
panicValue: &panicError{value: new(errValue)},
|
|
||||||
wrappedErrorType: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
tc := tc
|
|
||||||
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
var recovered interface{}
|
|
||||||
|
|
||||||
group := &Group{}
|
|
||||||
|
|
||||||
func() {
|
|
||||||
defer func() {
|
|
||||||
recovered = recover()
|
|
||||||
t.Logf("after panic(%#v) in group.Do, recovered %#v", tc.panicValue, recovered)
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, _, _ = group.Do(tc.name, func() (interface{}, error) {
|
|
||||||
panic(tc.panicValue)
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
if recovered == nil {
|
|
||||||
t.Fatal("expected a non-nil panic value")
|
|
||||||
}
|
|
||||||
|
|
||||||
err, ok := recovered.(error)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("recovered non-error type: %T", recovered)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !errors.Is(err, new(errValue)) && tc.wrappedErrorType {
|
|
||||||
t.Errorf("unexpected wrapped error type %T; want %T", err, new(errValue))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDo(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
v, err, _ := g.Do("key", func() (interface{}, error) {
|
|
||||||
return "bar", nil
|
|
||||||
})
|
|
||||||
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
|
|
||||||
t.Errorf("Do = %v; want %v", got, want)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Do error = %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDoErr(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
someErr := errors.New("Some error")
|
|
||||||
v, err, _ := g.Do("key", func() (interface{}, error) {
|
|
||||||
return nil, someErr
|
|
||||||
})
|
|
||||||
if err != someErr {
|
|
||||||
t.Errorf("Do error = %v; want someErr %v", err, someErr)
|
|
||||||
}
|
|
||||||
if v != nil {
|
|
||||||
t.Errorf("unexpected non-nil value %#v", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDoDupSuppress(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
var wg1, wg2 sync.WaitGroup
|
|
||||||
c := make(chan string, 1)
|
|
||||||
var calls int32
|
|
||||||
fn := func() (interface{}, error) {
|
|
||||||
if atomic.AddInt32(&calls, 1) == 1 {
|
|
||||||
// First invocation.
|
|
||||||
wg1.Done()
|
|
||||||
}
|
|
||||||
v := <-c
|
|
||||||
c <- v // pump; make available for any future calls
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond) // let more goroutines enter Do
|
|
||||||
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const n = 10
|
|
||||||
wg1.Add(1)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
wg1.Add(1)
|
|
||||||
wg2.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg2.Done()
|
|
||||||
wg1.Done()
|
|
||||||
v, err, _ := g.Do("key", fn)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Do error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s, _ := v.(string); s != "bar" {
|
|
||||||
t.Errorf("Do = %T %v; want %q", v, v, "bar")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg1.Wait()
|
|
||||||
// At least one goroutine is in fn now and all of them have at
|
|
||||||
// least reached the line before the Do.
|
|
||||||
c <- "bar"
|
|
||||||
wg2.Wait()
|
|
||||||
if got := atomic.LoadInt32(&calls); got <= 0 || got >= n {
|
|
||||||
t.Errorf("number of calls = %d; want over 0 and less than %d", got, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that singleflight behaves correctly after Forget called.
|
|
||||||
// See https://github.com/golang/go/issues/31420
|
|
||||||
func TestForget(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
|
|
||||||
var (
|
|
||||||
firstStarted = make(chan struct{})
|
|
||||||
unblockFirst = make(chan struct{})
|
|
||||||
firstFinished = make(chan struct{})
|
|
||||||
)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
g.Do("key", func() (i interface{}, e error) {
|
|
||||||
close(firstStarted)
|
|
||||||
<-unblockFirst
|
|
||||||
close(firstFinished)
|
|
||||||
return
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
<-firstStarted
|
|
||||||
g.Forget("key")
|
|
||||||
|
|
||||||
unblockSecond := make(chan struct{})
|
|
||||||
secondResult := g.DoChan("key", func() (i interface{}, e error) {
|
|
||||||
<-unblockSecond
|
|
||||||
return 2, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
close(unblockFirst)
|
|
||||||
<-firstFinished
|
|
||||||
|
|
||||||
thirdResult := g.DoChan("key", func() (i interface{}, e error) {
|
|
||||||
return 3, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
close(unblockSecond)
|
|
||||||
<-secondResult
|
|
||||||
r := <-thirdResult
|
|
||||||
if r.Val != 2 {
|
|
||||||
t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDoChan(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
ch := g.DoChan("key", func() (interface{}, error) {
|
|
||||||
return "bar", nil
|
|
||||||
})
|
|
||||||
|
|
||||||
res := <-ch
|
|
||||||
v := res.Val
|
|
||||||
err := res.Err
|
|
||||||
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
|
|
||||||
t.Errorf("Do = %v; want %v", got, want)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Do error = %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test singleflight behaves correctly after Do panic.
|
|
||||||
// See https://github.com/golang/go/issues/41133
|
|
||||||
func TestPanicDo(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
fn := func() (interface{}, error) {
|
|
||||||
panic("invalid memory address or nil pointer dereference")
|
|
||||||
}
|
|
||||||
|
|
||||||
const n = 5
|
|
||||||
waited := int32(n)
|
|
||||||
panicCount := int32(0)
|
|
||||||
done := make(chan struct{})
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
go func() {
|
|
||||||
defer func() {
|
|
||||||
if err := recover(); err != nil {
|
|
||||||
t.Logf("Got panic: %v\n%s", err, debug.Stack())
|
|
||||||
atomic.AddInt32(&panicCount, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if atomic.AddInt32(&waited, -1) == 0 {
|
|
||||||
close(done)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
g.Do("key", fn)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
if panicCount != n {
|
|
||||||
t.Errorf("Expect %d panic, but got %d", n, panicCount)
|
|
||||||
}
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatalf("Do hangs")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGoexitDo(t *testing.T) {
|
|
||||||
var g Group
|
|
||||||
fn := func() (interface{}, error) {
|
|
||||||
runtime.Goexit()
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const n = 5
|
|
||||||
waited := int32(n)
|
|
||||||
done := make(chan struct{})
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
go func() {
|
|
||||||
var err error
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Error should be nil, but got: %v", err)
|
|
||||||
}
|
|
||||||
if atomic.AddInt32(&waited, -1) == 0 {
|
|
||||||
close(done)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
_, err, _ = g.Do("key", fn)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatalf("Do hangs")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func executable(t testing.TB) string {
|
|
||||||
exe, err := os.Executable()
|
|
||||||
if err != nil {
|
|
||||||
t.Skipf("skipping: test executable not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Control case: check whether exec.Command works at all.
|
|
||||||
// (For example, it might fail with a permission error on iOS.)
|
|
||||||
cmd := exec.Command(exe, "-test.list=^$")
|
|
||||||
cmd.Env = []string{}
|
|
||||||
if err := cmd.Run(); err != nil {
|
|
||||||
t.Skipf("skipping: exec appears not to work on %s: %v", runtime.GOOS, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return exe
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPanicDoChan(t *testing.T) {
|
|
||||||
if os.Getenv("TEST_PANIC_DOCHAN") != "" {
|
|
||||||
defer func() {
|
|
||||||
recover()
|
|
||||||
}()
|
|
||||||
|
|
||||||
g := new(Group)
|
|
||||||
ch := g.DoChan("", func() (interface{}, error) {
|
|
||||||
panic("Panicking in DoChan")
|
|
||||||
})
|
|
||||||
<-ch
|
|
||||||
t.Fatalf("DoChan unexpectedly returned")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v")
|
|
||||||
cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
|
|
||||||
out := new(bytes.Buffer)
|
|
||||||
cmd.Stdout = out
|
|
||||||
cmd.Stderr = out
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := cmd.Wait()
|
|
||||||
t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Test subprocess passed; want a crash due to panic in DoChan")
|
|
||||||
}
|
|
||||||
if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
|
|
||||||
t.Errorf("Test subprocess failed with an unexpected failure mode.")
|
|
||||||
}
|
|
||||||
if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) {
|
|
||||||
t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPanicDoSharedByDoChan(t *testing.T) {
|
|
||||||
if os.Getenv("TEST_PANIC_DOCHAN") != "" {
|
|
||||||
blocked := make(chan struct{})
|
|
||||||
unblock := make(chan struct{})
|
|
||||||
|
|
||||||
g := new(Group)
|
|
||||||
go func() {
|
|
||||||
defer func() {
|
|
||||||
recover()
|
|
||||||
}()
|
|
||||||
g.Do("", func() (interface{}, error) {
|
|
||||||
close(blocked)
|
|
||||||
<-unblock
|
|
||||||
panic("Panicking in Do")
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-blocked
|
|
||||||
ch := g.DoChan("", func() (interface{}, error) {
|
|
||||||
panic("DoChan unexpectedly executed callback")
|
|
||||||
})
|
|
||||||
close(unblock)
|
|
||||||
<-ch
|
|
||||||
t.Fatalf("DoChan unexpectedly returned")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v")
|
|
||||||
cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
|
|
||||||
out := new(bytes.Buffer)
|
|
||||||
cmd.Stdout = out
|
|
||||||
cmd.Stderr = out
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := cmd.Wait()
|
|
||||||
t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan")
|
|
||||||
}
|
|
||||||
if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
|
|
||||||
t.Errorf("Test subprocess failed with an unexpected failure mode.")
|
|
||||||
}
|
|
||||||
if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) {
|
|
||||||
t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ExampleGroup() {
|
|
||||||
g := new(Group)
|
|
||||||
|
|
||||||
block := make(chan struct{})
|
|
||||||
res1c := g.DoChan("key", func() (interface{}, error) {
|
|
||||||
<-block
|
|
||||||
return "func 1", nil
|
|
||||||
})
|
|
||||||
res2c := g.DoChan("key", func() (interface{}, error) {
|
|
||||||
<-block
|
|
||||||
return "func 2", nil
|
|
||||||
})
|
|
||||||
close(block)
|
|
||||||
|
|
||||||
res1 := <-res1c
|
|
||||||
res2 := <-res2c
|
|
||||||
|
|
||||||
// Results are shared by functions executed with duplicate keys.
|
|
||||||
fmt.Println("Shared:", res2.Shared)
|
|
||||||
// Only the first function is executed: it is registered and started with "key",
|
|
||||||
// and doesn't complete before the second function is registered with a duplicate key.
|
|
||||||
fmt.Println("Equal results:", res1.Val.(string) == res2.Val.(string))
|
|
||||||
fmt.Println("Result:", res1.Val)
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// Shared: true
|
|
||||||
// Equal results: true
|
|
||||||
// Result: func 1
|
|
||||||
}
|
|
||||||
|
|
@@ -1,18 +0,0 @@
// Copyright 2019 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
// Package syncmap provides a concurrent map implementation.
|
|
||||||
// This was the prototype for sync.Map which was added to the standard library's
|
|
||||||
// sync package in Go 1.9. https://golang.org/pkg/sync/#Map.
|
|
||||||
package syncmap
|
|
||||||
|
|
||||||
import "sync" // home to the standard library's sync.map implementation as of Go 1.9
|
|
||||||
|
|
||||||
// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
|
|
||||||
// It is safe for multiple goroutines to call a Map's methods concurrently.
|
|
||||||
//
|
|
||||||
// The zero Map is valid and empty.
|
|
||||||
//
|
|
||||||
// A Map must not be copied after first use.
|
|
||||||
type Map = sync.Map
|
|
||||||
|
|
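Since the removed syncmap package was only an alias for the standard library's sync.Map (as its doc comment above notes), callers can switch to sync.Map directly. A minimal hedged sketch of the drop-in replacement:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map // drop-in for the removed syncmap.Map alias

	m.Store("answer", 42)
	if v, ok := m.Load("answer"); ok {
		fmt.Println(v) // 42
	}

	// LoadOrStore returns the existing value when the key is already present.
	actual, loaded := m.LoadOrStore("answer", 7)
	fmt.Println(actual, loaded) // 42 true
}
```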
@@ -1,216 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package syncmap_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"golang.org/x/sync/syncmap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type bench struct {
|
|
||||||
setup func(*testing.B, mapInterface)
|
|
||||||
perG func(b *testing.B, pb *testing.PB, i int, m mapInterface)
|
|
||||||
}
|
|
||||||
|
|
||||||
func benchMap(b *testing.B, bench bench) {
|
|
||||||
for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} {
|
|
||||||
b.Run(fmt.Sprintf("%T", m), func(b *testing.B) {
|
|
||||||
m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface)
|
|
||||||
if bench.setup != nil {
|
|
||||||
bench.setup(b, m)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.ResetTimer()
|
|
||||||
|
|
||||||
var i int64
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
|
||||||
id := int(atomic.AddInt64(&i, 1) - 1)
|
|
||||||
bench.perG(b, pb, id*b.N, m)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkLoadMostlyHits(b *testing.B) {
|
|
||||||
const hits, misses = 1023, 1
|
|
||||||
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(_ *testing.B, m mapInterface) {
|
|
||||||
for i := 0; i < hits; i++ {
|
|
||||||
m.LoadOrStore(i, i)
|
|
||||||
}
|
|
||||||
// Prime the map to get it into a steady state.
|
|
||||||
for i := 0; i < hits*2; i++ {
|
|
||||||
m.Load(i % hits)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.Load(i % (hits + misses))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkLoadMostlyMisses(b *testing.B) {
|
|
||||||
const hits, misses = 1, 1023
|
|
||||||
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(_ *testing.B, m mapInterface) {
|
|
||||||
for i := 0; i < hits; i++ {
|
|
||||||
m.LoadOrStore(i, i)
|
|
||||||
}
|
|
||||||
// Prime the map to get it into a steady state.
|
|
||||||
for i := 0; i < hits*2; i++ {
|
|
||||||
m.Load(i % hits)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.Load(i % (hits + misses))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkLoadOrStoreBalanced(b *testing.B) {
|
|
||||||
const hits, misses = 128, 128
|
|
||||||
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(b *testing.B, m mapInterface) {
|
|
||||||
if _, ok := m.(*DeepCopyMap); ok {
|
|
||||||
b.Skip("DeepCopyMap has quadratic running time.")
|
|
||||||
}
|
|
||||||
for i := 0; i < hits; i++ {
|
|
||||||
m.LoadOrStore(i, i)
|
|
||||||
}
|
|
||||||
// Prime the map to get it into a steady state.
|
|
||||||
for i := 0; i < hits*2; i++ {
|
|
||||||
m.Load(i % hits)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
j := i % (hits + misses)
|
|
||||||
if j < hits {
|
|
||||||
if _, ok := m.LoadOrStore(j, i); !ok {
|
|
||||||
b.Fatalf("unexpected miss for %v", j)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if v, loaded := m.LoadOrStore(i, i); loaded {
|
|
||||||
b.Fatalf("failed to store %v: existing value %v", i, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkLoadOrStoreUnique(b *testing.B) {
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(b *testing.B, m mapInterface) {
|
|
||||||
if _, ok := m.(*DeepCopyMap); ok {
|
|
||||||
b.Skip("DeepCopyMap has quadratic running time.")
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.LoadOrStore(i, i)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkLoadOrStoreCollision(b *testing.B) {
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(_ *testing.B, m mapInterface) {
|
|
||||||
m.LoadOrStore(0, 0)
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.LoadOrStore(0, 0)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkRange(b *testing.B) {
|
|
||||||
const mapSize = 1 << 10
|
|
||||||
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(_ *testing.B, m mapInterface) {
|
|
||||||
for i := 0; i < mapSize; i++ {
|
|
||||||
m.Store(i, i)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.Range(func(_, _ interface{}) bool { return true })
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// BenchmarkAdversarialAlloc tests performance when we store a new value
|
|
||||||
// immediately whenever the map is promoted to clean and otherwise load a
|
|
||||||
// unique, missing key.
|
|
||||||
//
|
|
||||||
// This forces the Load calls to always acquire the map's mutex.
|
|
||||||
func BenchmarkAdversarialAlloc(b *testing.B) {
|
|
||||||
benchMap(b, bench{
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
var stores, loadsSinceStore int64
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.Load(i)
|
|
||||||
if loadsSinceStore++; loadsSinceStore > stores {
|
|
||||||
m.LoadOrStore(i, stores)
|
|
||||||
loadsSinceStore = 0
|
|
||||||
stores++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// BenchmarkAdversarialDelete tests performance when we periodically delete
|
|
||||||
// one key and add a different one in a large map.
|
|
||||||
//
|
|
||||||
// This forces the Load calls to always acquire the map's mutex and periodically
|
|
||||||
// makes a full copy of the map despite changing only one entry.
|
|
||||||
func BenchmarkAdversarialDelete(b *testing.B) {
|
|
||||||
const mapSize = 1 << 10
|
|
||||||
|
|
||||||
benchMap(b, bench{
|
|
||||||
setup: func(_ *testing.B, m mapInterface) {
|
|
||||||
for i := 0; i < mapSize; i++ {
|
|
||||||
m.Store(i, i)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
|
|
||||||
for ; pb.Next(); i++ {
|
|
||||||
m.Load(i)
|
|
||||||
|
|
||||||
if i%mapSize == 0 {
|
|
||||||
m.Range(func(k, _ interface{}) bool {
|
|
||||||
m.Delete(k)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
m.Store(i, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
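The benchmarks above compare the three implementations side by side. With the repository checked out, they can be run with the standard benchmark tooling, for example:

	go test -run=NONE -bench=. golang.org/x/sync/syncmap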
@@ -1,151 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package syncmap_test

import (
	"sync"
	"sync/atomic"
)

// This file contains reference map implementations for unit-tests.

// mapInterface is the interface Map implements.
type mapInterface interface {
	Load(interface{}) (interface{}, bool)
	Store(key, value interface{})
	LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
	Delete(interface{})
	Range(func(key, value interface{}) (shouldContinue bool))
}

// RWMutexMap is an implementation of mapInterface using a sync.RWMutex.
type RWMutexMap struct {
	mu    sync.RWMutex
	dirty map[interface{}]interface{}
}

func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) {
	m.mu.RLock()
	value, ok = m.dirty[key]
	m.mu.RUnlock()
	return
}

func (m *RWMutexMap) Store(key, value interface{}) {
	m.mu.Lock()
	if m.dirty == nil {
		m.dirty = make(map[interface{}]interface{})
	}
	m.dirty[key] = value
	m.mu.Unlock()
}

func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	m.mu.Lock()
	actual, loaded = m.dirty[key]
	if !loaded {
		actual = value
		if m.dirty == nil {
			m.dirty = make(map[interface{}]interface{})
		}
		m.dirty[key] = value
	}
	m.mu.Unlock()
	return actual, loaded
}

func (m *RWMutexMap) Delete(key interface{}) {
	m.mu.Lock()
	delete(m.dirty, key)
	m.mu.Unlock()
}

func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) {
	m.mu.RLock()
	keys := make([]interface{}, 0, len(m.dirty))
	for k := range m.dirty {
		keys = append(keys, k)
	}
	m.mu.RUnlock()

	for _, k := range keys {
		v, ok := m.Load(k)
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

// DeepCopyMap is an implementation of mapInterface using a Mutex and
// atomic.Value. It makes deep copies of the map on every write to avoid
// acquiring the Mutex in Load.
type DeepCopyMap struct {
	mu    sync.Mutex
	clean atomic.Value
}

func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) {
	clean, _ := m.clean.Load().(map[interface{}]interface{})
	value, ok = clean[key]
	return value, ok
}

func (m *DeepCopyMap) Store(key, value interface{}) {
	m.mu.Lock()
	dirty := m.dirty()
	dirty[key] = value
	m.clean.Store(dirty)
	m.mu.Unlock()
}

func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	clean, _ := m.clean.Load().(map[interface{}]interface{})
	actual, loaded = clean[key]
	if loaded {
		return actual, loaded
	}

	m.mu.Lock()
	// Reload clean in case it changed while we were waiting on m.mu.
	clean, _ = m.clean.Load().(map[interface{}]interface{})
	actual, loaded = clean[key]
	if !loaded {
		dirty := m.dirty()
		dirty[key] = value
		actual = value
		m.clean.Store(dirty)
	}
	m.mu.Unlock()
	return actual, loaded
}

func (m *DeepCopyMap) Delete(key interface{}) {
	m.mu.Lock()
	dirty := m.dirty()
	delete(dirty, key)
	m.clean.Store(dirty)
	m.mu.Unlock()
}

func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) {
	clean, _ := m.clean.Load().(map[interface{}]interface{})
	for k, v := range clean {
		if !f(k, v) {
			break
		}
	}
}

func (m *DeepCopyMap) dirty() map[interface{}]interface{} {
	clean, _ := m.clean.Load().(map[interface{}]interface{})
	dirty := make(map[interface{}]interface{}, len(clean)+1)
	for k, v := range clean {
		dirty[k] = v
	}
	return dirty
}
@@ -1,172 +0,0 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package syncmap_test

import (
	"math/rand"
	"reflect"
	"runtime"
	"sync"
	"testing"
	"testing/quick"

	"golang.org/x/sync/syncmap"
)

type mapOp string

const (
	opLoad        = mapOp("Load")
	opStore       = mapOp("Store")
	opLoadOrStore = mapOp("LoadOrStore")
	opDelete      = mapOp("Delete")
)

var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete}

// mapCall is a quick.Generator for calls on mapInterface.
type mapCall struct {
	op   mapOp
	k, v interface{}
}

func (c mapCall) apply(m mapInterface) (interface{}, bool) {
	switch c.op {
	case opLoad:
		return m.Load(c.k)
	case opStore:
		m.Store(c.k, c.v)
		return nil, false
	case opLoadOrStore:
		return m.LoadOrStore(c.k, c.v)
	case opDelete:
		m.Delete(c.k)
		return nil, false
	default:
		panic("invalid mapOp")
	}
}

type mapResult struct {
	value interface{}
	ok    bool
}

func randValue(r *rand.Rand) interface{} {
	b := make([]byte, r.Intn(4))
	for i := range b {
		b[i] = 'a' + byte(rand.Intn(26))
	}
	return string(b)
}

func (mapCall) Generate(r *rand.Rand, size int) reflect.Value {
	c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)}
	switch c.op {
	case opStore, opLoadOrStore:
		c.v = randValue(r)
	}
	return reflect.ValueOf(c)
}

func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) {
	for _, c := range calls {
		v, ok := c.apply(m)
		results = append(results, mapResult{v, ok})
	}

	final = make(map[interface{}]interface{})
	m.Range(func(k, v interface{}) bool {
		final[k] = v
		return true
	})

	return results, final
}

func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
	return applyCalls(new(syncmap.Map), calls)
}

func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
	return applyCalls(new(RWMutexMap), calls)
}

func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
	return applyCalls(new(DeepCopyMap), calls)
}

func TestMapMatchesRWMutex(t *testing.T) {
	if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil {
		t.Error(err)
	}
}

func TestMapMatchesDeepCopy(t *testing.T) {
	if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil {
		t.Error(err)
	}
}

func TestConcurrentRange(t *testing.T) {
	const mapSize = 1 << 10

	m := new(syncmap.Map)
	for n := int64(1); n <= mapSize; n++ {
		m.Store(n, n)
	}

	done := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		close(done)
		wg.Wait()
	}()
	for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- {
		r := rand.New(rand.NewSource(g))
		wg.Add(1)
		go func(g int64) {
			defer wg.Done()
			for i := int64(0); ; i++ {
				select {
				case <-done:
					return
				default:
				}
				for n := int64(1); n < mapSize; n++ {
					if r.Int63n(mapSize) == 0 {
						m.Store(n, n*i*g)
					} else {
						m.Load(n)
					}
				}
			}
		}(g)
	}

	iters := 1 << 10
	if testing.Short() {
		iters = 16
	}
	for n := iters; n > 0; n-- {
		seen := make(map[int64]bool, mapSize)

		m.Range(func(ki, vi interface{}) bool {
			k, v := ki.(int64), vi.(int64)
			if v%k != 0 {
				t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v)
			}
			if seen[k] {
				t.Fatalf("Range visited key %v twice", k)
			}
			seen[k] = true
			return true
		})

		if len(seen) != mapSize {
			t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize)
		}
	}
}