mctx: make GetSetMutableValue lock on the key itself, not the entire map, to allow more throughput when being used on multiple keys at once
This commit is contained in:
parent
6e8338a5f8
commit
92ecb09707
60
mctx/ctx.go
60
mctx/ctx.go
@ -50,6 +50,11 @@ func WithTimeout(parent Context, d time.Duration) (Context, CancelFunc) {
|
|||||||
|
|
||||||
type ctxKey int
|
type ctxKey int
|
||||||
|
|
||||||
|
type mutVal struct {
|
||||||
|
l sync.RWMutex
|
||||||
|
v interface{}
|
||||||
|
}
|
||||||
|
|
||||||
type ctxState struct {
|
type ctxState struct {
|
||||||
path []string
|
path []string
|
||||||
l sync.RWMutex
|
l sync.RWMutex
|
||||||
@ -57,7 +62,7 @@ type ctxState struct {
|
|||||||
children map[string]Context
|
children map[string]Context
|
||||||
|
|
||||||
mutL sync.RWMutex
|
mutL sync.RWMutex
|
||||||
mutVals map[interface{}]interface{}
|
mutVals map[interface{}]*mutVal
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCtxState(ctx Context) *ctxState {
|
func getCtxState(ctx Context) *ctxState {
|
||||||
@ -147,6 +152,9 @@ func ChildOf(ctx Context, name string) Context {
|
|||||||
return childCtx
|
return childCtx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// code related to mutable values
|
||||||
|
|
||||||
// MutableValue acts like the Value method, except that it only deals with
|
// MutableValue acts like the Value method, except that it only deals with
|
||||||
// keys/values set by SetMutableValue.
|
// keys/values set by SetMutableValue.
|
||||||
func MutableValue(ctx Context, key interface{}) interface{} {
|
func MutableValue(ctx Context, key interface{}) interface{} {
|
||||||
@ -155,8 +163,12 @@ func MutableValue(ctx Context, key interface{}) interface{} {
|
|||||||
defer s.mutL.RUnlock()
|
defer s.mutL.RUnlock()
|
||||||
if s.mutVals == nil {
|
if s.mutVals == nil {
|
||||||
return nil
|
return nil
|
||||||
|
} else if mVal, ok := s.mutVals[key]; ok {
|
||||||
|
mVal.l.RLock()
|
||||||
|
defer mVal.l.RUnlock()
|
||||||
|
return mVal.v
|
||||||
}
|
}
|
||||||
return s.mutVals[key]
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSetMutableValue is used to interact with a mutable value on the context in
|
// GetSetMutableValue is used to interact with a mutable value on the context in
|
||||||
@ -172,43 +184,47 @@ func MutableValue(ctx Context, key interface{}) interface{} {
|
|||||||
// Children of this context will _not_ inherit any of its mutable values.
|
// Children of this context will _not_ inherit any of its mutable values.
|
||||||
//
|
//
|
||||||
// Within the callback it is fine to call other functions/methods on the
|
// Within the callback it is fine to call other functions/methods on the
|
||||||
// Context, except for those related to mutable values (e.g. MutableValue and
|
// Context, except for those related to mutable values for this same key (e.g.
|
||||||
// SetMutableValue).
|
// MutableValue and SetMutableValue).
|
||||||
func GetSetMutableValue(
|
func GetSetMutableValue(
|
||||||
ctx Context, noCallbackIfSet bool,
|
ctx Context, noCallbackIfSet bool,
|
||||||
key interface{}, fn func(interface{}) interface{},
|
key interface{}, fn func(interface{}) interface{},
|
||||||
) interface{} {
|
) interface{} {
|
||||||
s := getCtxState(ctx)
|
s := getCtxState(ctx)
|
||||||
|
|
||||||
|
// if noCallbackIfSet, do a fast lookup with MutableValue first.
|
||||||
if noCallbackIfSet {
|
if noCallbackIfSet {
|
||||||
s.mutL.RLock()
|
if v := MutableValue(ctx, key); v != nil {
|
||||||
if s.mutVals != nil && s.mutVals[key] != nil {
|
return v
|
||||||
defer s.mutL.RUnlock()
|
|
||||||
return s.mutVals[key]
|
|
||||||
}
|
}
|
||||||
s.mutL.RUnlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.mutL.Lock()
|
s.mutL.Lock()
|
||||||
defer s.mutL.Unlock()
|
|
||||||
|
|
||||||
if s.mutVals == nil {
|
if s.mutVals == nil {
|
||||||
s.mutVals = map[interface{}]interface{}{}
|
s.mutVals = map[interface{}]*mutVal{}
|
||||||
}
|
}
|
||||||
val := s.mutVals[key]
|
mVal, ok := s.mutVals[key]
|
||||||
|
if !ok {
|
||||||
|
mVal = new(mutVal)
|
||||||
|
s.mutVals[key] = mVal
|
||||||
|
}
|
||||||
|
s.mutL.Unlock()
|
||||||
|
|
||||||
|
mVal.l.Lock()
|
||||||
|
defer mVal.l.Unlock()
|
||||||
|
|
||||||
// It's possible something happened between the first check inside the
|
// It's possible something happened between the first check inside the
|
||||||
// read-lock and now, so double check this case. It's still good to have the
|
// read-lock and now, so double check this case. It's still good to have the
|
||||||
// read-lock check there, it'll handle 99% of the cases.
|
// read-lock check there, it'll handle 99% of the cases.
|
||||||
if noCallbackIfSet && val != nil {
|
if noCallbackIfSet && mVal.v != nil {
|
||||||
return val
|
return mVal.v
|
||||||
}
|
}
|
||||||
|
|
||||||
val = fn(val)
|
mVal.v = fn(mVal.v)
|
||||||
if val == nil {
|
|
||||||
delete(s.mutVals, key)
|
// TODO if the new v is nil then key could be deleted out of mutVals. But
|
||||||
} else {
|
// doing so would be weird in the case that there's another routine which
|
||||||
s.mutVals[key] = val
|
// has already pulled this same mVal out of mutVals and is waiting on its
|
||||||
}
|
// mutex.
|
||||||
return val
|
return mVal.v
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package mctx
|
package mctx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
. "testing"
|
. "testing"
|
||||||
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mtest/massert"
|
"github.com/mediocregopher/mediocre-go-lib/mtest/massert"
|
||||||
@ -77,3 +78,36 @@ func TestMutableValues(t *T) {
|
|||||||
|
|
||||||
massert.Fatal(t, massert.All(aa...))
|
massert.Fatal(t, massert.All(aa...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMutableValuesParallel(t *T) {
|
||||||
|
const events = 1000000
|
||||||
|
const workers = 10
|
||||||
|
|
||||||
|
incr := func(v interface{}) interface{} {
|
||||||
|
if v == nil {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return v.(int) + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan bool, events)
|
||||||
|
for i := 0; i < events; i++ {
|
||||||
|
ch <- true
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
|
||||||
|
ctx := New()
|
||||||
|
wg := new(sync.WaitGroup)
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for range ch {
|
||||||
|
GetSetMutableValue(ctx, false, 0, incr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
massert.Fatal(t, massert.Equal(events, MutableValue(ctx, 0)))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user