// copy from singleflight.go // ... // call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup
// These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. val interface{} err error
// These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. dups int chans []chan<- Result }
// Group represents a class of work and forms a namespace in // which units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized }
// ...
func(g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait()
if e, ok := c.err.(*panicError); ok { panic(e) } elseif c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock()
// doCall handles the single call for a key. func(g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false
// use double-defer to distinguish panic from runtime.Goexit, // more details see https://golang.org/cl/134395 deferfunc() { // the given function invoked runtime.Goexit if !normalReturn && !recovered { c.err = errGoexit }
g.mu.Lock() defer g.mu.Unlock() c.wg.Done() if g.m[key] == c { delete(g.m, key) }
if e, ok := c.err.(*panicError); ok { // In order to prevent the waiting channels from being blocked forever, // needs to ensure that this panic cannot be recovered. iflen(c.chans) > 0 { gopanic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } elseif c.err == errGoexit { // Already in the process of goexit, no need to call again } else { // Normal return for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
func() { deferfunc() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. if r := recover(); r != nil { c.err = newPanicError(r) } } }()