mediocre-go-lib/mrun/mrun.go

129 lines
3.4 KiB
Go
Raw Normal View History

2019-05-18 18:36:53 +00:00
// Package mrun provides the ability to register callback hooks on contexts, as
// well as some convenience functions which allow using a context as a
// wait-group.
//
// Hooks
//
// Hooks are registered onto contexts and later called in bulk. mrun will take
// into account the order Hooks are registered, including Hooks within a
// context's children (see mctx package), and execute them in the same order
// they were registered. For example:
//
// newHook := func(i int) mrun.Hook {
// return func(context.Context) error {
// fmt.Println(i)
// return nil
// }
// }
//
// ctx := context.Background()
// ctx = mrun.WithStartHook(ctx, newHook(0))
//
// child := mctx.NewChild(ctx, "child")
// child = mrun.WithStartHook(child, newHook(1))
// ctx = mctx.WithChild(ctx, child)
//
// ctx = mrun.WithStartHook(ctx, newHook(2))
// mrun.Start(ctx) // prints "0", "1", then "2"
//
2018-12-06 04:40:46 +00:00
package mrun
import (
"context"
2018-12-06 04:40:46 +00:00
"errors"
"github.com/mediocregopher/mediocre-go-lib/mctx"
)
type futureErr struct {
doneCh chan struct{}
err error
}
func newFutureErr() *futureErr {
return &futureErr{
doneCh: make(chan struct{}),
}
}
func (fe *futureErr) get(cancelCh <-chan struct{}) (error, bool) {
select {
case <-fe.doneCh:
return fe.err, true
case <-cancelCh:
return nil, false
}
}
func (fe *futureErr) set(err error) {
fe.err = err
close(fe.doneCh)
}
type threadCtxKey int
2018-12-06 04:40:46 +00:00
2019-05-18 18:36:53 +00:00
// WithThreads spawns n go-routines, each of which executes the given function.
// The returned Context tracks these go-routines, and can then be passed into
// the Wait function to block until the spawned go-routines all return.
2019-02-24 22:38:05 +00:00
func WithThreads(ctx context.Context, n uint, fn func() error) context.Context {
// I dunno why this would happen, but it wouldn't actually hurt anything
if n == 0 {
return ctx
}
oldFutErrs, _ := ctx.Value(threadCtxKey(0)).([]*futureErr)
2019-02-24 22:38:05 +00:00
futErrs := make([]*futureErr, len(oldFutErrs), len(oldFutErrs)+int(n))
copy(futErrs, oldFutErrs)
2018-12-06 04:40:46 +00:00
2019-02-24 22:38:05 +00:00
for i := uint(0); i < n; i++ {
futErr := newFutureErr()
futErrs = append(futErrs, futErr)
go func() {
futErr.set(fn())
}()
}
2019-02-24 22:38:05 +00:00
return context.WithValue(ctx, threadCtxKey(0), futErrs)
2018-12-06 04:40:46 +00:00
}
// ErrDone is returned from Wait if cancelCh is closed before all threads have
// returned.
var ErrDone = errors.New("Wait is done waiting")
2019-02-24 22:38:05 +00:00
// Wait blocks until all go-routines spawned using WithThreads on the passed in
// Context (and its predecessors) have returned. Any number of the go-routines
2019-02-24 22:38:05 +00:00
// may have returned already when Wait is called, and not all go-routines need
2019-05-18 18:36:53 +00:00
// to be from the same WithThreads call.
2018-12-06 04:40:46 +00:00
//
// If any of the thread functions returned an error during its runtime Wait will
// return that error. If multiple returned an error only one of those will be
// returned. TODO: Handle multi-errors better.
//
// If cancelCh is not nil and is closed before all threads have returned then
// this function stops waiting and returns ErrDone.
//
// Wait is safe to call in parallel, and will return the same result if called
2019-02-24 22:38:05 +00:00
// multiple times.
func Wait(ctx context.Context, cancelCh <-chan struct{}) error {
2018-12-06 04:40:46 +00:00
// First wait for all the children, and see if any of them return an error
children := mctx.Children(ctx)
for _, childCtx := range children {
if err := Wait(childCtx, cancelCh); err != nil {
return err
}
}
futErrs, _ := ctx.Value(threadCtxKey(0)).([]*futureErr)
2018-12-06 04:40:46 +00:00
for _, futErr := range futErrs {
err, ok := futErr.get(cancelCh)
if !ok {
return ErrDone
} else if err != nil {
return err
}
}
return nil
}