mcfg: refactor slightly to separate Run into StartRun and StopRun

This commit is contained in:
Brian Picciano 2018-05-28 02:39:56 +00:00
parent 12df459840
commit 4e0d440d09
3 changed files with 32 additions and 46 deletions

View File

@ -6,7 +6,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
) )
// TODO Sources: // TODO Sources:
@ -33,6 +32,9 @@ func (h *Hook) Then(h2 Hook) {
*h = func(ctx context.Context) error { *h = func(ctx context.Context) error {
if err := oldh(ctx); err != nil { if err := oldh(ctx); err != nil {
return err return err
} else if err := ctx.Err(); err != nil {
// in case the previous hook doesn't respect the context
return err
} }
return h2(ctx) return h2(ctx)
} }
@ -48,8 +50,11 @@ func (h *Hook) Also(h2 Hook) {
go func() { go func() {
errCh <- oldh(ctx) errCh <- oldh(ctx)
}() }()
err := h2(ctx) err := h2(ctx) // don't immediately return this, wait for h2 or ctx
if err := <-errCh; err != nil { if err := ctx.Err(); err != nil {
// in case the previous hook doesn't respect the context
return err
} else if err := <-errCh; err != nil {
return err return err
} }
return err return err
@ -100,28 +105,18 @@ type Cfg struct {
// separate go-routine // separate go-routine
Start Hook Start Hook
// Default 2 minutes. Timeout within which the Start Hook (and the Start
// Hooks of all children of this Cfg) must complete.
StartTimeout time.Duration
// Stop hook is performed on interrupt signal, and should stop all // Stop hook is performed on interrupt signal, and should stop all
// go-routines and close all resource handlers created during Start // go-routines and close all resource handlers created during Start
Stop Hook Stop Hook
// Default 30 seconds. Timeout within which the Stop Hook (and the Stop
// Hooks of all children of this Cfg) must complete.
StopTimeout time.Duration
} }
// New initializes and returns an empty Cfg with default values filled in // New initializes and returns an empty Cfg with default values filled in
func New() *Cfg { func New() *Cfg {
return &Cfg{ return &Cfg{
Children: map[string]*Cfg{}, Children: map[string]*Cfg{},
Params: map[string]Param{}, Params: map[string]Param{},
Start: Nop(), Start: Nop(),
StartTimeout: 2 * time.Minute, Stop: Nop(),
Stop: Nop(),
StopTimeout: 30 * time.Second,
} }
} }
@ -196,44 +191,35 @@ func (c *Cfg) runPreBlock(ctx context.Context, src Source) error {
return err return err
} }
startCtx, cancel := context.WithTimeout(ctx, c.StartTimeout) return c.Start(ctx)
defer cancel()
return c.Start(startCtx)
} }
// Run blocks while performing all steps of a Cfg run. The steps, in order, are; // StartRun blocks while performing all steps of a Cfg run. The steps, in order,
// are:
// * Populate all configuration parameters // * Populate all configuration parameters
// * Recursively perform Start hooks, depth first // * Perform Start hooks
// * Block till the passed in context is cancelled
// * Recursively perform Stop hooks, depth first
// //
// If any step returns an error then everything returns that error immediately. // If any step returns an error then everything returns that error immediately.
// func (c *Cfg) StartRun(ctx context.Context, src Source) error {
// A caveat about Run is that the error case doesn't leave a lot of room for a return c.runPreBlock(ctx, src)
// proper cleanup. If you care about that sort of thing you'll need to handle it
// yourself.
func (c *Cfg) Run(ctx context.Context, src Source) error {
if err := c.runPreBlock(ctx, src); err != nil {
return err
}
<-ctx.Done()
stopCtx, cancel := context.WithTimeout(context.Background(), c.StopTimeout)
defer cancel()
return c.Stop(stopCtx)
} }
// TestRun is like Run, except it's intended to only be used during tests to // StartTestRun is like StartRun, except it's intended to only be used during
// initialize other entities which are going to actually be tested. It assumes // tests to initialize other entities which are going to actually be tested. It
// all default configuration param values, and will return after the Start hook // assumes all default configuration param values, and will return after the
// has completed. It will panic on any errors. // Start hook has completed. It will panic on any errors.
func (c *Cfg) TestRun() { func (c *Cfg) StartTestRun() {
if err := c.runPreBlock(context.Background(), nil); err != nil { if err := c.runPreBlock(context.Background(), nil); err != nil {
panic(err) panic(err)
} }
} }
// StopRun blocks while calling the Stop hook of the Cfg, returning any error
// that it does.
func (c *Cfg) StopRun(ctx context.Context) error {
return c.Stop(ctx)
}
// Child returns a sub-Cfg of the callee with the given name. The name will be // Child returns a sub-Cfg of the callee with the given name. The name will be
// prepended to all configuration options created in the returned sub-Cfg, and // prepended to all configuration options created in the returned sub-Cfg, and
// must not be empty. // must not be empty.

View File

@ -25,7 +25,7 @@ func TestHook(t *T) {
}) })
errCh := make(chan error) errCh := make(chan error)
go func() { go func() {
errCh <- h(nil) errCh <- h(context.Background())
}() }()
assert.True(t, <-aCh) assert.True(t, <-aCh)
@ -63,7 +63,7 @@ func TestHook(t *T) {
}) })
errCh := make(chan error) errCh := make(chan error)
go func() { go func() {
errCh <- h(nil) errCh <- h(context.Background())
}() }()
// both channels should get written to, then closed, then errCh should // both channels should get written to, then closed, then errCh should

View File

@ -16,7 +16,7 @@ var testPS *PubSub
func init() { func init() {
cfg := mcfg.New() cfg := mcfg.New()
testPS = CfgPubSub(cfg, "test") testPS = CfgPubSub(cfg, "test")
cfg.TestRun() cfg.StartTestRun()
} }
// this requires the pubsub emulator to be running // this requires the pubsub emulator to be running