errgroup: revert propagation of panics
This change reverts CL 644575, which caused panics in the f() call
after group.Go(f) to be propagated to the subsequent group.Wait call.
This caused more problems than it solved.

Also:
- preserve some of the doc comment wording of Group.Go.
- leave a "tsunami stone" comment in Group.Go.

Fixes golang/go#53757
Updates golang/go#74275
Updates golang/go#74304
Updates golang/go#74306

Change-Id: I6e3992510944db7d69c72eaf241aedf8b84e62dd
Reviewed-on: https://go-review.googlesource.com/c/sync/+/682935
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: qiu laidongfeng2 <2645477756@qq.com>
Reviewed-by: Junyang Shao <shaojunyang@google.com>
Reviewed-by: Sean Liao <sean@liao.dev>
Auto-Submit: Sean Liao <sean@liao.dev>
This commit is contained in:
parent
8a14946fb0
commit
7fad2c9213
2 changed files with 31 additions and 151 deletions
|
|
@ -12,8 +12,6 @@ package errgroup
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
|
||||||
"runtime/debug"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -33,10 +31,6 @@ type Group struct {
|
||||||
|
|
||||||
errOnce sync.Once
|
errOnce sync.Once
|
||||||
err error
|
err error
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
panicValue any // = PanicError | PanicValue; non-nil if some Group.Go goroutine panicked.
|
|
||||||
abnormal bool // some Group.Go goroutine terminated abnormally (panic or goexit).
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Group) done() {
|
func (g *Group) done() {
|
||||||
|
|
@ -56,22 +50,13 @@ func WithContext(ctx context.Context) (*Group, context.Context) {
|
||||||
return &Group{cancel: cancel}, ctx
|
return &Group{cancel: cancel}, ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait blocks until all function calls from the Go method have returned
|
// Wait blocks until all function calls from the Go method have returned, then
|
||||||
// normally, then returns the first non-nil error (if any) from them.
|
// returns the first non-nil error (if any) from them.
|
||||||
//
|
|
||||||
// If any of the calls panics, Wait panics with a [PanicValue];
|
|
||||||
// and if any of them calls [runtime.Goexit], Wait calls runtime.Goexit.
|
|
||||||
func (g *Group) Wait() error {
|
func (g *Group) Wait() error {
|
||||||
g.wg.Wait()
|
g.wg.Wait()
|
||||||
if g.cancel != nil {
|
if g.cancel != nil {
|
||||||
g.cancel(g.err)
|
g.cancel(g.err)
|
||||||
}
|
}
|
||||||
if g.panicValue != nil {
|
|
||||||
panic(g.panicValue)
|
|
||||||
}
|
|
||||||
if g.abnormal {
|
|
||||||
runtime.Goexit()
|
|
||||||
}
|
|
||||||
return g.err
|
return g.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,53 +66,31 @@ func (g *Group) Wait() error {
|
||||||
// It blocks until the new goroutine can be added without the number of
|
// It blocks until the new goroutine can be added without the number of
|
||||||
// goroutines in the group exceeding the configured limit.
|
// goroutines in the group exceeding the configured limit.
|
||||||
//
|
//
|
||||||
// The first goroutine in the group that returns a non-nil error, panics, or
|
// The first goroutine in the group that returns a non-nil error will
|
||||||
// invokes [runtime.Goexit] will cancel the associated Context, if any.
|
// cancel the associated Context, if any. The error will be returned
|
||||||
|
// by Wait.
|
||||||
func (g *Group) Go(f func() error) {
|
func (g *Group) Go(f func() error) {
|
||||||
if g.sem != nil {
|
if g.sem != nil {
|
||||||
g.sem <- token{}
|
g.sem <- token{}
|
||||||
}
|
}
|
||||||
|
|
||||||
g.add(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Group) add(f func() error) {
|
|
||||||
g.wg.Add(1)
|
g.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer g.done()
|
defer g.done()
|
||||||
normalReturn := false
|
|
||||||
defer func() {
|
|
||||||
if normalReturn {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
v := recover()
|
|
||||||
g.mu.Lock()
|
|
||||||
defer g.mu.Unlock()
|
|
||||||
if !g.abnormal {
|
|
||||||
if g.cancel != nil {
|
|
||||||
g.cancel(g.err)
|
|
||||||
}
|
|
||||||
g.abnormal = true
|
|
||||||
}
|
|
||||||
if v != nil && g.panicValue == nil {
|
|
||||||
switch v := v.(type) {
|
|
||||||
case error:
|
|
||||||
g.panicValue = PanicError{
|
|
||||||
Recovered: v,
|
|
||||||
Stack: debug.Stack(),
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
g.panicValue = PanicValue{
|
|
||||||
Recovered: v,
|
|
||||||
Stack: debug.Stack(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
err := f()
|
// It is tempting to propagate panics from f()
|
||||||
normalReturn = true
|
// up to the goroutine that calls Wait, but
|
||||||
if err != nil {
|
// it creates more problems than it solves:
|
||||||
|
// - it delays panics arbitrarily,
|
||||||
|
// making bugs harder to detect;
|
||||||
|
// - it turns f's panic stack into a mere value,
|
||||||
|
// hiding it from crash-monitoring tools;
|
||||||
|
// - it risks deadlocks that hide the panic entirely,
|
||||||
|
// if f's panic leaves the program in a state
|
||||||
|
// that prevents the Wait call from being reached.
|
||||||
|
// See #53757, #74275, #74304, #74306.
|
||||||
|
|
||||||
|
if err := f(); err != nil {
|
||||||
g.errOnce.Do(func() {
|
g.errOnce.Do(func() {
|
||||||
g.err = err
|
g.err = err
|
||||||
if g.cancel != nil {
|
if g.cancel != nil {
|
||||||
|
|
@ -152,7 +115,19 @@ func (g *Group) TryGo(f func() error) bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g.add(f)
|
g.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer g.done()
|
||||||
|
|
||||||
|
if err := f(); err != nil {
|
||||||
|
g.errOnce.Do(func() {
|
||||||
|
g.err = err
|
||||||
|
if g.cancel != nil {
|
||||||
|
g.cancel(g.err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -174,34 +149,3 @@ func (g *Group) SetLimit(n int) {
|
||||||
}
|
}
|
||||||
g.sem = make(chan token, n)
|
g.sem = make(chan token, n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PanicError wraps an error recovered from an unhandled panic
|
|
||||||
// when calling a function passed to Go or TryGo.
|
|
||||||
type PanicError struct {
|
|
||||||
Recovered error
|
|
||||||
Stack []byte // result of call to [debug.Stack]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p PanicError) Error() string {
|
|
||||||
if len(p.Stack) > 0 {
|
|
||||||
return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p PanicError) Unwrap() error { return p.Recovered }
|
|
||||||
|
|
||||||
// PanicValue wraps a value that does not implement the error interface,
|
|
||||||
// recovered from an unhandled panic when calling a function passed to Go or
|
|
||||||
// TryGo.
|
|
||||||
type PanicValue struct {
|
|
||||||
Recovered any
|
|
||||||
Stack []byte // result of call to [debug.Stack]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p PanicValue) String() string {
|
|
||||||
if len(p.Stack) > 0 {
|
|
||||||
return fmt.Sprintf("recovered from errgroup.Group: %v\n%s", p.Recovered, p.Stack)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("recovered from errgroup.Group: %v", p.Recovered)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -290,69 +289,6 @@ func TestCancelCause(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPanic(t *testing.T) {
|
|
||||||
t.Run("error", func(t *testing.T) {
|
|
||||||
g := &errgroup.Group{}
|
|
||||||
p := errors.New("")
|
|
||||||
g.Go(func() error {
|
|
||||||
panic(p)
|
|
||||||
})
|
|
||||||
defer func() {
|
|
||||||
err := recover()
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("should propagate panic through Wait")
|
|
||||||
}
|
|
||||||
pe, ok := err.(errgroup.PanicError)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("type should is errgroup.PanicError, but is %T", err)
|
|
||||||
}
|
|
||||||
if pe.Recovered != p {
|
|
||||||
t.Fatalf("got %v, want %v", pe.Recovered, p)
|
|
||||||
}
|
|
||||||
if !strings.Contains(pe.Error(), "TestPanic.func") {
|
|
||||||
t.Log(pe.Error())
|
|
||||||
t.Fatalf("stack trace incomplete, does not contain TestPanic.func")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
g.Wait()
|
|
||||||
})
|
|
||||||
t.Run("any", func(t *testing.T) {
|
|
||||||
g := &errgroup.Group{}
|
|
||||||
g.Go(func() error {
|
|
||||||
panic(1)
|
|
||||||
})
|
|
||||||
defer func() {
|
|
||||||
err := recover()
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("should propagate panic through Wait")
|
|
||||||
}
|
|
||||||
pe, ok := err.(errgroup.PanicValue)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("type should is errgroup.PanicValue, but is %T", err)
|
|
||||||
}
|
|
||||||
if pe.Recovered != 1 {
|
|
||||||
t.Fatalf("got %v, want %v", pe.Recovered, 1)
|
|
||||||
}
|
|
||||||
if !strings.Contains(string(pe.Stack), "TestPanic.func") {
|
|
||||||
t.Log(string(pe.Stack))
|
|
||||||
t.Fatalf("stack trace incomplete")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
g.Wait()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGoexit(t *testing.T) {
|
|
||||||
g := &errgroup.Group{}
|
|
||||||
g.Go(func() error {
|
|
||||||
t.Skip()
|
|
||||||
t.Fatalf("Goexit fail")
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
g.Wait()
|
|
||||||
t.Fatalf("should call runtime.Goexit from Wait")
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkGo(b *testing.B) {
|
func BenchmarkGo(b *testing.B) {
|
||||||
fn := func() {}
|
fn := func() {}
|
||||||
g := &errgroup.Group{}
|
g := &errgroup.Group{}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue