diff --git a/syncmap/map.go b/syncmap/map.go new file mode 100644 index 0000000..1efbcb2 --- /dev/null +++ b/syncmap/map.go @@ -0,0 +1,192 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package syncmap provides a concurrent map implementation. +// It is a prototype for a proposed addition to the sync package +// in the standard library. +// (https://golang.org/issue/18177) +package syncmap + +import ( + "sync" + "sync/atomic" +) + +// Map is a concurrent map with amortized-constant-time operations. +// It is safe for multiple goroutines to call a Map's methods concurrently. +// +// The zero Map is valid and empty. +// +// A Map must not be copied after first use. +type Map struct { + mu sync.Mutex + + // clean is a copy of the map's contents that will never be overwritten, and + // is thus safe for concurrent lookups without additional synchronization. + // + // A nil clean map indicates that the current map contents are stored in the + // dirty field instead. + // If clean is non-nil, its contents are up-to-date. + // + // clean is always safe to load, but must only be stored with mu held. + clean atomic.Value // map[interface{}]interface{} + + // dirty is a copy of the map to which all writes occur. + // + // A nil dirty map indicates that the current map contents are either empty or + // stored in the clean field. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a deep copy of the clean map, then setting the clean map to nil. + dirty map[interface{}]interface{} + + // misses counts the number of Load calls for which the clean map was nil + // since the last write. + // + // Once enough Load misses have occurred to cover the cost of a copy, the + // dirty map will be promoted to clean and any subsequent writes will make + // a new copy. + misses int +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key interface{}) (value interface{}, ok bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + if clean != nil { + value, ok = clean[key] + return value, ok + } + + m.mu.Lock() + if m.dirty == nil { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + if clean == nil { + // Completely empty — promote to clean immediately. + m.clean.Store(map[interface{}]interface{}{}) + } else { + value, ok = clean[key] + } + m.mu.Unlock() + return value, ok + } + value, ok = m.dirty[key] + m.missLocked() + m.mu.Unlock() + return value, ok +} + +// Store sets the value for a key. +func (m *Map) Store(key, value interface{}) { + m.mu.Lock() + m.dirtyLocked() + m.dirty[key] = value + m.mu.Unlock() +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + // Avoid locking if it's a clean hit. + clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + return actual, true + } + + m.mu.Lock() + if m.dirty == nil { + // Reload clean in case it changed while we were waiting on m.mu. 
+ clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + m.mu.Unlock() + return actual, true + } + } else { + actual, loaded = m.dirty[key] + if loaded { + m.missLocked() + m.mu.Unlock() + return actual, true + } + } + + m.dirtyLocked() + m.dirty[key] = value + actual = value + m.mu.Unlock() + return actual, false +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key interface{}) { + m.mu.Lock() + m.dirtyLocked() + delete(m.dirty, key) + m.mu.Unlock() +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Calls to other Map methods may block until Range returns. +// The function f must not call any other methods on the Map. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +func (m *Map) Range(f func(key, value interface{}) bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + if clean == nil { + m.mu.Lock() + if m.dirty == nil { + clean, _ = m.clean.Load().(map[interface{}]interface{}) + if clean == nil { + // Completely empty — add an empty map to bypass m.mu next time. + m.clean.Store(map[interface{}]interface{}{}) + } + } else { + // Range is already O(N), so a call to Range amortizes an entire copy of + // the map. If it is dirty, we can promote it to clean immediately! + clean = m.dirty + m.clean.Store(clean) + m.dirty = nil + } + m.mu.Unlock() + } + + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *Map) missLocked() { + if m.misses++; m.misses >= len(m.dirty) { + m.clean.Store(m.dirty) + m.dirty = nil + } +} + +// dirtyLocked prepares the map for a subsequent write. +// It ensures that the dirty field is non-nil and clean is nil by making a deep +// copy of clean. +func (m *Map) dirtyLocked() { + m.misses = 0 + if m.dirty != nil { + return + } + + clean, _ := m.clean.Load().(map[interface{}]interface{}) + m.dirty = make(map[interface{}]interface{}, len(clean)) + for k, v := range clean { + m.dirty[k] = v + } + m.clean.Store(map[interface{}]interface{}(nil)) +} diff --git a/syncmap/map_bench_test.go b/syncmap/map_bench_test.go new file mode 100644 index 0000000..ac06274 --- /dev/null +++ b/syncmap/map_bench_test.go @@ -0,0 +1,199 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package syncmap_test + +import ( + "fmt" + "reflect" + "sync/atomic" + "testing" + + "golang.org/x/sync/syncmap" +) + +type bench struct { + setup func(*testing.B, mapInterface) + perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) +} + +func benchMap(b *testing.B, bench bench) { + for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} { + b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { + m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) + if bench.setup != nil { + bench.setup(b, m) + } + + b.ResetTimer() + + var i int64 + b.RunParallel(func(pb *testing.PB) { + id := int(atomic.AddInt64(&i, 1) - 1) + bench.perG(b, pb, id*b.N, m) + }) + }) + } +} + +func BenchmarkLoadMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadOrStoreBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + if _, ok := m.LoadOrStore(j, i); !ok { + b.Fatalf("unexpected miss for %v", j) + } + } else { + if v, loaded := m.LoadOrStore(i, i); loaded { + b.Fatalf("failed to store %v: existing value %v", i, v) + } + } + } + }, + }) +} + +func BenchmarkLoadOrStoreUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(i, i) + } + }, + }) +} + +func BenchmarkLoadOrStoreCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(0, 0) + } + }, + }) +} + +// BenchmarkAdversarialAlloc tests performance when we store a new value +// immediately whenever the map is promoted to clean. +// +// This forces the Load calls to always acquire the map's mutex. 
+func BenchmarkAdversarialAlloc(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + var stores, loadsSinceStore int64 + for ; pb.Next(); i++ { + m.Load(i) + if loadsSinceStore++; loadsSinceStore > stores { + m.LoadOrStore(i, stores) + loadsSinceStore = 0 + stores++ + } + } + }, + }) +} + +// BenchmarkAdversarialDelete tests performance when we delete and restore a +// value immediately after a large map has been promoted. +// +// This forces the Load calls to always acquire the map's mutex and periodically +// makes a full copy of the map despite changing only one entry. +func BenchmarkAdversarialDelete(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i) + + if i%mapSize == 0 { + var key int + m.Range(func(k, _ interface{}) bool { + key = k.(int) + return false + }) + m.Delete(key) + m.Store(key, key) + } + } + }, + }) +} diff --git a/syncmap/map_reference_test.go b/syncmap/map_reference_test.go new file mode 100644 index 0000000..f3a4977 --- /dev/null +++ b/syncmap/map_reference_test.go @@ -0,0 +1,142 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package syncmap_test + +import ( + "sync" + "sync/atomic" +) + +// This file contains reference map implementations for unit-tests. + +// mapInterface is the interface Map implements. +type mapInterface interface { + Load(interface{}) (interface{}, bool) + Store(key, value interface{}) + LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) + Delete(interface{}) + Range(func(key, value interface{}) (shouldContinue bool)) +} + +// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. +type RWMutexMap struct { + mu sync.RWMutex + dirty map[interface{}]interface{} +} + +func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { + m.mu.RLock() + value, ok = m.dirty[key] + m.mu.RUnlock() + return +} + +func (m *RWMutexMap) Store(key, value interface{}) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + m.mu.Unlock() +} + +func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + m.mu.Lock() + actual, loaded = m.dirty[key] + if !loaded { + actual = value + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + } + m.mu.Unlock() + return actual, loaded +} + +func (m *RWMutexMap) Delete(key interface{}) { + m.mu.Lock() + delete(m.dirty, key) + m.mu.Unlock() +} + +func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + m.mu.RLock() + defer m.mu.RUnlock() + for k, v := range m.dirty { + if !f(k, v) { + break + } + } +} + +// DeepCopyMap is an implementation of mapInterface using a Mutex and +// atomic.Value. It makes deep copies of the map on every write to avoid +// acquiring the Mutex in Load. 
+type DeepCopyMap struct { + mu sync.Mutex + clean atomic.Value +} + +func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + value, ok = clean[key] + return value, ok +} + +func (m *DeepCopyMap) Store(key, value interface{}) { + m.mu.Lock() + dirty := m.dirty() + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + return actual, loaded + } + + m.mu.Lock() + // Reload clean in case it changed while we were waiting on m.mu. + clean, _ = m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if !loaded { + dirty := m.dirty() + dirty[key] = value + actual = value + m.clean.Store(dirty) + } + m.mu.Unlock() + return actual, loaded +} + +func (m *DeepCopyMap) Delete(key interface{}) { + m.mu.Lock() + dirty := m.dirty() + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *DeepCopyMap) dirty() map[interface{}]interface{} { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + dirty := make(map[interface{}]interface{}, len(clean)+1) + for k, v := range clean { + dirty[k] = v + } + return dirty +} diff --git a/syncmap/map_test.go b/syncmap/map_test.go new file mode 100644 index 0000000..54448a0 --- /dev/null +++ b/syncmap/map_test.go @@ -0,0 +1,116 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package syncmap_test + +import ( + "fmt" + "math/rand" + "reflect" + "testing" + "testing/quick" + + "golang.org/x/sync/syncmap" +) + +// mapCall is a quick.Generator for calls on mapInterface. 
+type mapCall struct {
+	key   interface{}
+	apply func(mapInterface) (interface{}, bool)
+	desc  string
+}
+
+type mapResult struct {
+	value interface{}
+	ok    bool
+}
+
+var stringType = reflect.TypeOf("")
+
+func randValue(r *rand.Rand) interface{} {
+	k, ok := quick.Value(stringType, r)
+	if !ok {
+		panic(fmt.Sprintf("quick.Value(%v, _) failed", stringType))
+	}
+	return k.Interface()
+}
+
+func (mapCall) Generate(r *rand.Rand, size int) reflect.Value {
+	k := randValue(r)
+
+	var (
+		app  func(mapInterface) (interface{}, bool)
+		desc string
+	)
+	switch r.Intn(4) {
+	case 0:
+		app = func(m mapInterface) (interface{}, bool) {
+			return m.Load(k)
+		}
+		desc = fmt.Sprintf("Load(%q)", k)
+
+	case 1:
+		v := randValue(r)
+		app = func(m mapInterface) (interface{}, bool) {
+			m.Store(k, v)
+			return nil, false
+		}
+		desc = fmt.Sprintf("Store(%q, %q)", k, v)
+
+	case 2:
+		v := randValue(r)
+		app = func(m mapInterface) (interface{}, bool) {
+			return m.LoadOrStore(k, v)
+		}
+		desc = fmt.Sprintf("LoadOrStore(%q, %q)", k, v)
+
+	case 3:
+		app = func(m mapInterface) (interface{}, bool) {
+			m.Delete(k)
+			return nil, false
+		}
+		desc = fmt.Sprintf("Delete(%q)", k)
+	}
+
+	return reflect.ValueOf(mapCall{k, app, desc})
+}
+
+func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) {
+	for _, c := range calls {
+		v, ok := c.apply(m)
+		results = append(results, mapResult{v, ok})
+	}
+
+	final = make(map[interface{}]interface{})
+	m.Range(func(k, v interface{}) bool {
+		final[k] = v
+		return true
+	})
+
+	return results, final
+}
+
+func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
+	return applyCalls(new(syncmap.Map), calls)
+}
+
+func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
+	return applyCalls(new(RWMutexMap), calls)
+}
+
+func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
+	return applyCalls(new(DeepCopyMap), calls)
+}
+
+func TestMapMatchesRWMutex(t *testing.T) {
+	if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMapMatchesDeepCopy(t *testing.T) {
+	if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil {
+		t.Error(err)
+	}
+}
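
Usage note (a minimal sketch, not part of the diff above): the snippet below exercises only the exported API added in map.go (Load, Store, LoadOrStore, Delete, Range) against a zero-value Map, which the type's documentation says is valid and empty. The program and its keys and values are illustrative assumptions, not code from this change.

package main

import (
	"fmt"

	"golang.org/x/sync/syncmap"
)

func main() {
	var m syncmap.Map // the zero Map is valid and empty

	m.Store("en", "hello")
	m.Store("fr", "bonjour")

	// LoadOrStore keeps the existing value for a key that is already present.
	if v, loaded := m.LoadOrStore("en", "hi"); loaded {
		fmt.Println("kept:", v) // kept: hello
	}

	if v, ok := m.Load("fr"); ok {
		fmt.Println("loaded:", v) // loaded: bonjour
	}

	m.Delete("fr")

	// Range visits each remaining key at most once, in unspecified order.
	m.Range(func(k, v interface{}) bool {
		fmt.Printf("%v=%v\n", k, v)
		return true
	})
}

The quick.CheckEqual tests and the three-way benchmarks in this change run with the standard toolchain: go test golang.org/x/sync/syncmap for correctness, and go test -run=NONE -bench=. golang.org/x/sync/syncmap to compare Map against the RWMutexMap and DeepCopyMap reference implementations.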