From 92ecb09707b852e21e6c9e8b91ef44f3dde39115 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Thu, 10 Jan 2019 17:22:58 -0500 Subject: [PATCH] mctx: make GetSetMutableValue lock on the key itself, not the entire map, to allow more throughput when being used on multiple keys at once --- mctx/ctx.go | 60 ++++++++++++++++++++++++++++++------------------ mctx/ctx_test.go | 34 +++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/mctx/ctx.go b/mctx/ctx.go index 2fa3415..e56e12e 100644 --- a/mctx/ctx.go +++ b/mctx/ctx.go @@ -50,6 +50,11 @@ func WithTimeout(parent Context, d time.Duration) (Context, CancelFunc) { type ctxKey int +type mutVal struct { + l sync.RWMutex + v interface{} +} + type ctxState struct { path []string l sync.RWMutex @@ -57,7 +62,7 @@ type ctxState struct { children map[string]Context mutL sync.RWMutex - mutVals map[interface{}]interface{} + mutVals map[interface{}]*mutVal } func getCtxState(ctx Context) *ctxState { @@ -147,6 +152,9 @@ func ChildOf(ctx Context, name string) Context { return childCtx } +//////////////////////////////////////////////////////////////////////////////// +// code related to mutable values + // MutableValue acts like the Value method, except that it only deals with // keys/values set by SetMutableValue. func MutableValue(ctx Context, key interface{}) interface{} { @@ -155,8 +163,12 @@ func MutableValue(ctx Context, key interface{}) interface{} { defer s.mutL.RUnlock() if s.mutVals == nil { return nil + } else if mVal, ok := s.mutVals[key]; ok { + mVal.l.RLock() + defer mVal.l.RUnlock() + return mVal.v } - return s.mutVals[key] + return nil } // GetSetMutableValue is used to interact with a mutable value on the context in @@ -172,43 +184,47 @@ func MutableValue(ctx Context, key interface{}) interface{} { // Children of this context will _not_ inherit any of its mutable values. // // Within the callback it is fine to call other functions/methods on the -// Context, except for those related to mutable values (e.g. MutableValue and -// SetMutableValue). +// Context, except for those related to mutable values for this same key (e.g. +// MutableValue and SetMutableValue). func GetSetMutableValue( ctx Context, noCallbackIfSet bool, key interface{}, fn func(interface{}) interface{}, ) interface{} { s := getCtxState(ctx) + // if noCallbackIfSet, do a fast lookup with MutableValue first. if noCallbackIfSet { - s.mutL.RLock() - if s.mutVals != nil && s.mutVals[key] != nil { - defer s.mutL.RUnlock() - return s.mutVals[key] + if v := MutableValue(ctx, key); v != nil { + return v } - s.mutL.RUnlock() } s.mutL.Lock() - defer s.mutL.Unlock() - if s.mutVals == nil { - s.mutVals = map[interface{}]interface{}{} + s.mutVals = map[interface{}]*mutVal{} } - val := s.mutVals[key] + mVal, ok := s.mutVals[key] + if !ok { + mVal = new(mutVal) + s.mutVals[key] = mVal + } + s.mutL.Unlock() + + mVal.l.Lock() + defer mVal.l.Unlock() // It's possible something happened between the first check inside the // read-lock and now, so double check this case. It's still good to have the // read-lock check there, it'll handle 99% of the cases. - if noCallbackIfSet && val != nil { - return val + if noCallbackIfSet && mVal.v != nil { + return mVal.v } - val = fn(val) - if val == nil { - delete(s.mutVals, key) - } else { - s.mutVals[key] = val - } - return val + mVal.v = fn(mVal.v) + + // TODO if the new v is nil then key could be deleted out of mutVals. But + // doing so would be weird in the case that there's another routine which + // has already pulled this same mVal out of mutVals and is waiting on its + // mutex. + return mVal.v } diff --git a/mctx/ctx_test.go b/mctx/ctx_test.go index 45be4ea..7cf6e64 100644 --- a/mctx/ctx_test.go +++ b/mctx/ctx_test.go @@ -1,6 +1,7 @@ package mctx import ( + "sync" . "testing" "github.com/mediocregopher/mediocre-go-lib/mtest/massert" @@ -77,3 +78,36 @@ func TestMutableValues(t *T) { massert.Fatal(t, massert.All(aa...)) } + +func TestMutableValuesParallel(t *T) { + const events = 1000000 + const workers = 10 + + incr := func(v interface{}) interface{} { + if v == nil { + return 1 + } + return v.(int) + 1 + } + + ch := make(chan bool, events) + for i := 0; i < events; i++ { + ch <- true + } + close(ch) + + ctx := New() + wg := new(sync.WaitGroup) + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for range ch { + GetSetMutableValue(ctx, false, 0, incr) + } + }() + } + + wg.Wait() + massert.Fatal(t, massert.Equal(events, MutableValue(ctx, 0))) +}