mrun: fix Threads after changes to mctx which got rid of children

This commit is contained in:
Brian Picciano 2019-06-15 17:41:20 -06:00
parent 8bd0664ba0
commit 31ed0fe625
2 changed files with 39 additions and 118 deletions

View File

@ -30,8 +30,6 @@ package mrun
import ( import (
"context" "context"
"errors" "errors"
"github.com/mediocregopher/mediocre-go-lib/mctx"
) )
type futureErr struct { type futureErr struct {
@ -105,14 +103,6 @@ var ErrDone = errors.New("Wait is done waiting")
// Wait is safe to call in parallel, and will return the same result if called // Wait is safe to call in parallel, and will return the same result if called
// multiple times. // multiple times.
func Wait(ctx context.Context, cancelCh <-chan struct{}) error { func Wait(ctx context.Context, cancelCh <-chan struct{}) error {
// First wait for all the children, and see if any of them return an error
children := mctx.Children(ctx)
for _, childCtx := range children {
if err := Wait(childCtx, cancelCh); err != nil {
return err
}
}
futErrs, _ := ctx.Value(threadCtxKey(0)).([]*futureErr) futErrs, _ := ctx.Value(threadCtxKey(0)).([]*futureErr)
for _, futErr := range futErrs { for _, futErr := range futErrs {
err, ok := futErr.get(cancelCh) err, ok := futErr.get(cancelCh)

View File

@ -5,8 +5,6 @@ import (
"errors" "errors"
. "testing" . "testing"
"time" "time"
"github.com/mediocregopher/mediocre-go-lib/mctx"
) )
func TestThreadWait(t *T) { func TestThreadWait(t *T) {
@ -26,123 +24,56 @@ func TestThreadWait(t *T) {
return err return err
} }
t.Run("noChildren", func(t *T) { t.Run("noBlock", func(t *T) {
t.Run("noBlock", func(t *T) { t.Run("noErr", func(t *T) {
t.Run("noErr", func(t *T) { ctx := context.Background()
ctx := context.Background() ctx = WithThreads(ctx, 1, func() error { return nil })
ctx = WithThreads(ctx, 1, func() error { return nil }) if err := Wait(ctx, nil); err != nil {
if err := Wait(ctx, nil); err != nil { t.Fatal(err)
t.Fatal(err) }
}
})
t.Run("err", func(t *T) {
ctx := context.Background()
ctx = WithThreads(ctx, 1, func() error { return testErr })
if err := Wait(ctx, nil); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
}) })
t.Run("block", func(t *T) { t.Run("err", func(t *T) {
t.Run("noErr", func(t *T) { ctx := context.Background()
ctx := context.Background() ctx = WithThreads(ctx, 1, func() error { return testErr })
ctx = WithThreads(ctx, 1, func() error { if err := Wait(ctx, nil); err != testErr {
time.Sleep(1 * time.Second) t.Fatalf("should have got test error, got: %v", err)
return nil }
})
if err := wait(ctx, 1*time.Second); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx := context.Background()
ctx = WithThreads(ctx, 1, func() error {
time.Sleep(1 * time.Second)
return testErr
})
if err := wait(ctx, 1*time.Second); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
t.Run("canceled", func(t *T) {
ctx := context.Background()
ctx = WithThreads(ctx, 1, func() error {
time.Sleep(5 * time.Second)
return testErr
})
if err := Wait(ctx, cancelCh(500*time.Millisecond)); err != ErrDone {
t.Fatalf("should have got ErrDone, got: %v", err)
}
})
}) })
}) })
ctxWithChild := func() (context.Context, context.Context) { t.Run("block", func(t *T) {
ctx := context.Background() t.Run("noErr", func(t *T) {
return ctx, mctx.NewChild(ctx, "child") ctx := context.Background()
} ctx = WithThreads(ctx, 1, func() error {
time.Sleep(1 * time.Second)
t.Run("children", func(t *T) { return nil
t.Run("noBlock", func(t *T) {
t.Run("noErr", func(t *T) {
ctx, childCtx := ctxWithChild()
childCtx = WithThreads(childCtx, 1, func() error { return nil })
ctx = mctx.WithChild(ctx, childCtx)
if err := Wait(ctx, nil); err != nil {
t.Fatal(err)
}
})
t.Run("err", func(t *T) {
ctx, childCtx := ctxWithChild()
childCtx = WithThreads(childCtx, 1, func() error { return testErr })
ctx = mctx.WithChild(ctx, childCtx)
if err := Wait(ctx, nil); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
}) })
if err := wait(ctx, 1*time.Second); err != nil {
t.Fatal(err)
}
}) })
t.Run("block", func(t *T) { t.Run("err", func(t *T) {
t.Run("noErr", func(t *T) { ctx := context.Background()
ctx, childCtx := ctxWithChild() ctx = WithThreads(ctx, 1, func() error {
childCtx = WithThreads(childCtx, 1, func() error { time.Sleep(1 * time.Second)
time.Sleep(1 * time.Second) return testErr
return nil
})
ctx = mctx.WithChild(ctx, childCtx)
if err := wait(ctx, 1*time.Second); err != nil {
t.Fatal(err)
}
}) })
if err := wait(ctx, 1*time.Second); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
t.Run("err", func(t *T) { t.Run("canceled", func(t *T) {
ctx, childCtx := ctxWithChild() ctx := context.Background()
childCtx = WithThreads(childCtx, 1, func() error { ctx = WithThreads(ctx, 1, func() error {
time.Sleep(1 * time.Second) time.Sleep(5 * time.Second)
return testErr return testErr
})
ctx = mctx.WithChild(ctx, childCtx)
if err := wait(ctx, 1*time.Second); err != testErr {
t.Fatalf("should have got test error, got: %v", err)
}
})
t.Run("canceled", func(t *T) {
ctx, childCtx := ctxWithChild()
childCtx = WithThreads(childCtx, 1, func() error {
time.Sleep(5 * time.Second)
return testErr
})
ctx = mctx.WithChild(ctx, childCtx)
if err := Wait(ctx, cancelCh(500*time.Millisecond)); err != ErrDone {
t.Fatalf("should have got ErrDone, got: %v", err)
}
}) })
if err := Wait(ctx, cancelCh(500*time.Millisecond)); err != ErrDone {
t.Fatalf("should have got ErrDone, got: %v", err)
}
}) })
}) })
} }