diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index d0485e8..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,26 +0,0 @@ -# Contributing to Go - -Go is an open source project. - -It is the work of hundreds of contributors. We appreciate your help! - -## Filing issues - -When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions: - -1. What version of Go are you using (`go version`)? -2. What operating system and processor architecture are you using? -3. What did you do? -4. What did you expect to see? -5. What did you see instead? - -General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker. -The gophers there will answer or ask you to file an issue if you've tripped over a bug. - -## Contributing code - -Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html) -before sending patches. - -Unless otherwise noted, the Go source files are distributed under -the BSD-style license found in the LICENSE file. diff --git a/PATENTS b/PATENTS deleted file mode 100644 index 7330990..0000000 --- a/PATENTS +++ /dev/null @@ -1,22 +0,0 @@ -Additional IP Rights Grant (Patents) - -"This implementation" means the copyrightable works distributed by -Google as part of the Go project. - -Google hereby grants to You a perpetual, worldwide, non-exclusive, -no-charge, royalty-free, irrevocable (except as stated in this section) -patent license to make, have made, use, offer to sell, sell, import, -transfer and otherwise run, modify and propagate the contents of this -implementation of Go, where such license applies only to those patent -claims, both currently owned or controlled by Google and acquired in -the future, licensable by Google that are necessarily infringed by this -implementation of Go. This grant does not include claims that would be -infringed only as a consequence of further modification of this -implementation. If you or your agent or exclusive licensee institute or -order or agree to the institution of patent litigation against any -entity (including a cross-claim or counterclaim in a lawsuit) alleging -that this implementation of Go or any code incorporated within this -implementation of Go constitutes direct or contributory patent -infringement, or inducement of patent infringement, then any patent -rights granted to you under this License for this implementation of Go -shall terminate as of the date such litigation is filed. diff --git a/README.md b/README.md deleted file mode 100644 index 4cb3151..0000000 --- a/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Go Sync - -[![Go Reference](https://pkg.go.dev/badge/golang.org/x/sync.svg)](https://pkg.go.dev/golang.org/x/sync) - -This repository provides Go concurrency primitives in addition to the -ones provided by the language and "sync" and "sync/atomic" packages. - -## Report Issues / Send Patches - -This repository uses Gerrit for code changes. To learn how to submit changes to -this repository, see https://go.dev/doc/contribute. - -The git repository is https://go.googlesource.com/sync. - -The main issue tracker for the sync repository is located at -https://go.dev/issues. Prefix your issue with "x/sync:" in the -subject line, so it is easy to find. diff --git a/codereview.cfg b/codereview.cfg deleted file mode 100644 index 3f8b14b..0000000 --- a/codereview.cfg +++ /dev/null @@ -1 +0,0 @@ -issuerepo: golang/go diff --git a/errgroup/errgroup.go b/errgroup.go similarity index 77% rename from errgroup/errgroup.go rename to errgroup.go index b832259..1d8cffa 100644 --- a/errgroup/errgroup.go +++ b/errgroup.go @@ -18,7 +18,7 @@ import ( type token struct{} // A Group is a collection of goroutines working on subtasks that are part of -// the same overall task. +// the same overall task. A Group should not be reused for different tasks. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. @@ -46,7 +46,7 @@ func (g *Group) done() { // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { - ctx, cancel := withCancelCause(ctx) + ctx, cancel := context.WithCancelCause(ctx) return &Group{cancel: cancel}, ctx } @@ -61,11 +61,14 @@ func (g *Group) Wait() error { } // Go calls the given function in a new goroutine. -// It blocks until the new goroutine can be added without the number of -// active goroutines in the group exceeding the configured limit. // -// The first call to return a non-nil error cancels the group's context, if the -// group was created by calling WithContext. The error will be returned by Wait. +// The first call to Go must happen before a Wait. +// It blocks until the new goroutine can be added without the number of +// goroutines in the group exceeding the configured limit. +// +// The first goroutine in the group that returns a non-nil error will +// cancel the associated Context, if any. The error will be returned +// by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} @@ -75,6 +78,18 @@ func (g *Group) Go(f func() error) { go func() { defer g.done() + // It is tempting to propagate panics from f() + // up to the goroutine that calls Wait, but + // it creates more problems than it solves: + // - it delays panics arbitrarily, + // making bugs harder to detect; + // - it turns f's panic stack into a mere value, + // hiding it from crash-monitoring tools; + // - it risks deadlocks that hide the panic entirely, + // if f's panic leaves the program in a state + // that prevents the Wait call from being reached. + // See #53757, #74275, #74304, #74306. + if err := f(); err != nil { g.errOnce.Do(func() { g.err = err diff --git a/errgroup/go120.go b/errgroup/go120.go deleted file mode 100644 index f93c740..0000000 --- a/errgroup/go120.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build go1.20 - -package errgroup - -import "context" - -func withCancelCause(parent context.Context) (context.Context, func(error)) { - return context.WithCancelCause(parent) -} diff --git a/errgroup/go120_test.go b/errgroup/go120_test.go deleted file mode 100644 index 068f104..0000000 --- a/errgroup/go120_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build go1.20 - -package errgroup_test - -import ( - "context" - "errors" - "testing" - - "golang.org/x/sync/errgroup" -) - -func TestCancelCause(t *testing.T) { - errDoom := errors.New("group_test: doomed") - - cases := []struct { - errs []error - want error - }{ - {want: nil}, - {errs: []error{nil}, want: nil}, - {errs: []error{errDoom}, want: errDoom}, - {errs: []error{errDoom, nil}, want: errDoom}, - } - - for _, tc := range cases { - g, ctx := errgroup.WithContext(context.Background()) - - for _, err := range tc.errs { - err := err - g.TryGo(func() error { return err }) - } - - if err := g.Wait(); err != tc.want { - t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ - "g.Wait() = %v; want %v", - g, tc.errs, err, tc.want) - } - - if tc.want == nil { - tc.want = context.Canceled - } - - if err := context.Cause(ctx); err != tc.want { - t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ - "context.Cause(ctx) = %v; tc.want %v", - g, tc.errs, err, tc.want) - } - } -} diff --git a/errgroup/pre_go120.go b/errgroup/pre_go120.go deleted file mode 100644 index 88ce334..0000000 --- a/errgroup/pre_go120.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build !go1.20 - -package errgroup - -import "context" - -func withCancelCause(parent context.Context) (context.Context, func(error)) { - ctx, cancel := context.WithCancel(parent) - return ctx, func(error) { cancel() } -} diff --git a/errgroup/errgroup_example_md5all_test.go b/errgroup_example_md5all_test.go similarity index 96% rename from errgroup/errgroup_example_md5all_test.go rename to errgroup_example_md5all_test.go index 739b336..454f72d 100644 --- a/errgroup/errgroup_example_md5all_test.go +++ b/errgroup_example_md5all_test.go @@ -8,12 +8,11 @@ import ( "context" "crypto/md5" "fmt" - "io/ioutil" "log" "os" "path/filepath" - "golang.org/x/sync/errgroup" + "codeberg.org/danjones000/errgroup" ) // Pipeline demonstrates the use of a Group to implement a multi-stage @@ -69,7 +68,7 @@ func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) for i := 0; i < numDigesters; i++ { g.Go(func() error { for path := range paths { - data, err := ioutil.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { return err } diff --git a/errgroup/errgroup_test.go b/errgroup_test.go similarity index 85% rename from errgroup/errgroup_test.go rename to errgroup_test.go index 0358842..05e81e6 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup_test.go @@ -14,7 +14,7 @@ import ( "testing" "time" - "golang.org/x/sync/errgroup" + "codeberg.org/danjones000/errgroup" ) var ( @@ -250,6 +250,45 @@ func TestGoLimit(t *testing.T) { } } +func TestCancelCause(t *testing.T) { + errDoom := errors.New("group_test: doomed") + + cases := []struct { + errs []error + want error + }{ + {want: nil}, + {errs: []error{nil}, want: nil}, + {errs: []error{errDoom}, want: errDoom}, + {errs: []error{errDoom, nil}, want: errDoom}, + } + + for _, tc := range cases { + g, ctx := errgroup.WithContext(context.Background()) + + for _, err := range tc.errs { + err := err + g.TryGo(func() error { return err }) + } + + if err := g.Wait(); err != tc.want { + t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ + "g.Wait() = %v; want %v", + g, tc.errs, err, tc.want) + } + + if tc.want == nil { + tc.want = context.Canceled + } + + if err := context.Cause(ctx); err != tc.want { + t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ + "context.Cause(ctx) = %v; tc.want %v", + g, tc.errs, err, tc.want) + } + } +} + func BenchmarkGo(b *testing.B) { fn := func() {} g := &errgroup.Group{} diff --git a/go.mod b/go.mod index 74bd0ac..8bb13f8 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module golang.org/x/sync +module codeberg.org/danjones000/errgroup -go 1.18 +go 1.25.0 diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go deleted file mode 100644 index b618162..0000000 --- a/semaphore/semaphore.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package semaphore provides a weighted semaphore implementation. -package semaphore // import "golang.org/x/sync/semaphore" - -import ( - "container/list" - "context" - "sync" -) - -type waiter struct { - n int64 - ready chan<- struct{} // Closed when semaphore acquired. -} - -// NewWeighted creates a new weighted semaphore with the given -// maximum combined weight for concurrent access. -func NewWeighted(n int64) *Weighted { - w := &Weighted{size: n} - return w -} - -// Weighted provides a way to bound concurrent access to a resource. -// The callers can request access with a given weight. -type Weighted struct { - size int64 - cur int64 - mu sync.Mutex - waiters list.List -} - -// Acquire acquires the semaphore with a weight of n, blocking until resources -// are available or ctx is done. On success, returns nil. On failure, returns -// ctx.Err() and leaves the semaphore unchanged. -func (s *Weighted) Acquire(ctx context.Context, n int64) error { - done := ctx.Done() - - s.mu.Lock() - select { - case <-done: - // ctx becoming done has "happened before" acquiring the semaphore, - // whether it became done before the call began or while we were - // waiting for the mutex. We prefer to fail even if we could acquire - // the mutex without blocking. - s.mu.Unlock() - return ctx.Err() - default: - } - if s.size-s.cur >= n && s.waiters.Len() == 0 { - // Since we hold s.mu and haven't synchronized since checking done, if - // ctx becomes done before we return here, it becoming done must have - // "happened concurrently" with this call - it cannot "happen before" - // we return in this branch. So, we're ok to always acquire here. - s.cur += n - s.mu.Unlock() - return nil - } - - if n > s.size { - // Don't make other Acquire calls block on one that's doomed to fail. - s.mu.Unlock() - <-done - return ctx.Err() - } - - ready := make(chan struct{}) - w := waiter{n: n, ready: ready} - elem := s.waiters.PushBack(w) - s.mu.Unlock() - - select { - case <-done: - s.mu.Lock() - select { - case <-ready: - // Acquired the semaphore after we were canceled. - // Pretend we didn't and put the tokens back. - s.cur -= n - s.notifyWaiters() - default: - isFront := s.waiters.Front() == elem - s.waiters.Remove(elem) - // If we're at the front and there're extra tokens left, notify other waiters. - if isFront && s.size > s.cur { - s.notifyWaiters() - } - } - s.mu.Unlock() - return ctx.Err() - - case <-ready: - // Acquired the semaphore. Check that ctx isn't already done. - // We check the done channel instead of calling ctx.Err because we - // already have the channel, and ctx.Err is O(n) with the nesting - // depth of ctx. - select { - case <-done: - s.Release(n) - return ctx.Err() - default: - } - return nil - } -} - -// TryAcquire acquires the semaphore with a weight of n without blocking. -// On success, returns true. On failure, returns false and leaves the semaphore unchanged. -func (s *Weighted) TryAcquire(n int64) bool { - s.mu.Lock() - success := s.size-s.cur >= n && s.waiters.Len() == 0 - if success { - s.cur += n - } - s.mu.Unlock() - return success -} - -// Release releases the semaphore with a weight of n. -func (s *Weighted) Release(n int64) { - s.mu.Lock() - s.cur -= n - if s.cur < 0 { - s.mu.Unlock() - panic("semaphore: released more than held") - } - s.notifyWaiters() - s.mu.Unlock() -} - -func (s *Weighted) notifyWaiters() { - for { - next := s.waiters.Front() - if next == nil { - break // No more waiters blocked. - } - - w := next.Value.(waiter) - if s.size-s.cur < w.n { - // Not enough tokens for the next waiter. We could keep going (to try to - // find a waiter with a smaller request), but under load that could cause - // starvation for large requests; instead, we leave all remaining waiters - // blocked. - // - // Consider a semaphore used as a read-write lock, with N tokens, N - // readers, and one writer. Each reader can Acquire(1) to obtain a read - // lock. The writer can Acquire(N) to obtain a write lock, excluding all - // of the readers. If we allow the readers to jump ahead in the queue, - // the writer will starve — there is always one token available for every - // reader. - break - } - - s.cur += w.n - s.waiters.Remove(next) - close(w.ready) - } -} diff --git a/semaphore/semaphore_bench_test.go b/semaphore/semaphore_bench_test.go deleted file mode 100644 index aa64258..0000000 --- a/semaphore/semaphore_bench_test.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "fmt" - "testing" - - "golang.org/x/sync/semaphore" -) - -// weighted is an interface matching a subset of *Weighted. It allows -// alternate implementations for testing and benchmarking. -type weighted interface { - Acquire(context.Context, int64) error - TryAcquire(int64) bool - Release(int64) -} - -// semChan implements Weighted using a channel for -// comparing against the condition variable-based implementation. -type semChan chan struct{} - -func newSemChan(n int64) semChan { - return semChan(make(chan struct{}, n)) -} - -func (s semChan) Acquire(_ context.Context, n int64) error { - for i := int64(0); i < n; i++ { - s <- struct{}{} - } - return nil -} - -func (s semChan) TryAcquire(n int64) bool { - if int64(len(s))+n > int64(cap(s)) { - return false - } - - for i := int64(0); i < n; i++ { - s <- struct{}{} - } - return true -} - -func (s semChan) Release(n int64) { - for i := int64(0); i < n; i++ { - <-s - } -} - -// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times. -func acquireN(b *testing.B, sem weighted, size int64, N int) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - for j := 0; j < N; j++ { - sem.Acquire(context.Background(), size) - } - for j := 0; j < N; j++ { - sem.Release(size) - } - } -} - -// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times. -func tryAcquireN(b *testing.B, sem weighted, size int64, N int) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - for j := 0; j < N; j++ { - if !sem.TryAcquire(size) { - b.Fatalf("TryAcquire(%v) = false, want true", size) - } - } - for j := 0; j < N; j++ { - sem.Release(size) - } - } -} - -func BenchmarkNewSeq(b *testing.B) { - for _, cap := range []int64{1, 128} { - b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = semaphore.NewWeighted(cap) - } - }) - b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = newSemChan(cap) - } - }) - } -} - -func BenchmarkAcquireSeq(b *testing.B) { - for _, c := range []struct { - cap, size int64 - N int - }{ - {1, 1, 1}, - {2, 1, 1}, - {16, 1, 1}, - {128, 1, 1}, - {2, 2, 1}, - {16, 2, 8}, - {128, 2, 64}, - {2, 1, 2}, - {16, 8, 2}, - {128, 64, 2}, - } { - for _, w := range []struct { - name string - w weighted - }{ - {"Weighted", semaphore.NewWeighted(c.cap)}, - {"semChan", newSemChan(c.cap)}, - } { - b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { - acquireN(b, w.w, c.size, c.N) - }) - b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { - tryAcquireN(b, w.w, c.size, c.N) - }) - } - } -} diff --git a/semaphore/semaphore_example_test.go b/semaphore/semaphore_example_test.go deleted file mode 100644 index e75cd79..0000000 --- a/semaphore/semaphore_example_test.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "fmt" - "log" - "runtime" - - "golang.org/x/sync/semaphore" -) - -// Example_workerPool demonstrates how to use a semaphore to limit the number of -// goroutines working on parallel tasks. -// -// This use of a semaphore mimics a typical “worker pool” pattern, but without -// the need to explicitly shut down idle workers when the work is done. -func Example_workerPool() { - ctx := context.TODO() - - var ( - maxWorkers = runtime.GOMAXPROCS(0) - sem = semaphore.NewWeighted(int64(maxWorkers)) - out = make([]int, 32) - ) - - // Compute the output using up to maxWorkers goroutines at a time. - for i := range out { - // When maxWorkers goroutines are in flight, Acquire blocks until one of the - // workers finishes. - if err := sem.Acquire(ctx, 1); err != nil { - log.Printf("Failed to acquire semaphore: %v", err) - break - } - - go func(i int) { - defer sem.Release(1) - out[i] = collatzSteps(i + 1) - }(i) - } - - // Acquire all of the tokens to wait for any remaining workers to finish. - // - // If you are already waiting for the workers by some other means (such as an - // errgroup.Group), you can omit this final Acquire call. - if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil { - log.Printf("Failed to acquire semaphore: %v", err) - } - - fmt.Println(out) - - // Output: - // [0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5] -} - -// collatzSteps computes the number of steps to reach 1 under the Collatz -// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.) -func collatzSteps(n int) (steps int) { - if n <= 0 { - panic("nonpositive input") - } - - for ; n > 1; steps++ { - if steps < 0 { - panic("too many steps") - } - - if n%2 == 0 { - n /= 2 - continue - } - - const maxInt = int(^uint(0) >> 1) - if n > (maxInt-1)/3 { - panic("overflow") - } - n = 3*n + 1 - } - - return steps -} diff --git a/semaphore/semaphore_test.go b/semaphore/semaphore_test.go deleted file mode 100644 index 61012d6..0000000 --- a/semaphore/semaphore_test.go +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "math/rand" - "runtime" - "sync" - "testing" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" -) - -const maxSleep = 1 * time.Millisecond - -func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) { - for i := 0; i < loops; i++ { - sem.Acquire(context.Background(), n) - time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond) - sem.Release(n) - } -} - -func TestWeighted(t *testing.T) { - t.Parallel() - - n := runtime.GOMAXPROCS(0) - loops := 10000 / n - sem := semaphore.NewWeighted(int64(n)) - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - i := i - go func() { - defer wg.Done() - HammerWeighted(sem, int64(i), loops) - }() - } - wg.Wait() -} - -func TestWeightedPanic(t *testing.T) { - t.Parallel() - - defer func() { - if recover() == nil { - t.Fatal("release of an unacquired weighted semaphore did not panic") - } - }() - w := semaphore.NewWeighted(1) - w.Release(1) -} - -func TestWeightedTryAcquire(t *testing.T) { - t.Parallel() - - ctx := context.Background() - sem := semaphore.NewWeighted(2) - tries := []bool{} - sem.Acquire(ctx, 1) - tries = append(tries, sem.TryAcquire(1)) - tries = append(tries, sem.TryAcquire(1)) - - sem.Release(2) - - tries = append(tries, sem.TryAcquire(1)) - sem.Acquire(ctx, 1) - tries = append(tries, sem.TryAcquire(1)) - - want := []bool{true, false, true, false} - for i := range tries { - if tries[i] != want[i] { - t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) - } - } -} - -func TestWeightedAcquire(t *testing.T) { - t.Parallel() - - ctx := context.Background() - sem := semaphore.NewWeighted(2) - tryAcquire := func(n int64) bool { - ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) - defer cancel() - return sem.Acquire(ctx, n) == nil - } - - tries := []bool{} - sem.Acquire(ctx, 1) - tries = append(tries, tryAcquire(1)) - tries = append(tries, tryAcquire(1)) - - sem.Release(2) - - tries = append(tries, tryAcquire(1)) - sem.Acquire(ctx, 1) - tries = append(tries, tryAcquire(1)) - - want := []bool{true, false, true, false} - for i := range tries { - if tries[i] != want[i] { - t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) - } - } -} - -func TestWeightedDoesntBlockIfTooBig(t *testing.T) { - t.Parallel() - - const n = 2 - sem := semaphore.NewWeighted(n) - { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go sem.Acquire(ctx, n+1) - } - - g, ctx := errgroup.WithContext(context.Background()) - for i := n * 3; i > 0; i-- { - g.Go(func() error { - err := sem.Acquire(ctx, 1) - if err == nil { - time.Sleep(1 * time.Millisecond) - sem.Release(1) - } - return err - }) - } - if err := g.Wait(); err != nil { - t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1) - } -} - -// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves. -// Merely returning from the test function indicates success. -func TestLargeAcquireDoesntStarve(t *testing.T) { - t.Parallel() - - ctx := context.Background() - n := int64(runtime.GOMAXPROCS(0)) - sem := semaphore.NewWeighted(n) - running := true - - var wg sync.WaitGroup - wg.Add(int(n)) - for i := n; i > 0; i-- { - sem.Acquire(ctx, 1) - go func() { - defer func() { - sem.Release(1) - wg.Done() - }() - for running { - time.Sleep(1 * time.Millisecond) - sem.Release(1) - sem.Acquire(ctx, 1) - } - }() - } - - sem.Acquire(ctx, n) - running = false - sem.Release(n) - wg.Wait() -} - -// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43 -func TestAllocCancelDoesntStarve(t *testing.T) { - sem := semaphore.NewWeighted(10) - - // Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed. - sem.Acquire(context.Background(), 1) - - // In the background, Acquire(_, 10). - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - sem.Acquire(ctx, 10) - }() - - // Wait until the Acquire(_, 10) call blocks. - for sem.TryAcquire(1) { - sem.Release(1) - runtime.Gosched() - } - - // Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call. - // Both Acquire calls should unblock and return, in either order. - go cancel() - - err := sem.Acquire(context.Background(), 1) - if err != nil { - t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err) - } - sem.Release(1) -} - -func TestWeightedAcquireCanceled(t *testing.T) { - // https://go.dev/issue/63615 - sem := semaphore.NewWeighted(2) - ctx, cancel := context.WithCancel(context.Background()) - sem.Acquire(context.Background(), 1) - ch := make(chan struct{}) - go func() { - // Synchronize with the Acquire(2) below. - for sem.TryAcquire(1) { - sem.Release(1) - } - // Now cancel ctx, and then release the token. - cancel() - sem.Release(1) - close(ch) - }() - // Since the context closing happens before enough tokens become available, - // this Acquire must fail. - if err := sem.Acquire(ctx, 2); err != context.Canceled { - t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err) - } - // There must always be two tokens in the semaphore after the other - // goroutine releases the one we held at the start. - <-ch - if !sem.TryAcquire(2) { - t.Fatal("TryAcquire after canceled Acquire failed") - } - // Additionally verify that we don't acquire with a done context even when - // we wouldn't need to block to do so. - sem.Release(2) - if err := sem.Acquire(ctx, 1); err != context.Canceled { - t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err) - } -} diff --git a/singleflight/singleflight.go b/singleflight/singleflight.go deleted file mode 100644 index 4051830..0000000 --- a/singleflight/singleflight.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight // import "golang.org/x/sync/singleflight" - -import ( - "bytes" - "errors" - "fmt" - "runtime" - "runtime/debug" - "sync" -) - -// errGoexit indicates the runtime.Goexit was called in -// the user given function. -var errGoexit = errors.New("runtime.Goexit was called") - -// A panicError is an arbitrary value recovered from a panic -// with the stack trace during the execution of given function. -type panicError struct { - value interface{} - stack []byte -} - -// Error implements error interface. -func (p *panicError) Error() string { - return fmt.Sprintf("%v\n\n%s", p.value, p.stack) -} - -func (p *panicError) Unwrap() error { - err, ok := p.value.(error) - if !ok { - return nil - } - - return err -} - -func newPanicError(v interface{}) error { - stack := debug.Stack() - - // The first line of the stack trace is of the form "goroutine N [status]:" - // but by the time the panic reaches Do the goroutine may no longer exist - // and its status will have changed. Trim out the misleading line. - if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { - stack = stack[line+1:] - } - return &panicError{value: v, stack: stack} -} - -// call is an in-flight or completed singleflight.Do call -type call struct { - wg sync.WaitGroup - - // These fields are written once before the WaitGroup is done - // and are only read after the WaitGroup is done. - val interface{} - err error - - // These fields are read and written with the singleflight - // mutex held before the WaitGroup is done, and are read but - // not written after the WaitGroup is done. - dups int - chans []chan<- Result -} - -// Group represents a class of work and forms a namespace in -// which units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Result holds the results of Do, so they can be passed -// on a channel. -type Result struct { - Val interface{} - Err error - Shared bool -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -// The return value shared indicates whether v was given to multiple callers. -func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - g.mu.Unlock() - c.wg.Wait() - - if e, ok := c.err.(*panicError); ok { - panic(e) - } else if c.err == errGoexit { - runtime.Goexit() - } - return c.val, c.err, true - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - g.doCall(c, key, fn) - return c.val, c.err, c.dups > 0 -} - -// DoChan is like Do but returns a channel that will receive the -// results when they are ready. -// -// The returned channel will not be closed. -func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { - ch := make(chan Result, 1) - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - c.chans = append(c.chans, ch) - g.mu.Unlock() - return ch - } - c := &call{chans: []chan<- Result{ch}} - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - go g.doCall(c, key, fn) - - return ch -} - -// doCall handles the single call for a key. -func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - normalReturn := false - recovered := false - - // use double-defer to distinguish panic from runtime.Goexit, - // more details see https://golang.org/cl/134395 - defer func() { - // the given function invoked runtime.Goexit - if !normalReturn && !recovered { - c.err = errGoexit - } - - g.mu.Lock() - defer g.mu.Unlock() - c.wg.Done() - if g.m[key] == c { - delete(g.m, key) - } - - if e, ok := c.err.(*panicError); ok { - // In order to prevent the waiting channels from being blocked forever, - // needs to ensure that this panic cannot be recovered. - if len(c.chans) > 0 { - go panic(e) - select {} // Keep this goroutine around so that it will appear in the crash dump. - } else { - panic(e) - } - } else if c.err == errGoexit { - // Already in the process of goexit, no need to call again - } else { - // Normal return - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - } - }() - - func() { - defer func() { - if !normalReturn { - // Ideally, we would wait to take a stack trace until we've determined - // whether this is a panic or a runtime.Goexit. - // - // Unfortunately, the only way we can distinguish the two is to see - // whether the recover stopped the goroutine from terminating, and by - // the time we know that, the part of the stack trace relevant to the - // panic has been discarded. - if r := recover(); r != nil { - c.err = newPanicError(r) - } - } - }() - - c.val, c.err = fn() - normalReturn = true - }() - - if !normalReturn { - recovered = true - } -} - -// Forget tells the singleflight to forget about a key. Future calls -// to Do for this key will call the function rather than waiting for -// an earlier call to complete. -func (g *Group) Forget(key string) { - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() -} diff --git a/singleflight/singleflight_test.go b/singleflight/singleflight_test.go deleted file mode 100644 index 853ec42..0000000 --- a/singleflight/singleflight_test.go +++ /dev/null @@ -1,422 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package singleflight - -import ( - "bytes" - "errors" - "fmt" - "os" - "os/exec" - "runtime" - "runtime/debug" - "strings" - "sync" - "sync/atomic" - "testing" - "time" -) - -type errValue struct{} - -func (err *errValue) Error() string { - return "error value" -} - -func TestPanicErrorUnwrap(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - panicValue interface{} - wrappedErrorType bool - }{ - { - name: "panicError wraps non-error type", - panicValue: &panicError{value: "string value"}, - wrappedErrorType: false, - }, - { - name: "panicError wraps error type", - panicValue: &panicError{value: new(errValue)}, - wrappedErrorType: false, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - var recovered interface{} - - group := &Group{} - - func() { - defer func() { - recovered = recover() - t.Logf("after panic(%#v) in group.Do, recovered %#v", tc.panicValue, recovered) - }() - - _, _, _ = group.Do(tc.name, func() (interface{}, error) { - panic(tc.panicValue) - }) - }() - - if recovered == nil { - t.Fatal("expected a non-nil panic value") - } - - err, ok := recovered.(error) - if !ok { - t.Fatalf("recovered non-error type: %T", recovered) - } - - if !errors.Is(err, new(errValue)) && tc.wrappedErrorType { - t.Errorf("unexpected wrapped error type %T; want %T", err, new(errValue)) - } - }) - } -} - -func TestDo(t *testing.T) { - var g Group - v, err, _ := g.Do("key", func() (interface{}, error) { - return "bar", nil - }) - if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { - t.Errorf("Do = %v; want %v", got, want) - } - if err != nil { - t.Errorf("Do error = %v", err) - } -} - -func TestDoErr(t *testing.T) { - var g Group - someErr := errors.New("Some error") - v, err, _ := g.Do("key", func() (interface{}, error) { - return nil, someErr - }) - if err != someErr { - t.Errorf("Do error = %v; want someErr %v", err, someErr) - } - if v != nil { - t.Errorf("unexpected non-nil value %#v", v) - } -} - -func TestDoDupSuppress(t *testing.T) { - var g Group - var wg1, wg2 sync.WaitGroup - c := make(chan string, 1) - var calls int32 - fn := func() (interface{}, error) { - if atomic.AddInt32(&calls, 1) == 1 { - // First invocation. - wg1.Done() - } - v := <-c - c <- v // pump; make available for any future calls - - time.Sleep(10 * time.Millisecond) // let more goroutines enter Do - - return v, nil - } - - const n = 10 - wg1.Add(1) - for i := 0; i < n; i++ { - wg1.Add(1) - wg2.Add(1) - go func() { - defer wg2.Done() - wg1.Done() - v, err, _ := g.Do("key", fn) - if err != nil { - t.Errorf("Do error: %v", err) - return - } - if s, _ := v.(string); s != "bar" { - t.Errorf("Do = %T %v; want %q", v, v, "bar") - } - }() - } - wg1.Wait() - // At least one goroutine is in fn now and all of them have at - // least reached the line before the Do. - c <- "bar" - wg2.Wait() - if got := atomic.LoadInt32(&calls); got <= 0 || got >= n { - t.Errorf("number of calls = %d; want over 0 and less than %d", got, n) - } -} - -// Test that singleflight behaves correctly after Forget called. -// See https://github.com/golang/go/issues/31420 -func TestForget(t *testing.T) { - var g Group - - var ( - firstStarted = make(chan struct{}) - unblockFirst = make(chan struct{}) - firstFinished = make(chan struct{}) - ) - - go func() { - g.Do("key", func() (i interface{}, e error) { - close(firstStarted) - <-unblockFirst - close(firstFinished) - return - }) - }() - <-firstStarted - g.Forget("key") - - unblockSecond := make(chan struct{}) - secondResult := g.DoChan("key", func() (i interface{}, e error) { - <-unblockSecond - return 2, nil - }) - - close(unblockFirst) - <-firstFinished - - thirdResult := g.DoChan("key", func() (i interface{}, e error) { - return 3, nil - }) - - close(unblockSecond) - <-secondResult - r := <-thirdResult - if r.Val != 2 { - t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val) - } -} - -func TestDoChan(t *testing.T) { - var g Group - ch := g.DoChan("key", func() (interface{}, error) { - return "bar", nil - }) - - res := <-ch - v := res.Val - err := res.Err - if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { - t.Errorf("Do = %v; want %v", got, want) - } - if err != nil { - t.Errorf("Do error = %v", err) - } -} - -// Test singleflight behaves correctly after Do panic. -// See https://github.com/golang/go/issues/41133 -func TestPanicDo(t *testing.T) { - var g Group - fn := func() (interface{}, error) { - panic("invalid memory address or nil pointer dereference") - } - - const n = 5 - waited := int32(n) - panicCount := int32(0) - done := make(chan struct{}) - for i := 0; i < n; i++ { - go func() { - defer func() { - if err := recover(); err != nil { - t.Logf("Got panic: %v\n%s", err, debug.Stack()) - atomic.AddInt32(&panicCount, 1) - } - - if atomic.AddInt32(&waited, -1) == 0 { - close(done) - } - }() - - g.Do("key", fn) - }() - } - - select { - case <-done: - if panicCount != n { - t.Errorf("Expect %d panic, but got %d", n, panicCount) - } - case <-time.After(time.Second): - t.Fatalf("Do hangs") - } -} - -func TestGoexitDo(t *testing.T) { - var g Group - fn := func() (interface{}, error) { - runtime.Goexit() - return nil, nil - } - - const n = 5 - waited := int32(n) - done := make(chan struct{}) - for i := 0; i < n; i++ { - go func() { - var err error - defer func() { - if err != nil { - t.Errorf("Error should be nil, but got: %v", err) - } - if atomic.AddInt32(&waited, -1) == 0 { - close(done) - } - }() - _, err, _ = g.Do("key", fn) - }() - } - - select { - case <-done: - case <-time.After(time.Second): - t.Fatalf("Do hangs") - } -} - -func executable(t testing.TB) string { - exe, err := os.Executable() - if err != nil { - t.Skipf("skipping: test executable not found") - } - - // Control case: check whether exec.Command works at all. - // (For example, it might fail with a permission error on iOS.) - cmd := exec.Command(exe, "-test.list=^$") - cmd.Env = []string{} - if err := cmd.Run(); err != nil { - t.Skipf("skipping: exec appears not to work on %s: %v", runtime.GOOS, err) - } - - return exe -} - -func TestPanicDoChan(t *testing.T) { - if os.Getenv("TEST_PANIC_DOCHAN") != "" { - defer func() { - recover() - }() - - g := new(Group) - ch := g.DoChan("", func() (interface{}, error) { - panic("Panicking in DoChan") - }) - <-ch - t.Fatalf("DoChan unexpectedly returned") - } - - t.Parallel() - - cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v") - cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") - out := new(bytes.Buffer) - cmd.Stdout = out - cmd.Stderr = out - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - - err := cmd.Wait() - t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) - if err == nil { - t.Errorf("Test subprocess passed; want a crash due to panic in DoChan") - } - if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { - t.Errorf("Test subprocess failed with an unexpected failure mode.") - } - if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) { - t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan") - } -} - -func TestPanicDoSharedByDoChan(t *testing.T) { - if os.Getenv("TEST_PANIC_DOCHAN") != "" { - blocked := make(chan struct{}) - unblock := make(chan struct{}) - - g := new(Group) - go func() { - defer func() { - recover() - }() - g.Do("", func() (interface{}, error) { - close(blocked) - <-unblock - panic("Panicking in Do") - }) - }() - - <-blocked - ch := g.DoChan("", func() (interface{}, error) { - panic("DoChan unexpectedly executed callback") - }) - close(unblock) - <-ch - t.Fatalf("DoChan unexpectedly returned") - } - - t.Parallel() - - cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v") - cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") - out := new(bytes.Buffer) - cmd.Stdout = out - cmd.Stderr = out - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - - err := cmd.Wait() - t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) - if err == nil { - t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan") - } - if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { - t.Errorf("Test subprocess failed with an unexpected failure mode.") - } - if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) { - t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do") - } -} - -func ExampleGroup() { - g := new(Group) - - block := make(chan struct{}) - res1c := g.DoChan("key", func() (interface{}, error) { - <-block - return "func 1", nil - }) - res2c := g.DoChan("key", func() (interface{}, error) { - <-block - return "func 2", nil - }) - close(block) - - res1 := <-res1c - res2 := <-res2c - - // Results are shared by functions executed with duplicate keys. - fmt.Println("Shared:", res2.Shared) - // Only the first function is executed: it is registered and started with "key", - // and doesn't complete before the second function is registered with a duplicate key. - fmt.Println("Equal results:", res1.Val.(string) == res2.Val.(string)) - fmt.Println("Result:", res1.Val) - - // Output: - // Shared: true - // Equal results: true - // Result: func 1 -} diff --git a/syncmap/map.go b/syncmap/map.go deleted file mode 100644 index c9a07f3..0000000 --- a/syncmap/map.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2019 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package syncmap provides a concurrent map implementation. -// This was the prototype for sync.Map which was added to the standard library's -// sync package in Go 1.9. https://golang.org/pkg/sync/#Map. -package syncmap - -import "sync" // home to the standard library's sync.map implementation as of Go 1.9 - -// Map is a concurrent map with amortized-constant-time loads, stores, and deletes. -// It is safe for multiple goroutines to call a Map's methods concurrently. -// -// The zero Map is valid and empty. -// -// A Map must not be copied after first use. -type Map = sync.Map diff --git a/syncmap/map_bench_test.go b/syncmap/map_bench_test.go deleted file mode 100644 index b279b4f..0000000 --- a/syncmap/map_bench_test.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "fmt" - "reflect" - "sync/atomic" - "testing" - - "golang.org/x/sync/syncmap" -) - -type bench struct { - setup func(*testing.B, mapInterface) - perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) -} - -func benchMap(b *testing.B, bench bench) { - for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} { - b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { - m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) - if bench.setup != nil { - bench.setup(b, m) - } - - b.ResetTimer() - - var i int64 - b.RunParallel(func(pb *testing.PB) { - id := int(atomic.AddInt64(&i, 1) - 1) - bench.perG(b, pb, id*b.N, m) - }) - }) - } -} - -func BenchmarkLoadMostlyHits(b *testing.B) { - const hits, misses = 1023, 1 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i % (hits + misses)) - } - }, - }) -} - -func BenchmarkLoadMostlyMisses(b *testing.B) { - const hits, misses = 1, 1023 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i % (hits + misses)) - } - }, - }) -} - -func BenchmarkLoadOrStoreBalanced(b *testing.B) { - const hits, misses = 128, 128 - - benchMap(b, bench{ - setup: func(b *testing.B, m mapInterface) { - if _, ok := m.(*DeepCopyMap); ok { - b.Skip("DeepCopyMap has quadratic running time.") - } - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - j := i % (hits + misses) - if j < hits { - if _, ok := m.LoadOrStore(j, i); !ok { - b.Fatalf("unexpected miss for %v", j) - } - } else { - if v, loaded := m.LoadOrStore(i, i); loaded { - b.Fatalf("failed to store %v: existing value %v", i, v) - } - } - } - }, - }) -} - -func BenchmarkLoadOrStoreUnique(b *testing.B) { - benchMap(b, bench{ - setup: func(b *testing.B, m mapInterface) { - if _, ok := m.(*DeepCopyMap); ok { - b.Skip("DeepCopyMap has quadratic running time.") - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.LoadOrStore(i, i) - } - }, - }) -} - -func BenchmarkLoadOrStoreCollision(b *testing.B) { - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - m.LoadOrStore(0, 0) - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.LoadOrStore(0, 0) - } - }, - }) -} - -func BenchmarkRange(b *testing.B) { - const mapSize = 1 << 10 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < mapSize; i++ { - m.Store(i, i) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Range(func(_, _ interface{}) bool { return true }) - } - }, - }) -} - -// BenchmarkAdversarialAlloc tests performance when we store a new value -// immediately whenever the map is promoted to clean and otherwise load a -// unique, missing key. -// -// This forces the Load calls to always acquire the map's mutex. -func BenchmarkAdversarialAlloc(b *testing.B) { - benchMap(b, bench{ - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - var stores, loadsSinceStore int64 - for ; pb.Next(); i++ { - m.Load(i) - if loadsSinceStore++; loadsSinceStore > stores { - m.LoadOrStore(i, stores) - loadsSinceStore = 0 - stores++ - } - } - }, - }) -} - -// BenchmarkAdversarialDelete tests performance when we periodically delete -// one key and add a different one in a large map. -// -// This forces the Load calls to always acquire the map's mutex and periodically -// makes a full copy of the map despite changing only one entry. -func BenchmarkAdversarialDelete(b *testing.B) { - const mapSize = 1 << 10 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < mapSize; i++ { - m.Store(i, i) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i) - - if i%mapSize == 0 { - m.Range(func(k, _ interface{}) bool { - m.Delete(k) - return false - }) - m.Store(i, i) - } - } - }, - }) -} diff --git a/syncmap/map_reference_test.go b/syncmap/map_reference_test.go deleted file mode 100644 index 923c51b..0000000 --- a/syncmap/map_reference_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "sync" - "sync/atomic" -) - -// This file contains reference map implementations for unit-tests. - -// mapInterface is the interface Map implements. -type mapInterface interface { - Load(interface{}) (interface{}, bool) - Store(key, value interface{}) - LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) - Delete(interface{}) - Range(func(key, value interface{}) (shouldContinue bool)) -} - -// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. -type RWMutexMap struct { - mu sync.RWMutex - dirty map[interface{}]interface{} -} - -func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { - m.mu.RLock() - value, ok = m.dirty[key] - m.mu.RUnlock() - return -} - -func (m *RWMutexMap) Store(key, value interface{}) { - m.mu.Lock() - if m.dirty == nil { - m.dirty = make(map[interface{}]interface{}) - } - m.dirty[key] = value - m.mu.Unlock() -} - -func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { - m.mu.Lock() - actual, loaded = m.dirty[key] - if !loaded { - actual = value - if m.dirty == nil { - m.dirty = make(map[interface{}]interface{}) - } - m.dirty[key] = value - } - m.mu.Unlock() - return actual, loaded -} - -func (m *RWMutexMap) Delete(key interface{}) { - m.mu.Lock() - delete(m.dirty, key) - m.mu.Unlock() -} - -func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { - m.mu.RLock() - keys := make([]interface{}, 0, len(m.dirty)) - for k := range m.dirty { - keys = append(keys, k) - } - m.mu.RUnlock() - - for _, k := range keys { - v, ok := m.Load(k) - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -// DeepCopyMap is an implementation of mapInterface using a Mutex and -// atomic.Value. It makes deep copies of the map on every write to avoid -// acquiring the Mutex in Load. -type DeepCopyMap struct { - mu sync.Mutex - clean atomic.Value -} - -func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - value, ok = clean[key] - return value, ok -} - -func (m *DeepCopyMap) Store(key, value interface{}) { - m.mu.Lock() - dirty := m.dirty() - dirty[key] = value - m.clean.Store(dirty) - m.mu.Unlock() -} - -func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - actual, loaded = clean[key] - if loaded { - return actual, loaded - } - - m.mu.Lock() - // Reload clean in case it changed while we were waiting on m.mu. - clean, _ = m.clean.Load().(map[interface{}]interface{}) - actual, loaded = clean[key] - if !loaded { - dirty := m.dirty() - dirty[key] = value - actual = value - m.clean.Store(dirty) - } - m.mu.Unlock() - return actual, loaded -} - -func (m *DeepCopyMap) Delete(key interface{}) { - m.mu.Lock() - dirty := m.dirty() - delete(dirty, key) - m.clean.Store(dirty) - m.mu.Unlock() -} - -func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - for k, v := range clean { - if !f(k, v) { - break - } - } -} - -func (m *DeepCopyMap) dirty() map[interface{}]interface{} { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - dirty := make(map[interface{}]interface{}, len(clean)+1) - for k, v := range clean { - dirty[k] = v - } - return dirty -} diff --git a/syncmap/map_test.go b/syncmap/map_test.go deleted file mode 100644 index bf69f50..0000000 --- a/syncmap/map_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "math/rand" - "reflect" - "runtime" - "sync" - "testing" - "testing/quick" - - "golang.org/x/sync/syncmap" -) - -type mapOp string - -const ( - opLoad = mapOp("Load") - opStore = mapOp("Store") - opLoadOrStore = mapOp("LoadOrStore") - opDelete = mapOp("Delete") -) - -var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete} - -// mapCall is a quick.Generator for calls on mapInterface. -type mapCall struct { - op mapOp - k, v interface{} -} - -func (c mapCall) apply(m mapInterface) (interface{}, bool) { - switch c.op { - case opLoad: - return m.Load(c.k) - case opStore: - m.Store(c.k, c.v) - return nil, false - case opLoadOrStore: - return m.LoadOrStore(c.k, c.v) - case opDelete: - m.Delete(c.k) - return nil, false - default: - panic("invalid mapOp") - } -} - -type mapResult struct { - value interface{} - ok bool -} - -func randValue(r *rand.Rand) interface{} { - b := make([]byte, r.Intn(4)) - for i := range b { - b[i] = 'a' + byte(rand.Intn(26)) - } - return string(b) -} - -func (mapCall) Generate(r *rand.Rand, size int) reflect.Value { - c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)} - switch c.op { - case opStore, opLoadOrStore: - c.v = randValue(r) - } - return reflect.ValueOf(c) -} - -func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) { - for _, c := range calls { - v, ok := c.apply(m) - results = append(results, mapResult{v, ok}) - } - - final = make(map[interface{}]interface{}) - m.Range(func(k, v interface{}) bool { - final[k] = v - return true - }) - - return results, final -} - -func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(syncmap.Map), calls) -} - -func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(RWMutexMap), calls) -} - -func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(DeepCopyMap), calls) -} - -func TestMapMatchesRWMutex(t *testing.T) { - if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil { - t.Error(err) - } -} - -func TestMapMatchesDeepCopy(t *testing.T) { - if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil { - t.Error(err) - } -} - -func TestConcurrentRange(t *testing.T) { - const mapSize = 1 << 10 - - m := new(syncmap.Map) - for n := int64(1); n <= mapSize; n++ { - m.Store(n, n) - } - - done := make(chan struct{}) - var wg sync.WaitGroup - defer func() { - close(done) - wg.Wait() - }() - for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { - r := rand.New(rand.NewSource(g)) - wg.Add(1) - go func(g int64) { - defer wg.Done() - for i := int64(0); ; i++ { - select { - case <-done: - return - default: - } - for n := int64(1); n < mapSize; n++ { - if r.Int63n(mapSize) == 0 { - m.Store(n, n*i*g) - } else { - m.Load(n) - } - } - } - }(g) - } - - iters := 1 << 10 - if testing.Short() { - iters = 16 - } - for n := iters; n > 0; n-- { - seen := make(map[int64]bool, mapSize) - - m.Range(func(ki, vi interface{}) bool { - k, v := ki.(int64), vi.(int64) - if v%k != 0 { - t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) - } - if seen[k] { - t.Fatalf("Range visited key %v twice", k) - } - seen[k] = true - return true - }) - - if len(seen) != mapSize { - t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) - } - } -}