From 960bf1fb13c616a491486fc6322e26fe0830eaf0 Mon Sep 17 00:00:00 2001 From: Gopher Robot Date: Fri, 14 Feb 2025 21:16:47 +0000 Subject: [PATCH 01/10] all: upgrade go directive to at least 1.23.0 [generated] By now Go 1.24.0 has been released, and Go 1.22 is no longer supported per the Go Release Policy (https://go.dev/doc/devel/release#policy). For golang/go#69095. [git-generate] (cd . && go get go@1.23.0 && go mod tidy && go fix ./... && go mod edit -toolchain=none) Change-Id: I7e3b1e073a0a64e82e2b9e49387d5da0afd9d58b Reviewed-on: https://go-review.googlesource.com/c/sync/+/649835 Reviewed-by: Dmitri Shuralyov Reviewed-by: Cherry Mui LUCI-TryBot-Result: Go LUCI Auto-Submit: Gopher Robot --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 74bd0ac..d30ac75 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module golang.org/x/sync -go 1.18 +go 1.23.0 From b637f27e40ad6e222f3d301cc113e635f0fa08be Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Tue, 4 Mar 2025 14:55:46 -0800 Subject: [PATCH 02/10] errgroup: drop support for Go versions before 1.20 Change-Id: I7de5dfae21c4ffe31d6c16e3df0fed3e2269cb16 Reviewed-on: https://go-review.googlesource.com/c/sync/+/654421 LUCI-TryBot-Result: Go LUCI Reviewed-by: Ian Lance Taylor Auto-Submit: Ian Lance Taylor Commit-Queue: Ian Lance Taylor Reviewed-by: Dmitri Shuralyov Reviewed-by: Dmitri Shuralyov --- errgroup/errgroup.go | 2 +- errgroup/errgroup_test.go | 39 ++++++++++++++++++++++++++++ errgroup/go120.go | 13 ---------- errgroup/go120_test.go | 54 --------------------------------------- errgroup/pre_go120.go | 14 ---------- 5 files changed, 40 insertions(+), 82 deletions(-) delete mode 100644 errgroup/go120.go delete mode 100644 errgroup/go120_test.go delete mode 100644 errgroup/pre_go120.go diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index b832259..a4ea5d1 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -46,7 +46,7 @@ func (g *Group) done() { // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { - ctx, cancel := withCancelCause(ctx) + ctx, cancel := context.WithCancelCause(ctx) return &Group{cancel: cancel}, ctx } diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go index 0358842..2a491bf 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup/errgroup_test.go @@ -250,6 +250,45 @@ func TestGoLimit(t *testing.T) { } } +func TestCancelCause(t *testing.T) { + errDoom := errors.New("group_test: doomed") + + cases := []struct { + errs []error + want error + }{ + {want: nil}, + {errs: []error{nil}, want: nil}, + {errs: []error{errDoom}, want: errDoom}, + {errs: []error{errDoom, nil}, want: errDoom}, + } + + for _, tc := range cases { + g, ctx := errgroup.WithContext(context.Background()) + + for _, err := range tc.errs { + err := err + g.TryGo(func() error { return err }) + } + + if err := g.Wait(); err != tc.want { + t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ + "g.Wait() = %v; want %v", + g, tc.errs, err, tc.want) + } + + if tc.want == nil { + tc.want = context.Canceled + } + + if err := context.Cause(ctx); err != tc.want { + t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ + "context.Cause(ctx) = %v; tc.want %v", + g, tc.errs, err, tc.want) + } + } +} + func BenchmarkGo(b *testing.B) { fn := func() {} g := &errgroup.Group{} diff --git a/errgroup/go120.go b/errgroup/go120.go deleted file mode 100644 index f93c740..0000000 --- a/errgroup/go120.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build go1.20 - -package errgroup - -import "context" - -func withCancelCause(parent context.Context) (context.Context, func(error)) { - return context.WithCancelCause(parent) -} diff --git a/errgroup/go120_test.go b/errgroup/go120_test.go deleted file mode 100644 index 068f104..0000000 --- a/errgroup/go120_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build go1.20 - -package errgroup_test - -import ( - "context" - "errors" - "testing" - - "golang.org/x/sync/errgroup" -) - -func TestCancelCause(t *testing.T) { - errDoom := errors.New("group_test: doomed") - - cases := []struct { - errs []error - want error - }{ - {want: nil}, - {errs: []error{nil}, want: nil}, - {errs: []error{errDoom}, want: errDoom}, - {errs: []error{errDoom, nil}, want: errDoom}, - } - - for _, tc := range cases { - g, ctx := errgroup.WithContext(context.Background()) - - for _, err := range tc.errs { - err := err - g.TryGo(func() error { return err }) - } - - if err := g.Wait(); err != tc.want { - t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ - "g.Wait() = %v; want %v", - g, tc.errs, err, tc.want) - } - - if tc.want == nil { - tc.want = context.Canceled - } - - if err := context.Cause(ctx); err != tc.want { - t.Errorf("after %T.TryGo(func() error { return err }) for err in %v\n"+ - "context.Cause(ctx) = %v; tc.want %v", - g, tc.errs, err, tc.want) - } - } -} diff --git a/errgroup/pre_go120.go b/errgroup/pre_go120.go deleted file mode 100644 index 88ce334..0000000 --- a/errgroup/pre_go120.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2023 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build !go1.20 - -package errgroup - -import "context" - -func withCancelCause(parent context.Context) (context.Context, func(error)) { - ctx, cancel := context.WithCancel(parent) - return ctx, func(error) { cancel() } -} From 396f3a06ea2a49eb410f12e244c0dd77095d0de9 Mon Sep 17 00:00:00 2001 From: Sean Liao Date: Fri, 21 Mar 2025 19:51:13 +0000 Subject: [PATCH 03/10] errgroup: document calling Go before Wait Fixes golang/go#70284 Change-Id: I31d60ea182226c032b0ffbddcbb5b53675dfd5e6 Reviewed-on: https://go-review.googlesource.com/c/sync/+/660075 LUCI-TryBot-Result: Go LUCI Reviewed-by: Alan Donovan Auto-Submit: Damien Neil Reviewed-by: Damien Neil --- errgroup/errgroup.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index a4ea5d1..f8c3c09 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -18,7 +18,7 @@ import ( type token struct{} // A Group is a collection of goroutines working on subtasks that are part of -// the same overall task. +// the same overall task. A Group should not be reused for different tasks. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. @@ -61,6 +61,7 @@ func (g *Group) Wait() error { } // Go calls the given function in a new goroutine. +// The first call to Go must happen before a Wait. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // From 506c70f97318aa991ec5a898685660c880c166ca Mon Sep 17 00:00:00 2001 From: qiulaidongfeng <2645477756@qq.com> Date: Mon, 27 Jan 2025 16:58:51 +0800 Subject: [PATCH 04/10] errgroup: propagate panic and Goexit through Wait Recovered panic values are wrapped and saved in Group. Goexits are detected by a sentinel value set after the given function returns normally. Wait propagates the first instance of a panic or Goexit. According to the runtime.Goexit after the code will not be executed, with a bool, if f not call runtime.Goexit, is true, determine whether to propagate runtime.Goexit. Fixes golang/go#53757 Change-Id: Ic6426fc014fd1c4368ebaceef5b0d6163770a099 Reviewed-on: https://go-review.googlesource.com/c/sync/+/644575 Reviewed-by: Sean Liao Auto-Submit: Alan Donovan Commit-Queue: Alan Donovan Reviewed-by: Alan Donovan Reviewed-by: Dmitri Shuralyov LUCI-TryBot-Result: Go LUCI --- errgroup/errgroup.go | 107 +++++++++++++++++++++++++++++++------- errgroup/errgroup_test.go | 64 +++++++++++++++++++++++ 2 files changed, 153 insertions(+), 18 deletions(-) diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index f8c3c09..cfafed5 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -12,6 +12,8 @@ package errgroup import ( "context" "fmt" + "runtime" + "runtime/debug" "sync" ) @@ -31,6 +33,10 @@ type Group struct { errOnce sync.Once err error + + mu sync.Mutex + panicValue any // = PanicError | PanicValue; non-nil if some Group.Go goroutine panicked. + abnormal bool // some Group.Go goroutine terminated abnormally (panic or goexit). } func (g *Group) done() { @@ -50,13 +56,22 @@ func WithContext(ctx context.Context) (*Group, context.Context) { return &Group{cancel: cancel}, ctx } -// Wait blocks until all function calls from the Go method have returned, then -// returns the first non-nil error (if any) from them. +// Wait blocks until all function calls from the Go method have returned +// normally, then returns the first non-nil error (if any) from them. +// +// If any of the calls panics, Wait panics with a [PanicValue]; +// and if any of them calls [runtime.Goexit], Wait calls runtime.Goexit. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } + if g.panicValue != nil { + panic(g.panicValue) + } + if g.abnormal { + runtime.Goexit() + } return g.err } @@ -65,18 +80,56 @@ func (g *Group) Wait() error { // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // -// The first call to return a non-nil error cancels the group's context, if the -// group was created by calling WithContext. The error will be returned by Wait. +// It blocks until the new goroutine can be added without the number of +// goroutines in the group exceeding the configured limit. +// +// The first goroutine in the group that returns a non-nil error, panics, or +// invokes [runtime.Goexit] will cancel the associated Context, if any. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } + g.add(f) +} + +func (g *Group) add(f func() error) { g.wg.Add(1) go func() { defer g.done() + normalReturn := false + defer func() { + if normalReturn { + return + } + v := recover() + g.mu.Lock() + defer g.mu.Unlock() + if !g.abnormal { + if g.cancel != nil { + g.cancel(g.err) + } + g.abnormal = true + } + if v != nil && g.panicValue == nil { + switch v := v.(type) { + case error: + g.panicValue = PanicError{ + Recovered: v, + Stack: debug.Stack(), + } + default: + g.panicValue = PanicValue{ + Recovered: v, + Stack: debug.Stack(), + } + } + } + }() - if err := f(); err != nil { + err := f() + normalReturn = true + if err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { @@ -101,19 +154,7 @@ func (g *Group) TryGo(f func() error) bool { } } - g.wg.Add(1) - go func() { - defer g.done() - - if err := f(); err != nil { - g.errOnce.Do(func() { - g.err = err - if g.cancel != nil { - g.cancel(g.err) - } - }) - } - }() + g.add(f) return true } @@ -135,3 +176,33 @@ func (g *Group) SetLimit(n int) { } g.sem = make(chan token, n) } + +// PanicError wraps an error recovered from an unhandled panic +// when calling a function passed to Go or TryGo. +type PanicError struct { + Recovered error + Stack []byte // result of call to [debug.Stack] +} + +func (p PanicError) Error() string { + // A Go Error method conventionally does not include a stack dump, so omit it + // here. (Callers who care can extract it from the Stack field.) + return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered) +} + +func (p PanicError) Unwrap() error { return p.Recovered } + +// PanicValue wraps a value that does not implement the error interface, +// recovered from an unhandled panic when calling a function passed to Go or +// TryGo. +type PanicValue struct { + Recovered any + Stack []byte // result of call to [debug.Stack] +} + +func (p PanicValue) String() string { + if len(p.Stack) > 0 { + return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack) + } + return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered) +} diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go index 2a491bf..4684259 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup/errgroup_test.go @@ -10,6 +10,7 @@ import ( "fmt" "net/http" "os" + "strings" "sync/atomic" "testing" "time" @@ -289,6 +290,69 @@ func TestCancelCause(t *testing.T) { } } +func TestPanic(t *testing.T) { + t.Run("error", func(t *testing.T) { + g := &errgroup.Group{} + p := errors.New("") + g.Go(func() error { + panic(p) + }) + defer func() { + err := recover() + if err == nil { + t.Fatalf("should propagate panic through Wait") + } + pe, ok := err.(errgroup.PanicError) + if !ok { + t.Fatalf("type should is errgroup.PanicError, but is %T", err) + } + if pe.Recovered != p { + t.Fatalf("got %v, want %v", pe.Recovered, p) + } + if !strings.Contains(string(pe.Stack), "TestPanic.func") { + t.Log(string(pe.Stack)) + t.Fatalf("stack trace incomplete") + } + }() + g.Wait() + }) + t.Run("any", func(t *testing.T) { + g := &errgroup.Group{} + g.Go(func() error { + panic(1) + }) + defer func() { + err := recover() + if err == nil { + t.Fatalf("should propagate panic through Wait") + } + pe, ok := err.(errgroup.PanicValue) + if !ok { + t.Fatalf("type should is errgroup.PanicValue, but is %T", err) + } + if pe.Recovered != 1 { + t.Fatalf("got %v, want %v", pe.Recovered, 1) + } + if !strings.Contains(string(pe.Stack), "TestPanic.func") { + t.Log(string(pe.Stack)) + t.Fatalf("stack trace incomplete") + } + }() + g.Wait() + }) +} + +func TestGoexit(t *testing.T) { + g := &errgroup.Group{} + g.Go(func() error { + t.Skip() + t.Fatalf("Goexit fail") + return nil + }) + g.Wait() + t.Fatalf("should call runtime.Goexit from Wait") +} + func BenchmarkGo(b *testing.B) { fn := func() {} g := &errgroup.Group{} From d1ac909e84c04f4326f620436b3894b3f5de0bd4 Mon Sep 17 00:00:00 2001 From: qiulaidongfeng <2645477756@qq.com> Date: Wed, 14 May 2025 21:43:26 +0800 Subject: [PATCH 05/10] sync/errgroup: PanicError.Error print stack trace Because it is useful to print the stack when a nil pointer dereference occurs. Fixes golang/go#73710 Change-Id: I106ea0bdd70c2a293f5ea889edef9b5ba9db2fbd Reviewed-on: https://go-review.googlesource.com/c/sync/+/672635 Reviewed-by: Damien Neil Auto-Submit: Damien Neil Auto-Submit: Alan Donovan Reviewed-by: Alan Donovan TryBot-Bypass: Damien Neil --- errgroup/errgroup.go | 5 +++-- errgroup/errgroup_test.go | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index cfafed5..a6b6ad2 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -185,8 +185,9 @@ type PanicError struct { } func (p PanicError) Error() string { - // A Go Error method conventionally does not include a stack dump, so omit it - // here. (Callers who care can extract it from the Stack field.) + if len(p.Stack) > 0 { + return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack) + } return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered) } diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go index 4684259..2165bb7 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup/errgroup_test.go @@ -309,9 +309,9 @@ func TestPanic(t *testing.T) { if pe.Recovered != p { t.Fatalf("got %v, want %v", pe.Recovered, p) } - if !strings.Contains(string(pe.Stack), "TestPanic.func") { - t.Log(string(pe.Stack)) - t.Fatalf("stack trace incomplete") + if !strings.Contains(pe.Error(), "TestPanic.func") { + t.Log(pe.Error()) + t.Fatalf("stack trace incomplete, does not contain TestPanic.func") } }() g.Wait() From 1869c690bf11da5dd230e188d03a612a4a3f8ba6 Mon Sep 17 00:00:00 2001 From: Iliya Lyan <68940374+12ya@users.noreply.github.com> Date: Tue, 20 May 2025 23:05:19 +0000 Subject: [PATCH 06/10] all: replace deprecated ioutil Change-Id: I1beb9f5e759127a48c4e5ea0613a5a466886b7c5 GitHub-Last-Rev: 58038b6cdd6f289eb1e35e44a7c60ad150be3a6f GitHub-Pull-Request: golang/sync#28 Reviewed-on: https://go-review.googlesource.com/c/sync/+/674815 Reviewed-by: Alan Donovan LUCI-TryBot-Result: Go LUCI Reviewed-by: Sean Liao Auto-Submit: Alan Donovan Reviewed-by: Dmitri Shuralyov --- errgroup/errgroup_example_md5all_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/errgroup/errgroup_example_md5all_test.go b/errgroup/errgroup_example_md5all_test.go index 739b336..e2fc15b 100644 --- a/errgroup/errgroup_example_md5all_test.go +++ b/errgroup/errgroup_example_md5all_test.go @@ -8,7 +8,6 @@ import ( "context" "crypto/md5" "fmt" - "io/ioutil" "log" "os" "path/filepath" @@ -69,7 +68,7 @@ func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) for i := 0; i < numDigesters; i++ { g.Go(func() error { for path := range paths { - data, err := ioutil.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { return err } From 8a14946fb031f4bf6096242b5e6ae6f7316d47d8 Mon Sep 17 00:00:00 2001 From: xieyuschen Date: Wed, 28 May 2025 18:52:56 +0800 Subject: [PATCH 07/10] errgroup: remove duplicated comment Change-Id: I5cdcc5034ccd87b939a406693e97485553ab60fa Reviewed-on: https://go-review.googlesource.com/c/sync/+/676715 Reviewed-by: Dmitri Shuralyov Reviewed-by: Alan Donovan Auto-Submit: Alan Donovan LUCI-TryBot-Result: Go LUCI --- errgroup/errgroup.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index a6b6ad2..cb6bb9a 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -76,10 +76,8 @@ func (g *Group) Wait() error { } // Go calls the given function in a new goroutine. -// The first call to Go must happen before a Wait. -// It blocks until the new goroutine can be added without the number of -// active goroutines in the group exceeding the configured limit. // +// The first call to Go must happen before a Wait. // It blocks until the new goroutine can be added without the number of // goroutines in the group exceeding the configured limit. // From 7fad2c9213e0821bd78435a9c106806f2fc383f1 Mon Sep 17 00:00:00 2001 From: Alan Donovan Date: Fri, 20 Jun 2025 10:15:53 -0400 Subject: [PATCH 08/10] errgroup: revert propagation of panics This change reverts CL 644575, which caused panics in the f() call after group.Go(f) to be propagated to the subsequent group.Wait call. This caused more problems than it solved. Also: - preserve some of the doc comment wording of Group.Go. - leave a "tsunami stone" comment in Group.Go. Fixes golang/go#53757 Updates golang/go#74275 Updates golang/go#74304 Updates golang/go#74306 Change-Id: I6e3992510944db7d69c72eaf241aedf8b84e62dd Reviewed-on: https://go-review.googlesource.com/c/sync/+/682935 LUCI-TryBot-Result: Go LUCI Reviewed-by: qiu laidongfeng2 <2645477756@qq.com> Reviewed-by: Junyang Shao Reviewed-by: Sean Liao Auto-Submit: Sean Liao --- errgroup/errgroup.go | 118 ++++++++++---------------------------- errgroup/errgroup_test.go | 64 --------------------- 2 files changed, 31 insertions(+), 151 deletions(-) diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go index cb6bb9a..1d8cffa 100644 --- a/errgroup/errgroup.go +++ b/errgroup/errgroup.go @@ -12,8 +12,6 @@ package errgroup import ( "context" "fmt" - "runtime" - "runtime/debug" "sync" ) @@ -33,10 +31,6 @@ type Group struct { errOnce sync.Once err error - - mu sync.Mutex - panicValue any // = PanicError | PanicValue; non-nil if some Group.Go goroutine panicked. - abnormal bool // some Group.Go goroutine terminated abnormally (panic or goexit). } func (g *Group) done() { @@ -56,22 +50,13 @@ func WithContext(ctx context.Context) (*Group, context.Context) { return &Group{cancel: cancel}, ctx } -// Wait blocks until all function calls from the Go method have returned -// normally, then returns the first non-nil error (if any) from them. -// -// If any of the calls panics, Wait panics with a [PanicValue]; -// and if any of them calls [runtime.Goexit], Wait calls runtime.Goexit. +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } - if g.panicValue != nil { - panic(g.panicValue) - } - if g.abnormal { - runtime.Goexit() - } return g.err } @@ -81,53 +66,31 @@ func (g *Group) Wait() error { // It blocks until the new goroutine can be added without the number of // goroutines in the group exceeding the configured limit. // -// The first goroutine in the group that returns a non-nil error, panics, or -// invokes [runtime.Goexit] will cancel the associated Context, if any. +// The first goroutine in the group that returns a non-nil error will +// cancel the associated Context, if any. The error will be returned +// by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } - g.add(f) -} - -func (g *Group) add(f func() error) { g.wg.Add(1) go func() { defer g.done() - normalReturn := false - defer func() { - if normalReturn { - return - } - v := recover() - g.mu.Lock() - defer g.mu.Unlock() - if !g.abnormal { - if g.cancel != nil { - g.cancel(g.err) - } - g.abnormal = true - } - if v != nil && g.panicValue == nil { - switch v := v.(type) { - case error: - g.panicValue = PanicError{ - Recovered: v, - Stack: debug.Stack(), - } - default: - g.panicValue = PanicValue{ - Recovered: v, - Stack: debug.Stack(), - } - } - } - }() - err := f() - normalReturn = true - if err != nil { + // It is tempting to propagate panics from f() + // up to the goroutine that calls Wait, but + // it creates more problems than it solves: + // - it delays panics arbitrarily, + // making bugs harder to detect; + // - it turns f's panic stack into a mere value, + // hiding it from crash-monitoring tools; + // - it risks deadlocks that hide the panic entirely, + // if f's panic leaves the program in a state + // that prevents the Wait call from being reached. + // See #53757, #74275, #74304, #74306. + + if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { @@ -152,7 +115,19 @@ func (g *Group) TryGo(f func() error) bool { } } - g.add(f) + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() return true } @@ -174,34 +149,3 @@ func (g *Group) SetLimit(n int) { } g.sem = make(chan token, n) } - -// PanicError wraps an error recovered from an unhandled panic -// when calling a function passed to Go or TryGo. -type PanicError struct { - Recovered error - Stack []byte // result of call to [debug.Stack] -} - -func (p PanicError) Error() string { - if len(p.Stack) > 0 { - return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack) - } - return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered) -} - -func (p PanicError) Unwrap() error { return p.Recovered } - -// PanicValue wraps a value that does not implement the error interface, -// recovered from an unhandled panic when calling a function passed to Go or -// TryGo. -type PanicValue struct { - Recovered any - Stack []byte // result of call to [debug.Stack] -} - -func (p PanicValue) String() string { - if len(p.Stack) > 0 { - return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack) - } - return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered) -} diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go index 2165bb7..2a491bf 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup/errgroup_test.go @@ -10,7 +10,6 @@ import ( "fmt" "net/http" "os" - "strings" "sync/atomic" "testing" "time" @@ -290,69 +289,6 @@ func TestCancelCause(t *testing.T) { } } -func TestPanic(t *testing.T) { - t.Run("error", func(t *testing.T) { - g := &errgroup.Group{} - p := errors.New("") - g.Go(func() error { - panic(p) - }) - defer func() { - err := recover() - if err == nil { - t.Fatalf("should propagate panic through Wait") - } - pe, ok := err.(errgroup.PanicError) - if !ok { - t.Fatalf("type should is errgroup.PanicError, but is %T", err) - } - if pe.Recovered != p { - t.Fatalf("got %v, want %v", pe.Recovered, p) - } - if !strings.Contains(pe.Error(), "TestPanic.func") { - t.Log(pe.Error()) - t.Fatalf("stack trace incomplete, does not contain TestPanic.func") - } - }() - g.Wait() - }) - t.Run("any", func(t *testing.T) { - g := &errgroup.Group{} - g.Go(func() error { - panic(1) - }) - defer func() { - err := recover() - if err == nil { - t.Fatalf("should propagate panic through Wait") - } - pe, ok := err.(errgroup.PanicValue) - if !ok { - t.Fatalf("type should is errgroup.PanicValue, but is %T", err) - } - if pe.Recovered != 1 { - t.Fatalf("got %v, want %v", pe.Recovered, 1) - } - if !strings.Contains(string(pe.Stack), "TestPanic.func") { - t.Log(string(pe.Stack)) - t.Fatalf("stack trace incomplete") - } - }() - g.Wait() - }) -} - -func TestGoexit(t *testing.T) { - g := &errgroup.Group{} - g.Go(func() error { - t.Skip() - t.Fatalf("Goexit fail") - return nil - }) - g.Wait() - t.Fatalf("should call runtime.Goexit from Wait") -} - func BenchmarkGo(b *testing.B) { fn := func() {} g := &errgroup.Group{} From 04914c200cb38d4ea960ee6a4c314a028c632991 Mon Sep 17 00:00:00 2001 From: Gopher Robot Date: Wed, 13 Aug 2025 14:21:38 +0000 Subject: [PATCH 09/10] all: upgrade go directive to at least 1.24.0 [generated] By now Go 1.25.0 has been released, and Go 1.23 is no longer supported per the Go Release Policy (see https://go.dev/doc/devel/release#policy). For golang/go#69095. [git-generate] (cd . && go get go@1.24.0 && go mod tidy && go fix ./... && go mod edit -toolchain=none) Change-Id: Ifa2b9ecc1efe475dfe4d60f41fb3ad2c63896d12 Reviewed-on: https://go-review.googlesource.com/c/sync/+/695358 Reviewed-by: Dmitri Shuralyov LUCI-TryBot-Result: Go LUCI Auto-Submit: Gopher Robot Reviewed-by: David Chase --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index d30ac75..23f559b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module golang.org/x/sync -go 1.23.0 +go 1.24.0 From 86842326fb17681bc8b4ce3ac35920ecf49db7fc Mon Sep 17 00:00:00 2001 From: Dan Jones Date: Sun, 7 Sep 2025 19:32:45 -0500 Subject: [PATCH 10/10] =?UTF-8?q?=F0=9F=9A=9A=20Move=20errgroup=20to=20new?= =?UTF-8?q?=20package?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CONTRIBUTING.md | 26 -- PATENTS | 22 - README.md | 17 - codereview.cfg | 1 - errgroup/errgroup.go => errgroup.go | 0 ...test.go => errgroup_example_md5all_test.go | 2 +- errgroup/errgroup_test.go => errgroup_test.go | 2 +- go.mod | 4 +- semaphore/semaphore.go | 160 ------- semaphore/semaphore_bench_test.go | 129 ------ semaphore/semaphore_example_test.go | 84 ---- semaphore/semaphore_test.go | 237 ---------- singleflight/singleflight.go | 214 --------- singleflight/singleflight_test.go | 422 ------------------ syncmap/map.go | 18 - syncmap/map_bench_test.go | 216 --------- syncmap/map_reference_test.go | 151 ------- syncmap/map_test.go | 172 ------- 18 files changed, 4 insertions(+), 1873 deletions(-) delete mode 100644 CONTRIBUTING.md delete mode 100644 PATENTS delete mode 100644 README.md delete mode 100644 codereview.cfg rename errgroup/errgroup.go => errgroup.go (100%) rename errgroup/errgroup_example_md5all_test.go => errgroup_example_md5all_test.go (98%) rename errgroup/errgroup_test.go => errgroup_test.go (99%) delete mode 100644 semaphore/semaphore.go delete mode 100644 semaphore/semaphore_bench_test.go delete mode 100644 semaphore/semaphore_example_test.go delete mode 100644 semaphore/semaphore_test.go delete mode 100644 singleflight/singleflight.go delete mode 100644 singleflight/singleflight_test.go delete mode 100644 syncmap/map.go delete mode 100644 syncmap/map_bench_test.go delete mode 100644 syncmap/map_reference_test.go delete mode 100644 syncmap/map_test.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index d0485e8..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,26 +0,0 @@ -# Contributing to Go - -Go is an open source project. - -It is the work of hundreds of contributors. We appreciate your help! - -## Filing issues - -When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions: - -1. What version of Go are you using (`go version`)? -2. What operating system and processor architecture are you using? -3. What did you do? -4. What did you expect to see? -5. What did you see instead? - -General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker. -The gophers there will answer or ask you to file an issue if you've tripped over a bug. - -## Contributing code - -Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html) -before sending patches. - -Unless otherwise noted, the Go source files are distributed under -the BSD-style license found in the LICENSE file. diff --git a/PATENTS b/PATENTS deleted file mode 100644 index 7330990..0000000 --- a/PATENTS +++ /dev/null @@ -1,22 +0,0 @@ -Additional IP Rights Grant (Patents) - -"This implementation" means the copyrightable works distributed by -Google as part of the Go project. - -Google hereby grants to You a perpetual, worldwide, non-exclusive, -no-charge, royalty-free, irrevocable (except as stated in this section) -patent license to make, have made, use, offer to sell, sell, import, -transfer and otherwise run, modify and propagate the contents of this -implementation of Go, where such license applies only to those patent -claims, both currently owned or controlled by Google and acquired in -the future, licensable by Google that are necessarily infringed by this -implementation of Go. This grant does not include claims that would be -infringed only as a consequence of further modification of this -implementation. If you or your agent or exclusive licensee institute or -order or agree to the institution of patent litigation against any -entity (including a cross-claim or counterclaim in a lawsuit) alleging -that this implementation of Go or any code incorporated within this -implementation of Go constitutes direct or contributory patent -infringement, or inducement of patent infringement, then any patent -rights granted to you under this License for this implementation of Go -shall terminate as of the date such litigation is filed. diff --git a/README.md b/README.md deleted file mode 100644 index 4cb3151..0000000 --- a/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Go Sync - -[![Go Reference](https://pkg.go.dev/badge/golang.org/x/sync.svg)](https://pkg.go.dev/golang.org/x/sync) - -This repository provides Go concurrency primitives in addition to the -ones provided by the language and "sync" and "sync/atomic" packages. - -## Report Issues / Send Patches - -This repository uses Gerrit for code changes. To learn how to submit changes to -this repository, see https://go.dev/doc/contribute. - -The git repository is https://go.googlesource.com/sync. - -The main issue tracker for the sync repository is located at -https://go.dev/issues. Prefix your issue with "x/sync:" in the -subject line, so it is easy to find. diff --git a/codereview.cfg b/codereview.cfg deleted file mode 100644 index 3f8b14b..0000000 --- a/codereview.cfg +++ /dev/null @@ -1 +0,0 @@ -issuerepo: golang/go diff --git a/errgroup/errgroup.go b/errgroup.go similarity index 100% rename from errgroup/errgroup.go rename to errgroup.go diff --git a/errgroup/errgroup_example_md5all_test.go b/errgroup_example_md5all_test.go similarity index 98% rename from errgroup/errgroup_example_md5all_test.go rename to errgroup_example_md5all_test.go index e2fc15b..454f72d 100644 --- a/errgroup/errgroup_example_md5all_test.go +++ b/errgroup_example_md5all_test.go @@ -12,7 +12,7 @@ import ( "os" "path/filepath" - "golang.org/x/sync/errgroup" + "codeberg.org/danjones000/errgroup" ) // Pipeline demonstrates the use of a Group to implement a multi-stage diff --git a/errgroup/errgroup_test.go b/errgroup_test.go similarity index 99% rename from errgroup/errgroup_test.go rename to errgroup_test.go index 2a491bf..05e81e6 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup_test.go @@ -14,7 +14,7 @@ import ( "testing" "time" - "golang.org/x/sync/errgroup" + "codeberg.org/danjones000/errgroup" ) var ( diff --git a/go.mod b/go.mod index 23f559b..8bb13f8 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module golang.org/x/sync +module codeberg.org/danjones000/errgroup -go 1.24.0 +go 1.25.0 diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go deleted file mode 100644 index b618162..0000000 --- a/semaphore/semaphore.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package semaphore provides a weighted semaphore implementation. -package semaphore // import "golang.org/x/sync/semaphore" - -import ( - "container/list" - "context" - "sync" -) - -type waiter struct { - n int64 - ready chan<- struct{} // Closed when semaphore acquired. -} - -// NewWeighted creates a new weighted semaphore with the given -// maximum combined weight for concurrent access. -func NewWeighted(n int64) *Weighted { - w := &Weighted{size: n} - return w -} - -// Weighted provides a way to bound concurrent access to a resource. -// The callers can request access with a given weight. -type Weighted struct { - size int64 - cur int64 - mu sync.Mutex - waiters list.List -} - -// Acquire acquires the semaphore with a weight of n, blocking until resources -// are available or ctx is done. On success, returns nil. On failure, returns -// ctx.Err() and leaves the semaphore unchanged. -func (s *Weighted) Acquire(ctx context.Context, n int64) error { - done := ctx.Done() - - s.mu.Lock() - select { - case <-done: - // ctx becoming done has "happened before" acquiring the semaphore, - // whether it became done before the call began or while we were - // waiting for the mutex. We prefer to fail even if we could acquire - // the mutex without blocking. - s.mu.Unlock() - return ctx.Err() - default: - } - if s.size-s.cur >= n && s.waiters.Len() == 0 { - // Since we hold s.mu and haven't synchronized since checking done, if - // ctx becomes done before we return here, it becoming done must have - // "happened concurrently" with this call - it cannot "happen before" - // we return in this branch. So, we're ok to always acquire here. - s.cur += n - s.mu.Unlock() - return nil - } - - if n > s.size { - // Don't make other Acquire calls block on one that's doomed to fail. - s.mu.Unlock() - <-done - return ctx.Err() - } - - ready := make(chan struct{}) - w := waiter{n: n, ready: ready} - elem := s.waiters.PushBack(w) - s.mu.Unlock() - - select { - case <-done: - s.mu.Lock() - select { - case <-ready: - // Acquired the semaphore after we were canceled. - // Pretend we didn't and put the tokens back. - s.cur -= n - s.notifyWaiters() - default: - isFront := s.waiters.Front() == elem - s.waiters.Remove(elem) - // If we're at the front and there're extra tokens left, notify other waiters. - if isFront && s.size > s.cur { - s.notifyWaiters() - } - } - s.mu.Unlock() - return ctx.Err() - - case <-ready: - // Acquired the semaphore. Check that ctx isn't already done. - // We check the done channel instead of calling ctx.Err because we - // already have the channel, and ctx.Err is O(n) with the nesting - // depth of ctx. - select { - case <-done: - s.Release(n) - return ctx.Err() - default: - } - return nil - } -} - -// TryAcquire acquires the semaphore with a weight of n without blocking. -// On success, returns true. On failure, returns false and leaves the semaphore unchanged. -func (s *Weighted) TryAcquire(n int64) bool { - s.mu.Lock() - success := s.size-s.cur >= n && s.waiters.Len() == 0 - if success { - s.cur += n - } - s.mu.Unlock() - return success -} - -// Release releases the semaphore with a weight of n. -func (s *Weighted) Release(n int64) { - s.mu.Lock() - s.cur -= n - if s.cur < 0 { - s.mu.Unlock() - panic("semaphore: released more than held") - } - s.notifyWaiters() - s.mu.Unlock() -} - -func (s *Weighted) notifyWaiters() { - for { - next := s.waiters.Front() - if next == nil { - break // No more waiters blocked. - } - - w := next.Value.(waiter) - if s.size-s.cur < w.n { - // Not enough tokens for the next waiter. We could keep going (to try to - // find a waiter with a smaller request), but under load that could cause - // starvation for large requests; instead, we leave all remaining waiters - // blocked. - // - // Consider a semaphore used as a read-write lock, with N tokens, N - // readers, and one writer. Each reader can Acquire(1) to obtain a read - // lock. The writer can Acquire(N) to obtain a write lock, excluding all - // of the readers. If we allow the readers to jump ahead in the queue, - // the writer will starve — there is always one token available for every - // reader. - break - } - - s.cur += w.n - s.waiters.Remove(next) - close(w.ready) - } -} diff --git a/semaphore/semaphore_bench_test.go b/semaphore/semaphore_bench_test.go deleted file mode 100644 index aa64258..0000000 --- a/semaphore/semaphore_bench_test.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "fmt" - "testing" - - "golang.org/x/sync/semaphore" -) - -// weighted is an interface matching a subset of *Weighted. It allows -// alternate implementations for testing and benchmarking. -type weighted interface { - Acquire(context.Context, int64) error - TryAcquire(int64) bool - Release(int64) -} - -// semChan implements Weighted using a channel for -// comparing against the condition variable-based implementation. -type semChan chan struct{} - -func newSemChan(n int64) semChan { - return semChan(make(chan struct{}, n)) -} - -func (s semChan) Acquire(_ context.Context, n int64) error { - for i := int64(0); i < n; i++ { - s <- struct{}{} - } - return nil -} - -func (s semChan) TryAcquire(n int64) bool { - if int64(len(s))+n > int64(cap(s)) { - return false - } - - for i := int64(0); i < n; i++ { - s <- struct{}{} - } - return true -} - -func (s semChan) Release(n int64) { - for i := int64(0); i < n; i++ { - <-s - } -} - -// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times. -func acquireN(b *testing.B, sem weighted, size int64, N int) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - for j := 0; j < N; j++ { - sem.Acquire(context.Background(), size) - } - for j := 0; j < N; j++ { - sem.Release(size) - } - } -} - -// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times. -func tryAcquireN(b *testing.B, sem weighted, size int64, N int) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - for j := 0; j < N; j++ { - if !sem.TryAcquire(size) { - b.Fatalf("TryAcquire(%v) = false, want true", size) - } - } - for j := 0; j < N; j++ { - sem.Release(size) - } - } -} - -func BenchmarkNewSeq(b *testing.B) { - for _, cap := range []int64{1, 128} { - b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = semaphore.NewWeighted(cap) - } - }) - b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = newSemChan(cap) - } - }) - } -} - -func BenchmarkAcquireSeq(b *testing.B) { - for _, c := range []struct { - cap, size int64 - N int - }{ - {1, 1, 1}, - {2, 1, 1}, - {16, 1, 1}, - {128, 1, 1}, - {2, 2, 1}, - {16, 2, 8}, - {128, 2, 64}, - {2, 1, 2}, - {16, 8, 2}, - {128, 64, 2}, - } { - for _, w := range []struct { - name string - w weighted - }{ - {"Weighted", semaphore.NewWeighted(c.cap)}, - {"semChan", newSemChan(c.cap)}, - } { - b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { - acquireN(b, w.w, c.size, c.N) - }) - b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { - tryAcquireN(b, w.w, c.size, c.N) - }) - } - } -} diff --git a/semaphore/semaphore_example_test.go b/semaphore/semaphore_example_test.go deleted file mode 100644 index e75cd79..0000000 --- a/semaphore/semaphore_example_test.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "fmt" - "log" - "runtime" - - "golang.org/x/sync/semaphore" -) - -// Example_workerPool demonstrates how to use a semaphore to limit the number of -// goroutines working on parallel tasks. -// -// This use of a semaphore mimics a typical “worker pool” pattern, but without -// the need to explicitly shut down idle workers when the work is done. -func Example_workerPool() { - ctx := context.TODO() - - var ( - maxWorkers = runtime.GOMAXPROCS(0) - sem = semaphore.NewWeighted(int64(maxWorkers)) - out = make([]int, 32) - ) - - // Compute the output using up to maxWorkers goroutines at a time. - for i := range out { - // When maxWorkers goroutines are in flight, Acquire blocks until one of the - // workers finishes. - if err := sem.Acquire(ctx, 1); err != nil { - log.Printf("Failed to acquire semaphore: %v", err) - break - } - - go func(i int) { - defer sem.Release(1) - out[i] = collatzSteps(i + 1) - }(i) - } - - // Acquire all of the tokens to wait for any remaining workers to finish. - // - // If you are already waiting for the workers by some other means (such as an - // errgroup.Group), you can omit this final Acquire call. - if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil { - log.Printf("Failed to acquire semaphore: %v", err) - } - - fmt.Println(out) - - // Output: - // [0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5] -} - -// collatzSteps computes the number of steps to reach 1 under the Collatz -// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.) -func collatzSteps(n int) (steps int) { - if n <= 0 { - panic("nonpositive input") - } - - for ; n > 1; steps++ { - if steps < 0 { - panic("too many steps") - } - - if n%2 == 0 { - n /= 2 - continue - } - - const maxInt = int(^uint(0) >> 1) - if n > (maxInt-1)/3 { - panic("overflow") - } - n = 3*n + 1 - } - - return steps -} diff --git a/semaphore/semaphore_test.go b/semaphore/semaphore_test.go deleted file mode 100644 index 61012d6..0000000 --- a/semaphore/semaphore_test.go +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package semaphore_test - -import ( - "context" - "math/rand" - "runtime" - "sync" - "testing" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" -) - -const maxSleep = 1 * time.Millisecond - -func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) { - for i := 0; i < loops; i++ { - sem.Acquire(context.Background(), n) - time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond) - sem.Release(n) - } -} - -func TestWeighted(t *testing.T) { - t.Parallel() - - n := runtime.GOMAXPROCS(0) - loops := 10000 / n - sem := semaphore.NewWeighted(int64(n)) - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - i := i - go func() { - defer wg.Done() - HammerWeighted(sem, int64(i), loops) - }() - } - wg.Wait() -} - -func TestWeightedPanic(t *testing.T) { - t.Parallel() - - defer func() { - if recover() == nil { - t.Fatal("release of an unacquired weighted semaphore did not panic") - } - }() - w := semaphore.NewWeighted(1) - w.Release(1) -} - -func TestWeightedTryAcquire(t *testing.T) { - t.Parallel() - - ctx := context.Background() - sem := semaphore.NewWeighted(2) - tries := []bool{} - sem.Acquire(ctx, 1) - tries = append(tries, sem.TryAcquire(1)) - tries = append(tries, sem.TryAcquire(1)) - - sem.Release(2) - - tries = append(tries, sem.TryAcquire(1)) - sem.Acquire(ctx, 1) - tries = append(tries, sem.TryAcquire(1)) - - want := []bool{true, false, true, false} - for i := range tries { - if tries[i] != want[i] { - t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) - } - } -} - -func TestWeightedAcquire(t *testing.T) { - t.Parallel() - - ctx := context.Background() - sem := semaphore.NewWeighted(2) - tryAcquire := func(n int64) bool { - ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) - defer cancel() - return sem.Acquire(ctx, n) == nil - } - - tries := []bool{} - sem.Acquire(ctx, 1) - tries = append(tries, tryAcquire(1)) - tries = append(tries, tryAcquire(1)) - - sem.Release(2) - - tries = append(tries, tryAcquire(1)) - sem.Acquire(ctx, 1) - tries = append(tries, tryAcquire(1)) - - want := []bool{true, false, true, false} - for i := range tries { - if tries[i] != want[i] { - t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) - } - } -} - -func TestWeightedDoesntBlockIfTooBig(t *testing.T) { - t.Parallel() - - const n = 2 - sem := semaphore.NewWeighted(n) - { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go sem.Acquire(ctx, n+1) - } - - g, ctx := errgroup.WithContext(context.Background()) - for i := n * 3; i > 0; i-- { - g.Go(func() error { - err := sem.Acquire(ctx, 1) - if err == nil { - time.Sleep(1 * time.Millisecond) - sem.Release(1) - } - return err - }) - } - if err := g.Wait(); err != nil { - t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1) - } -} - -// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves. -// Merely returning from the test function indicates success. -func TestLargeAcquireDoesntStarve(t *testing.T) { - t.Parallel() - - ctx := context.Background() - n := int64(runtime.GOMAXPROCS(0)) - sem := semaphore.NewWeighted(n) - running := true - - var wg sync.WaitGroup - wg.Add(int(n)) - for i := n; i > 0; i-- { - sem.Acquire(ctx, 1) - go func() { - defer func() { - sem.Release(1) - wg.Done() - }() - for running { - time.Sleep(1 * time.Millisecond) - sem.Release(1) - sem.Acquire(ctx, 1) - } - }() - } - - sem.Acquire(ctx, n) - running = false - sem.Release(n) - wg.Wait() -} - -// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43 -func TestAllocCancelDoesntStarve(t *testing.T) { - sem := semaphore.NewWeighted(10) - - // Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed. - sem.Acquire(context.Background(), 1) - - // In the background, Acquire(_, 10). - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - sem.Acquire(ctx, 10) - }() - - // Wait until the Acquire(_, 10) call blocks. - for sem.TryAcquire(1) { - sem.Release(1) - runtime.Gosched() - } - - // Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call. - // Both Acquire calls should unblock and return, in either order. - go cancel() - - err := sem.Acquire(context.Background(), 1) - if err != nil { - t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err) - } - sem.Release(1) -} - -func TestWeightedAcquireCanceled(t *testing.T) { - // https://go.dev/issue/63615 - sem := semaphore.NewWeighted(2) - ctx, cancel := context.WithCancel(context.Background()) - sem.Acquire(context.Background(), 1) - ch := make(chan struct{}) - go func() { - // Synchronize with the Acquire(2) below. - for sem.TryAcquire(1) { - sem.Release(1) - } - // Now cancel ctx, and then release the token. - cancel() - sem.Release(1) - close(ch) - }() - // Since the context closing happens before enough tokens become available, - // this Acquire must fail. - if err := sem.Acquire(ctx, 2); err != context.Canceled { - t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err) - } - // There must always be two tokens in the semaphore after the other - // goroutine releases the one we held at the start. - <-ch - if !sem.TryAcquire(2) { - t.Fatal("TryAcquire after canceled Acquire failed") - } - // Additionally verify that we don't acquire with a done context even when - // we wouldn't need to block to do so. - sem.Release(2) - if err := sem.Acquire(ctx, 1); err != context.Canceled { - t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err) - } -} diff --git a/singleflight/singleflight.go b/singleflight/singleflight.go deleted file mode 100644 index 4051830..0000000 --- a/singleflight/singleflight.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight // import "golang.org/x/sync/singleflight" - -import ( - "bytes" - "errors" - "fmt" - "runtime" - "runtime/debug" - "sync" -) - -// errGoexit indicates the runtime.Goexit was called in -// the user given function. -var errGoexit = errors.New("runtime.Goexit was called") - -// A panicError is an arbitrary value recovered from a panic -// with the stack trace during the execution of given function. -type panicError struct { - value interface{} - stack []byte -} - -// Error implements error interface. -func (p *panicError) Error() string { - return fmt.Sprintf("%v\n\n%s", p.value, p.stack) -} - -func (p *panicError) Unwrap() error { - err, ok := p.value.(error) - if !ok { - return nil - } - - return err -} - -func newPanicError(v interface{}) error { - stack := debug.Stack() - - // The first line of the stack trace is of the form "goroutine N [status]:" - // but by the time the panic reaches Do the goroutine may no longer exist - // and its status will have changed. Trim out the misleading line. - if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { - stack = stack[line+1:] - } - return &panicError{value: v, stack: stack} -} - -// call is an in-flight or completed singleflight.Do call -type call struct { - wg sync.WaitGroup - - // These fields are written once before the WaitGroup is done - // and are only read after the WaitGroup is done. - val interface{} - err error - - // These fields are read and written with the singleflight - // mutex held before the WaitGroup is done, and are read but - // not written after the WaitGroup is done. - dups int - chans []chan<- Result -} - -// Group represents a class of work and forms a namespace in -// which units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Result holds the results of Do, so they can be passed -// on a channel. -type Result struct { - Val interface{} - Err error - Shared bool -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -// The return value shared indicates whether v was given to multiple callers. -func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - g.mu.Unlock() - c.wg.Wait() - - if e, ok := c.err.(*panicError); ok { - panic(e) - } else if c.err == errGoexit { - runtime.Goexit() - } - return c.val, c.err, true - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - g.doCall(c, key, fn) - return c.val, c.err, c.dups > 0 -} - -// DoChan is like Do but returns a channel that will receive the -// results when they are ready. -// -// The returned channel will not be closed. -func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { - ch := make(chan Result, 1) - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - c.chans = append(c.chans, ch) - g.mu.Unlock() - return ch - } - c := &call{chans: []chan<- Result{ch}} - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - go g.doCall(c, key, fn) - - return ch -} - -// doCall handles the single call for a key. -func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - normalReturn := false - recovered := false - - // use double-defer to distinguish panic from runtime.Goexit, - // more details see https://golang.org/cl/134395 - defer func() { - // the given function invoked runtime.Goexit - if !normalReturn && !recovered { - c.err = errGoexit - } - - g.mu.Lock() - defer g.mu.Unlock() - c.wg.Done() - if g.m[key] == c { - delete(g.m, key) - } - - if e, ok := c.err.(*panicError); ok { - // In order to prevent the waiting channels from being blocked forever, - // needs to ensure that this panic cannot be recovered. - if len(c.chans) > 0 { - go panic(e) - select {} // Keep this goroutine around so that it will appear in the crash dump. - } else { - panic(e) - } - } else if c.err == errGoexit { - // Already in the process of goexit, no need to call again - } else { - // Normal return - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - } - }() - - func() { - defer func() { - if !normalReturn { - // Ideally, we would wait to take a stack trace until we've determined - // whether this is a panic or a runtime.Goexit. - // - // Unfortunately, the only way we can distinguish the two is to see - // whether the recover stopped the goroutine from terminating, and by - // the time we know that, the part of the stack trace relevant to the - // panic has been discarded. - if r := recover(); r != nil { - c.err = newPanicError(r) - } - } - }() - - c.val, c.err = fn() - normalReturn = true - }() - - if !normalReturn { - recovered = true - } -} - -// Forget tells the singleflight to forget about a key. Future calls -// to Do for this key will call the function rather than waiting for -// an earlier call to complete. -func (g *Group) Forget(key string) { - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() -} diff --git a/singleflight/singleflight_test.go b/singleflight/singleflight_test.go deleted file mode 100644 index 853ec42..0000000 --- a/singleflight/singleflight_test.go +++ /dev/null @@ -1,422 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package singleflight - -import ( - "bytes" - "errors" - "fmt" - "os" - "os/exec" - "runtime" - "runtime/debug" - "strings" - "sync" - "sync/atomic" - "testing" - "time" -) - -type errValue struct{} - -func (err *errValue) Error() string { - return "error value" -} - -func TestPanicErrorUnwrap(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - panicValue interface{} - wrappedErrorType bool - }{ - { - name: "panicError wraps non-error type", - panicValue: &panicError{value: "string value"}, - wrappedErrorType: false, - }, - { - name: "panicError wraps error type", - panicValue: &panicError{value: new(errValue)}, - wrappedErrorType: false, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - var recovered interface{} - - group := &Group{} - - func() { - defer func() { - recovered = recover() - t.Logf("after panic(%#v) in group.Do, recovered %#v", tc.panicValue, recovered) - }() - - _, _, _ = group.Do(tc.name, func() (interface{}, error) { - panic(tc.panicValue) - }) - }() - - if recovered == nil { - t.Fatal("expected a non-nil panic value") - } - - err, ok := recovered.(error) - if !ok { - t.Fatalf("recovered non-error type: %T", recovered) - } - - if !errors.Is(err, new(errValue)) && tc.wrappedErrorType { - t.Errorf("unexpected wrapped error type %T; want %T", err, new(errValue)) - } - }) - } -} - -func TestDo(t *testing.T) { - var g Group - v, err, _ := g.Do("key", func() (interface{}, error) { - return "bar", nil - }) - if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { - t.Errorf("Do = %v; want %v", got, want) - } - if err != nil { - t.Errorf("Do error = %v", err) - } -} - -func TestDoErr(t *testing.T) { - var g Group - someErr := errors.New("Some error") - v, err, _ := g.Do("key", func() (interface{}, error) { - return nil, someErr - }) - if err != someErr { - t.Errorf("Do error = %v; want someErr %v", err, someErr) - } - if v != nil { - t.Errorf("unexpected non-nil value %#v", v) - } -} - -func TestDoDupSuppress(t *testing.T) { - var g Group - var wg1, wg2 sync.WaitGroup - c := make(chan string, 1) - var calls int32 - fn := func() (interface{}, error) { - if atomic.AddInt32(&calls, 1) == 1 { - // First invocation. - wg1.Done() - } - v := <-c - c <- v // pump; make available for any future calls - - time.Sleep(10 * time.Millisecond) // let more goroutines enter Do - - return v, nil - } - - const n = 10 - wg1.Add(1) - for i := 0; i < n; i++ { - wg1.Add(1) - wg2.Add(1) - go func() { - defer wg2.Done() - wg1.Done() - v, err, _ := g.Do("key", fn) - if err != nil { - t.Errorf("Do error: %v", err) - return - } - if s, _ := v.(string); s != "bar" { - t.Errorf("Do = %T %v; want %q", v, v, "bar") - } - }() - } - wg1.Wait() - // At least one goroutine is in fn now and all of them have at - // least reached the line before the Do. - c <- "bar" - wg2.Wait() - if got := atomic.LoadInt32(&calls); got <= 0 || got >= n { - t.Errorf("number of calls = %d; want over 0 and less than %d", got, n) - } -} - -// Test that singleflight behaves correctly after Forget called. -// See https://github.com/golang/go/issues/31420 -func TestForget(t *testing.T) { - var g Group - - var ( - firstStarted = make(chan struct{}) - unblockFirst = make(chan struct{}) - firstFinished = make(chan struct{}) - ) - - go func() { - g.Do("key", func() (i interface{}, e error) { - close(firstStarted) - <-unblockFirst - close(firstFinished) - return - }) - }() - <-firstStarted - g.Forget("key") - - unblockSecond := make(chan struct{}) - secondResult := g.DoChan("key", func() (i interface{}, e error) { - <-unblockSecond - return 2, nil - }) - - close(unblockFirst) - <-firstFinished - - thirdResult := g.DoChan("key", func() (i interface{}, e error) { - return 3, nil - }) - - close(unblockSecond) - <-secondResult - r := <-thirdResult - if r.Val != 2 { - t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val) - } -} - -func TestDoChan(t *testing.T) { - var g Group - ch := g.DoChan("key", func() (interface{}, error) { - return "bar", nil - }) - - res := <-ch - v := res.Val - err := res.Err - if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { - t.Errorf("Do = %v; want %v", got, want) - } - if err != nil { - t.Errorf("Do error = %v", err) - } -} - -// Test singleflight behaves correctly after Do panic. -// See https://github.com/golang/go/issues/41133 -func TestPanicDo(t *testing.T) { - var g Group - fn := func() (interface{}, error) { - panic("invalid memory address or nil pointer dereference") - } - - const n = 5 - waited := int32(n) - panicCount := int32(0) - done := make(chan struct{}) - for i := 0; i < n; i++ { - go func() { - defer func() { - if err := recover(); err != nil { - t.Logf("Got panic: %v\n%s", err, debug.Stack()) - atomic.AddInt32(&panicCount, 1) - } - - if atomic.AddInt32(&waited, -1) == 0 { - close(done) - } - }() - - g.Do("key", fn) - }() - } - - select { - case <-done: - if panicCount != n { - t.Errorf("Expect %d panic, but got %d", n, panicCount) - } - case <-time.After(time.Second): - t.Fatalf("Do hangs") - } -} - -func TestGoexitDo(t *testing.T) { - var g Group - fn := func() (interface{}, error) { - runtime.Goexit() - return nil, nil - } - - const n = 5 - waited := int32(n) - done := make(chan struct{}) - for i := 0; i < n; i++ { - go func() { - var err error - defer func() { - if err != nil { - t.Errorf("Error should be nil, but got: %v", err) - } - if atomic.AddInt32(&waited, -1) == 0 { - close(done) - } - }() - _, err, _ = g.Do("key", fn) - }() - } - - select { - case <-done: - case <-time.After(time.Second): - t.Fatalf("Do hangs") - } -} - -func executable(t testing.TB) string { - exe, err := os.Executable() - if err != nil { - t.Skipf("skipping: test executable not found") - } - - // Control case: check whether exec.Command works at all. - // (For example, it might fail with a permission error on iOS.) - cmd := exec.Command(exe, "-test.list=^$") - cmd.Env = []string{} - if err := cmd.Run(); err != nil { - t.Skipf("skipping: exec appears not to work on %s: %v", runtime.GOOS, err) - } - - return exe -} - -func TestPanicDoChan(t *testing.T) { - if os.Getenv("TEST_PANIC_DOCHAN") != "" { - defer func() { - recover() - }() - - g := new(Group) - ch := g.DoChan("", func() (interface{}, error) { - panic("Panicking in DoChan") - }) - <-ch - t.Fatalf("DoChan unexpectedly returned") - } - - t.Parallel() - - cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v") - cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") - out := new(bytes.Buffer) - cmd.Stdout = out - cmd.Stderr = out - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - - err := cmd.Wait() - t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) - if err == nil { - t.Errorf("Test subprocess passed; want a crash due to panic in DoChan") - } - if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { - t.Errorf("Test subprocess failed with an unexpected failure mode.") - } - if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) { - t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan") - } -} - -func TestPanicDoSharedByDoChan(t *testing.T) { - if os.Getenv("TEST_PANIC_DOCHAN") != "" { - blocked := make(chan struct{}) - unblock := make(chan struct{}) - - g := new(Group) - go func() { - defer func() { - recover() - }() - g.Do("", func() (interface{}, error) { - close(blocked) - <-unblock - panic("Panicking in Do") - }) - }() - - <-blocked - ch := g.DoChan("", func() (interface{}, error) { - panic("DoChan unexpectedly executed callback") - }) - close(unblock) - <-ch - t.Fatalf("DoChan unexpectedly returned") - } - - t.Parallel() - - cmd := exec.Command(executable(t), "-test.run="+t.Name(), "-test.v") - cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") - out := new(bytes.Buffer) - cmd.Stdout = out - cmd.Stderr = out - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - - err := cmd.Wait() - t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) - if err == nil { - t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan") - } - if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { - t.Errorf("Test subprocess failed with an unexpected failure mode.") - } - if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) { - t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do") - } -} - -func ExampleGroup() { - g := new(Group) - - block := make(chan struct{}) - res1c := g.DoChan("key", func() (interface{}, error) { - <-block - return "func 1", nil - }) - res2c := g.DoChan("key", func() (interface{}, error) { - <-block - return "func 2", nil - }) - close(block) - - res1 := <-res1c - res2 := <-res2c - - // Results are shared by functions executed with duplicate keys. - fmt.Println("Shared:", res2.Shared) - // Only the first function is executed: it is registered and started with "key", - // and doesn't complete before the second function is registered with a duplicate key. - fmt.Println("Equal results:", res1.Val.(string) == res2.Val.(string)) - fmt.Println("Result:", res1.Val) - - // Output: - // Shared: true - // Equal results: true - // Result: func 1 -} diff --git a/syncmap/map.go b/syncmap/map.go deleted file mode 100644 index c9a07f3..0000000 --- a/syncmap/map.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2019 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package syncmap provides a concurrent map implementation. -// This was the prototype for sync.Map which was added to the standard library's -// sync package in Go 1.9. https://golang.org/pkg/sync/#Map. -package syncmap - -import "sync" // home to the standard library's sync.map implementation as of Go 1.9 - -// Map is a concurrent map with amortized-constant-time loads, stores, and deletes. -// It is safe for multiple goroutines to call a Map's methods concurrently. -// -// The zero Map is valid and empty. -// -// A Map must not be copied after first use. -type Map = sync.Map diff --git a/syncmap/map_bench_test.go b/syncmap/map_bench_test.go deleted file mode 100644 index b279b4f..0000000 --- a/syncmap/map_bench_test.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "fmt" - "reflect" - "sync/atomic" - "testing" - - "golang.org/x/sync/syncmap" -) - -type bench struct { - setup func(*testing.B, mapInterface) - perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) -} - -func benchMap(b *testing.B, bench bench) { - for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} { - b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { - m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) - if bench.setup != nil { - bench.setup(b, m) - } - - b.ResetTimer() - - var i int64 - b.RunParallel(func(pb *testing.PB) { - id := int(atomic.AddInt64(&i, 1) - 1) - bench.perG(b, pb, id*b.N, m) - }) - }) - } -} - -func BenchmarkLoadMostlyHits(b *testing.B) { - const hits, misses = 1023, 1 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i % (hits + misses)) - } - }, - }) -} - -func BenchmarkLoadMostlyMisses(b *testing.B) { - const hits, misses = 1, 1023 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i % (hits + misses)) - } - }, - }) -} - -func BenchmarkLoadOrStoreBalanced(b *testing.B) { - const hits, misses = 128, 128 - - benchMap(b, bench{ - setup: func(b *testing.B, m mapInterface) { - if _, ok := m.(*DeepCopyMap); ok { - b.Skip("DeepCopyMap has quadratic running time.") - } - for i := 0; i < hits; i++ { - m.LoadOrStore(i, i) - } - // Prime the map to get it into a steady state. - for i := 0; i < hits*2; i++ { - m.Load(i % hits) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - j := i % (hits + misses) - if j < hits { - if _, ok := m.LoadOrStore(j, i); !ok { - b.Fatalf("unexpected miss for %v", j) - } - } else { - if v, loaded := m.LoadOrStore(i, i); loaded { - b.Fatalf("failed to store %v: existing value %v", i, v) - } - } - } - }, - }) -} - -func BenchmarkLoadOrStoreUnique(b *testing.B) { - benchMap(b, bench{ - setup: func(b *testing.B, m mapInterface) { - if _, ok := m.(*DeepCopyMap); ok { - b.Skip("DeepCopyMap has quadratic running time.") - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.LoadOrStore(i, i) - } - }, - }) -} - -func BenchmarkLoadOrStoreCollision(b *testing.B) { - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - m.LoadOrStore(0, 0) - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.LoadOrStore(0, 0) - } - }, - }) -} - -func BenchmarkRange(b *testing.B) { - const mapSize = 1 << 10 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < mapSize; i++ { - m.Store(i, i) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Range(func(_, _ interface{}) bool { return true }) - } - }, - }) -} - -// BenchmarkAdversarialAlloc tests performance when we store a new value -// immediately whenever the map is promoted to clean and otherwise load a -// unique, missing key. -// -// This forces the Load calls to always acquire the map's mutex. -func BenchmarkAdversarialAlloc(b *testing.B) { - benchMap(b, bench{ - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - var stores, loadsSinceStore int64 - for ; pb.Next(); i++ { - m.Load(i) - if loadsSinceStore++; loadsSinceStore > stores { - m.LoadOrStore(i, stores) - loadsSinceStore = 0 - stores++ - } - } - }, - }) -} - -// BenchmarkAdversarialDelete tests performance when we periodically delete -// one key and add a different one in a large map. -// -// This forces the Load calls to always acquire the map's mutex and periodically -// makes a full copy of the map despite changing only one entry. -func BenchmarkAdversarialDelete(b *testing.B) { - const mapSize = 1 << 10 - - benchMap(b, bench{ - setup: func(_ *testing.B, m mapInterface) { - for i := 0; i < mapSize; i++ { - m.Store(i, i) - } - }, - - perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { - for ; pb.Next(); i++ { - m.Load(i) - - if i%mapSize == 0 { - m.Range(func(k, _ interface{}) bool { - m.Delete(k) - return false - }) - m.Store(i, i) - } - } - }, - }) -} diff --git a/syncmap/map_reference_test.go b/syncmap/map_reference_test.go deleted file mode 100644 index 923c51b..0000000 --- a/syncmap/map_reference_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "sync" - "sync/atomic" -) - -// This file contains reference map implementations for unit-tests. - -// mapInterface is the interface Map implements. -type mapInterface interface { - Load(interface{}) (interface{}, bool) - Store(key, value interface{}) - LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) - Delete(interface{}) - Range(func(key, value interface{}) (shouldContinue bool)) -} - -// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. -type RWMutexMap struct { - mu sync.RWMutex - dirty map[interface{}]interface{} -} - -func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { - m.mu.RLock() - value, ok = m.dirty[key] - m.mu.RUnlock() - return -} - -func (m *RWMutexMap) Store(key, value interface{}) { - m.mu.Lock() - if m.dirty == nil { - m.dirty = make(map[interface{}]interface{}) - } - m.dirty[key] = value - m.mu.Unlock() -} - -func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { - m.mu.Lock() - actual, loaded = m.dirty[key] - if !loaded { - actual = value - if m.dirty == nil { - m.dirty = make(map[interface{}]interface{}) - } - m.dirty[key] = value - } - m.mu.Unlock() - return actual, loaded -} - -func (m *RWMutexMap) Delete(key interface{}) { - m.mu.Lock() - delete(m.dirty, key) - m.mu.Unlock() -} - -func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { - m.mu.RLock() - keys := make([]interface{}, 0, len(m.dirty)) - for k := range m.dirty { - keys = append(keys, k) - } - m.mu.RUnlock() - - for _, k := range keys { - v, ok := m.Load(k) - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -// DeepCopyMap is an implementation of mapInterface using a Mutex and -// atomic.Value. It makes deep copies of the map on every write to avoid -// acquiring the Mutex in Load. -type DeepCopyMap struct { - mu sync.Mutex - clean atomic.Value -} - -func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - value, ok = clean[key] - return value, ok -} - -func (m *DeepCopyMap) Store(key, value interface{}) { - m.mu.Lock() - dirty := m.dirty() - dirty[key] = value - m.clean.Store(dirty) - m.mu.Unlock() -} - -func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - actual, loaded = clean[key] - if loaded { - return actual, loaded - } - - m.mu.Lock() - // Reload clean in case it changed while we were waiting on m.mu. - clean, _ = m.clean.Load().(map[interface{}]interface{}) - actual, loaded = clean[key] - if !loaded { - dirty := m.dirty() - dirty[key] = value - actual = value - m.clean.Store(dirty) - } - m.mu.Unlock() - return actual, loaded -} - -func (m *DeepCopyMap) Delete(key interface{}) { - m.mu.Lock() - dirty := m.dirty() - delete(dirty, key) - m.clean.Store(dirty) - m.mu.Unlock() -} - -func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - for k, v := range clean { - if !f(k, v) { - break - } - } -} - -func (m *DeepCopyMap) dirty() map[interface{}]interface{} { - clean, _ := m.clean.Load().(map[interface{}]interface{}) - dirty := make(map[interface{}]interface{}, len(clean)+1) - for k, v := range clean { - dirty[k] = v - } - return dirty -} diff --git a/syncmap/map_test.go b/syncmap/map_test.go deleted file mode 100644 index bf69f50..0000000 --- a/syncmap/map_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package syncmap_test - -import ( - "math/rand" - "reflect" - "runtime" - "sync" - "testing" - "testing/quick" - - "golang.org/x/sync/syncmap" -) - -type mapOp string - -const ( - opLoad = mapOp("Load") - opStore = mapOp("Store") - opLoadOrStore = mapOp("LoadOrStore") - opDelete = mapOp("Delete") -) - -var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete} - -// mapCall is a quick.Generator for calls on mapInterface. -type mapCall struct { - op mapOp - k, v interface{} -} - -func (c mapCall) apply(m mapInterface) (interface{}, bool) { - switch c.op { - case opLoad: - return m.Load(c.k) - case opStore: - m.Store(c.k, c.v) - return nil, false - case opLoadOrStore: - return m.LoadOrStore(c.k, c.v) - case opDelete: - m.Delete(c.k) - return nil, false - default: - panic("invalid mapOp") - } -} - -type mapResult struct { - value interface{} - ok bool -} - -func randValue(r *rand.Rand) interface{} { - b := make([]byte, r.Intn(4)) - for i := range b { - b[i] = 'a' + byte(rand.Intn(26)) - } - return string(b) -} - -func (mapCall) Generate(r *rand.Rand, size int) reflect.Value { - c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)} - switch c.op { - case opStore, opLoadOrStore: - c.v = randValue(r) - } - return reflect.ValueOf(c) -} - -func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) { - for _, c := range calls { - v, ok := c.apply(m) - results = append(results, mapResult{v, ok}) - } - - final = make(map[interface{}]interface{}) - m.Range(func(k, v interface{}) bool { - final[k] = v - return true - }) - - return results, final -} - -func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(syncmap.Map), calls) -} - -func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(RWMutexMap), calls) -} - -func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { - return applyCalls(new(DeepCopyMap), calls) -} - -func TestMapMatchesRWMutex(t *testing.T) { - if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil { - t.Error(err) - } -} - -func TestMapMatchesDeepCopy(t *testing.T) { - if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil { - t.Error(err) - } -} - -func TestConcurrentRange(t *testing.T) { - const mapSize = 1 << 10 - - m := new(syncmap.Map) - for n := int64(1); n <= mapSize; n++ { - m.Store(n, n) - } - - done := make(chan struct{}) - var wg sync.WaitGroup - defer func() { - close(done) - wg.Wait() - }() - for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { - r := rand.New(rand.NewSource(g)) - wg.Add(1) - go func(g int64) { - defer wg.Done() - for i := int64(0); ; i++ { - select { - case <-done: - return - default: - } - for n := int64(1); n < mapSize; n++ { - if r.Int63n(mapSize) == 0 { - m.Store(n, n*i*g) - } else { - m.Load(n) - } - } - } - }(g) - } - - iters := 1 << 10 - if testing.Short() { - iters = 16 - } - for n := iters; n > 0; n-- { - seen := make(map[int64]bool, mapSize) - - m.Range(func(ki, vi interface{}) bool { - k, v := ki.(int64), vi.(int64) - if v%k != 0 { - t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) - } - if seen[k] { - t.Fatalf("Range visited key %v twice", k) - } - seen[k] = true - return true - }) - - if len(seen) != mapSize { - t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) - } - } -}