| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | package mutexes | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	"runtime" | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	"sync/atomic" | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // locktype defines maskable mutexmap lock types. | 
					
						
							|  |  |  | type locktype uint8 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	// possible lock types. | 
					
						
							|  |  |  | 	lockTypeRead  = locktype(1) << 0 | 
					
						
							|  |  |  | 	lockTypeWrite = locktype(1) << 1 | 
					
						
							|  |  |  | 	lockTypeMap   = locktype(1) << 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// possible mutexmap states. | 
					
						
							|  |  |  | 	stateUnlockd = uint8(0) | 
					
						
							|  |  |  | 	stateRLocked = uint8(1) | 
					
						
							|  |  |  | 	stateLocked  = uint8(2) | 
					
						
							|  |  |  | 	stateInUse   = uint8(3) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // permitLockType returns if provided locktype is permitted to go ahead in current state. | 
					
						
							|  |  |  | func permitLockType(state uint8, lt locktype) bool { | 
					
						
							|  |  |  | 	switch state { | 
					
						
							|  |  |  | 	// Unlocked state | 
					
						
							|  |  |  | 	// (all allowed) | 
					
						
							|  |  |  | 	case stateUnlockd: | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Keys locked, no state lock. | 
					
						
							|  |  |  | 	// (don't allow map locks) | 
					
						
							|  |  |  | 	case stateInUse: | 
					
						
							|  |  |  | 		return lt&lockTypeMap == 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Read locked | 
					
						
							|  |  |  | 	// (only allow read locks) | 
					
						
							|  |  |  | 	case stateRLocked: | 
					
						
							|  |  |  | 		return lt&lockTypeRead != 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Write locked | 
					
						
							|  |  |  | 	// (none allowed) | 
					
						
							|  |  |  | 	case stateLocked: | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// shouldn't reach here | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		panic("unexpected state") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | // MutexMap is a structure that allows having a map of self-evicting mutexes | 
					
						
							|  |  |  | // by key. You do not need to worry about managing the contents of the map, | 
					
						
							|  |  |  | // only requesting RLock/Lock for keys, and ensuring to call the returned | 
					
						
							|  |  |  | // unlock functions. | 
					
						
							| 
									
										
										
										
											2021-09-13 09:33:01 +01:00
										 |  |  | type MutexMap struct { | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	mus   map[string]RWMutex | 
					
						
							|  |  |  | 	mapMu sync.Mutex | 
					
						
							|  |  |  | 	pool  sync.Pool | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	queue []func() | 
					
						
							|  |  |  | 	evict []func() | 
					
						
							|  |  |  | 	count int32 | 
					
						
							|  |  |  | 	maxmu int32 | 
					
						
							|  |  |  | 	state uint8 | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // NewMap returns a new MutexMap instance with provided max no. open mutexes. | 
					
						
							|  |  |  | func NewMap(max int32) MutexMap { | 
					
						
							|  |  |  | 	if max < 1 { | 
					
						
							|  |  |  | 		// Default = 128 * GOMAXPROCS | 
					
						
							|  |  |  | 		procs := runtime.GOMAXPROCS(0) | 
					
						
							|  |  |  | 		max = int32(procs * 128) | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-09-13 09:33:01 +01:00
										 |  |  | 	return MutexMap{ | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 		mus: make(map[string]RWMutex), | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 		pool: sync.Pool{ | 
					
						
							|  |  |  | 			New: func() interface{} { | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 				return NewRW() | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 			}, | 
					
						
							|  |  |  | 		}, | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 		maxmu: max, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // acquire will either acquire a mutex from pool or alloc. | 
					
						
							|  |  |  | func (mm *MutexMap) acquire() RWMutex { | 
					
						
							|  |  |  | 	return mm.pool.Get().(RWMutex) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // release will release provided mutex to pool. | 
					
						
							|  |  |  | func (mm *MutexMap) release(mu RWMutex) { | 
					
						
							|  |  |  | 	mm.pool.Put(mu) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true, | 
					
						
							|  |  |  | // returning with map lock. Note that 'cond' is performed within a map lock. | 
					
						
							|  |  |  | func (mm *MutexMap) spinLock(cond func() bool) { | 
					
						
							|  |  |  | 	mu := mm.acquire() | 
					
						
							|  |  |  | 	defer mm.release(mu) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		// Get map lock | 
					
						
							|  |  |  | 		mm.mapMu.Lock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Check if return | 
					
						
							|  |  |  | 		if cond() { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Queue ourselves | 
					
						
							|  |  |  | 		unlock := mu.Lock() | 
					
						
							|  |  |  | 		mm.queue = append(mm.queue, unlock) | 
					
						
							|  |  |  | 		mm.mapMu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Wait on notify | 
					
						
							|  |  |  | 		mu.Lock()() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return. | 
					
						
							|  |  |  | func (mm *MutexMap) lockMutex(key string, lt locktype) func() { | 
					
						
							|  |  |  | 	var unlock func() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Incr counter | 
					
						
							|  |  |  | 	mm.count++ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Check for existing mutex at key | 
					
						
							|  |  |  | 	mu, ok := mm.mus[key] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		// Alloc from pool | 
					
						
							|  |  |  | 		mu = mm.acquire() | 
					
						
							|  |  |  | 		mm.mus[key] = mu | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Queue mutex for eviction | 
					
						
							|  |  |  | 		mm.evict = append(mm.evict, func() { | 
					
						
							|  |  |  | 			delete(mm.mus, key) | 
					
						
							|  |  |  | 			mm.pool.Put(mu) | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// If no state, set in use. | 
					
						
							|  |  |  | 	// State will already have been | 
					
						
							|  |  |  | 	// set if this is from LockState{} | 
					
						
							|  |  |  | 	if mm.state == stateUnlockd { | 
					
						
							|  |  |  | 		mm.state = stateInUse | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	// Read lock | 
					
						
							|  |  |  | 	case lt&lockTypeRead != 0: | 
					
						
							|  |  |  | 		unlock = mu.RLock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Write lock | 
					
						
							|  |  |  | 	case lt&lockTypeWrite != 0: | 
					
						
							|  |  |  | 		unlock = mu.Lock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// shouldn't reach here | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		panic("unexpected lock type") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Unlock map + return | 
					
						
							|  |  |  | 	mm.mapMu.Unlock() | 
					
						
							|  |  |  | 	return func() { | 
					
						
							|  |  |  | 		mm.mapMu.Lock() | 
					
						
							|  |  |  | 		unlock() | 
					
						
							|  |  |  | 		go mm.onUnlock() | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex. | 
					
						
							|  |  |  | func (mm *MutexMap) onUnlock() { | 
					
						
							|  |  |  | 	// Decr counter | 
					
						
							|  |  |  | 	mm.count-- | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if mm.count < 1 { | 
					
						
							|  |  |  | 		// Perform all queued evictions | 
					
						
							|  |  |  | 		for i := 0; i < len(mm.evict); i++ { | 
					
						
							|  |  |  | 			mm.evict[i]() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 		// Notify all waiting goroutines | 
					
						
							|  |  |  | 		for i := 0; i < len(mm.queue); i++ { | 
					
						
							|  |  |  | 			mm.queue[i]() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 		// Reset the map state | 
					
						
							|  |  |  | 		mm.evict = nil | 
					
						
							|  |  |  | 		mm.queue = nil | 
					
						
							|  |  |  | 		mm.state = stateUnlockd | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Finally, unlock | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	mm.mapMu.Unlock() | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. | 
					
						
							|  |  |  | // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. | 
					
						
							|  |  |  | func (mm *MutexMap) RLockMap() *LockState { | 
					
						
							|  |  |  | 	return mm.getMapLock(lockTypeRead) | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. | 
					
						
							|  |  |  | // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. | 
					
						
							|  |  |  | func (mm *MutexMap) LockMap() *LockState { | 
					
						
							|  |  |  | 	return mm.getMapLock(lockTypeWrite) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RLock acquires a mutex read lock for supplied key, returning an RUnlock function. | 
					
						
							|  |  |  | func (mm *MutexMap) RLock(key string) (runlock func()) { | 
					
						
							|  |  |  | 	return mm.getLock(key, lockTypeRead) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Lock acquires a mutex write lock for supplied key, returning an Unlock function. | 
					
						
							|  |  |  | func (mm *MutexMap) Lock(key string) (unlock func()) { | 
					
						
							|  |  |  | 	return mm.getLock(key, lockTypeWrite) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // getLock will fetch lock of provided type, for given key, returning unlock function. | 
					
						
							|  |  |  | func (mm *MutexMap) getLock(key string, lt locktype) func() { | 
					
						
							|  |  |  | 	// Spin until achieve lock | 
					
						
							|  |  |  | 	mm.spinLock(func() bool { | 
					
						
							|  |  |  | 		return permitLockType(mm.state, lt) && | 
					
						
							|  |  |  | 			mm.count < mm.maxmu // not overloaded | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Perform actual mutex lock | 
					
						
							|  |  |  | 	return mm.lockMutex(key, lt) | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // getMapLock will acquire a map lock of provided type, returning a LockState session. | 
					
						
							|  |  |  | func (mm *MutexMap) getMapLock(lt locktype) *LockState { | 
					
						
							|  |  |  | 	// Spin until achieve lock | 
					
						
							|  |  |  | 	mm.spinLock(func() bool { | 
					
						
							|  |  |  | 		return permitLockType(mm.state, lt|lockTypeMap) && | 
					
						
							|  |  |  | 			mm.count < mm.maxmu // not overloaded | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Incr counter | 
					
						
							|  |  |  | 	mm.count++ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	// Set read lock state | 
					
						
							|  |  |  | 	case lt&lockTypeRead != 0: | 
					
						
							|  |  |  | 		mm.state = stateRLocked | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Set write lock state | 
					
						
							|  |  |  | 	case lt&lockTypeWrite != 0: | 
					
						
							|  |  |  | 		mm.state = stateLocked | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		panic("unexpected lock type") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Unlock + return | 
					
						
							|  |  |  | 	mm.mapMu.Unlock() | 
					
						
							|  |  |  | 	return &LockState{ | 
					
						
							|  |  |  | 		mmap: mm, | 
					
						
							|  |  |  | 		ltyp: lt, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // LockState represents a window to a locked MutexMap. | 
					
						
							|  |  |  | type LockState struct { | 
					
						
							|  |  |  | 	wait sync.WaitGroup | 
					
						
							|  |  |  | 	mmap *MutexMap | 
					
						
							|  |  |  | 	done uint32 | 
					
						
							|  |  |  | 	ltyp locktype | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // Lock: see MutexMap.Lock() definition. Will panic if map only read locked. | 
					
						
							|  |  |  | func (st *LockState) Lock(key string) (unlock func()) { | 
					
						
							|  |  |  | 	return st.getLock(key, lockTypeWrite) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RLock: see MutexMap.RLock() definition. | 
					
						
							|  |  |  | func (st *LockState) RLock(key string) (runlock func()) { | 
					
						
							|  |  |  | 	return st.getLock(key, lockTypeRead) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // UnlockMap will close this state and release the currently locked map. | 
					
						
							|  |  |  | func (st *LockState) UnlockMap() { | 
					
						
							|  |  |  | 	// Set state to finished (or panic if already done) | 
					
						
							|  |  |  | 	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { | 
					
						
							|  |  |  | 		panic("called UnlockMap() on expired state") | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	// Wait until done | 
					
						
							|  |  |  | 	st.wait.Wait() | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	// Async reset map | 
					
						
							|  |  |  | 	st.mmap.mapMu.Lock() | 
					
						
							|  |  |  | 	go st.mmap.onUnlock() | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | // getLock: see MutexMap.getLock() definition. | 
					
						
							|  |  |  | func (st *LockState) getLock(key string, lt locktype) func() { | 
					
						
							|  |  |  | 	st.wait.Add(1) // track lock | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	// Check if closed, or if write lock is allowed | 
					
						
							|  |  |  | 	if atomic.LoadUint32(&st.done) == 1 { | 
					
						
							|  |  |  | 		panic("map lock closed") | 
					
						
							|  |  |  | 	} else if lt&lockTypeWrite != 0 && | 
					
						
							|  |  |  | 		st.ltyp&lockTypeWrite == 0 { | 
					
						
							|  |  |  | 		panic("called .Lock() on rlocked map") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Spin until achieve map lock | 
					
						
							|  |  |  | 	st.mmap.spinLock(func() bool { | 
					
						
							|  |  |  | 		return st.mmap.count < st.mmap.maxmu | 
					
						
							|  |  |  | 	}) // i.e. not overloaded | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-24 17:35:13 +01:00
										 |  |  | 	// Perform actual mutex lock | 
					
						
							|  |  |  | 	unlock := st.mmap.lockMutex(key, lt) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return func() { | 
					
						
							|  |  |  | 		unlock() | 
					
						
							|  |  |  | 		st.wait.Done() | 
					
						
							| 
									
										
										
										
											2021-09-11 20:12:47 +01:00
										 |  |  | 	} | 
					
						
							|  |  |  | } |