mrun: implement Thread and Wait

This commit is contained in:
Brian Picciano 2018-12-05 23:40:46 -05:00
parent 2f647017b9
commit 57bd022093
3 changed files with 240 additions and 2 deletions

View File

@ -1,9 +1,9 @@
// Package mctx extends the builtin context package to organize Contexts into a // Package mctx extends the builtin context package to organize Contexts into a
// heirarchy. Each node in the hierarchy is given a name and is aware of all of // hierarchy. Each node in the hierarchy is given a name and is aware of all of
// its ancestors. // its ancestors.
// //
// This package also provides extra functionality which allows contexts // This package also provides extra functionality which allows contexts
// to be more useful when used in the heirarchy. // to be more useful when used in the hierarchy.
// //
// All functions and methods in this package are thread-safe unless otherwise // All functions and methods in this package are thread-safe unless otherwise
// noted. // noted.

96
mrun/mrun.go Normal file
View File

@ -0,0 +1,96 @@
// Package mrun TODO
package mrun
import (
"errors"
"github.com/mediocregopher/mediocre-go-lib/mctx"
)
type futureErr struct {
doneCh chan struct{}
err error
}
func newFutureErr() *futureErr {
return &futureErr{
doneCh: make(chan struct{}),
}
}
func (fe *futureErr) get(cancelCh <-chan struct{}) (error, bool) {
select {
case <-fe.doneCh:
return fe.err, true
case <-cancelCh:
return nil, false
}
}
func (fe *futureErr) set(err error) {
fe.err = err
close(fe.doneCh)
}
type ctxKey int
// Thread spawns a go-routine which executes the given function. When the passed
// in Context is cancceled the Context within all threads spawned from it will
// be canceled as well.
//
// See Wait for accompanying functionality.
func Thread(ctx mctx.Context, fn func(mctx.Context) error) {
futErr := newFutureErr()
mctx.GetSetMutableValue(ctx, false, ctxKey(0), func(i interface{}) interface{} {
futErrs, ok := i.([]*futureErr)
if !ok {
futErrs = make([]*futureErr, 0, 1)
}
return append(futErrs, futErr)
})
go func() {
futErr.set(fn(ctx))
}()
}
// ErrDone is returned from Wait if cancelCh is closed before all threads have
// returned.
var ErrDone = errors.New("Wait is done waiting")
// Wait blocks until all go-routines spawned using Thread on the passed in
// Context, and all of its children, have returned. Any number of the threads
// may have returned already when Wait is called.
//
// If any of the thread functions returned an error during its runtime Wait will
// return that error. If multiple returned an error only one of those will be
// returned. TODO: Handle multi-errors better.
//
// If cancelCh is not nil and is closed before all threads have returned then
// this function stops waiting and returns ErrDone.
//
// Wait is safe to call in parallel, and will return the same result if called
// multiple times in sequence. If new Thread calls have been made since the last
// Wait call, the results of those calls will be waited upon during subsequent
// Wait calls.
func Wait(ctx mctx.Context, cancelCh <-chan struct{}) error {
// First wait for all the children, and see if any of them return an error
children := mctx.Children(ctx)
for _, childCtx := range children {
if err := Wait(childCtx, cancelCh); err != nil {
return err
}
}
futErrs, _ := mctx.MutableValue(ctx, ctxKey(0)).([]*futureErr)
for _, futErr := range futErrs {
err, ok := futErr.get(cancelCh)
if !ok {
return ErrDone
} else if err != nil {
return err
}
}
return nil
}

142
mrun/mrun_test.go Normal file
View File

@ -0,0 +1,142 @@
package mrun
import (
"errors"
. "testing"
"time"
"github.com/mediocregopher/mediocre-go-lib/mctx"
)
func TestThreadWait(t *T) {
testErr := errors.New("test error")
cancelCh := func(t time.Duration) <-chan struct{} {
tCtx, _ := mctx.WithTimeout(mctx.New(), t*2)
return tCtx.Done()
}
wait := func(ctx mctx.Context, shouldTake time.Duration) error {
start := time.Now()
err := Wait(ctx, cancelCh(shouldTake*2))
if took := time.Since(start); took < shouldTake || took > shouldTake*4/3 {
t.Fatalf("wait took %v, should have taken %v", took, shouldTake)
}
return err
}
t.Run("noChildren", func(t *T) {
t.Run("noBlock", func(t *T) {
t.Run("noErr", func(t *T) {
ctx := mctx.New()
Thread(ctx, func(mctx.Context) error { return nil })
if err := Wait(ctx, nil); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx := mctx.New()
Thread(ctx, func(mctx.Context) error { return testErr })
if err := Wait(ctx, nil); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
})
t.Run("block", func(t *T) {
t.Run("noErr", func(t *T) {
ctx := mctx.New()
Thread(ctx, func(mctx.Context) error {
time.Sleep(1 * time.Second)
return nil
})
if err := wait(ctx, 1*time.Second); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx := mctx.New()
Thread(ctx, func(mctx.Context) error {
time.Sleep(1 * time.Second)
return testErr
})
if err := wait(ctx, 1*time.Second); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
t.Run("canceled", func(t *T) {
ctx := mctx.New()
Thread(ctx, func(mctx.Context) error {
time.Sleep(5 * time.Second)
return testErr
})
if err := Wait(ctx, cancelCh(500*time.Millisecond)); err != ErrDone {
t.Fatalf("should have got ErrDone, got: %v", err)
}
})
})
})
ctxWithChild := func() (mctx.Context, mctx.Context) {
ctx := mctx.New()
return ctx, mctx.ChildOf(ctx, "child")
}
t.Run("children", func(t *T) {
t.Run("noBlock", func(t *T) {
t.Run("noErr", func(t *T) {
ctx, childCtx := ctxWithChild()
Thread(childCtx, func(mctx.Context) error { return nil })
if err := Wait(ctx, nil); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx, childCtx := ctxWithChild()
Thread(childCtx, func(mctx.Context) error { return testErr })
if err := Wait(ctx, nil); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
})
t.Run("block", func(t *T) {
t.Run("noErr", func(t *T) {
ctx, childCtx := ctxWithChild()
Thread(childCtx, func(mctx.Context) error {
time.Sleep(1 * time.Second)
return nil
})
if err := wait(ctx, 1*time.Second); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx, childCtx := ctxWithChild()
Thread(childCtx, func(mctx.Context) error {
time.Sleep(1 * time.Second)
return testErr
})
if err := wait(ctx, 1*time.Second); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
t.Run("canceled", func(t *T) {
ctx, childCtx := ctxWithChild()
Thread(childCtx, func(mctx.Context) error {
time.Sleep(5 * time.Second)
return testErr
})
if err := Wait(ctx, cancelCh(500*time.Millisecond)); err != ErrDone {
t.Fatalf("should have got ErrDone, got: %v", err)
}
})
})
})
}