Rework how Network background jobs are managed a bit
This commit is contained in:
parent
efdab29ae6
commit
8e264cf028
@ -23,6 +23,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
||||||
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
|
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
)
|
)
|
||||||
@ -181,13 +182,15 @@ type network struct {
|
|||||||
networkConfig daecommon.NetworkConfig
|
networkConfig daecommon.NetworkConfig
|
||||||
currBootstrap bootstrap.Bootstrap
|
currBootstrap bootstrap.Bootstrap
|
||||||
|
|
||||||
shutdownCh chan struct{}
|
workerCtx context.Context
|
||||||
wg sync.WaitGroup
|
workerCancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// instatiateNetwork returns an instantiated *network instance which has not yet
|
// instatiateNetwork returns an instantiated *network instance which has not yet
|
||||||
// been initialized.
|
// been initialized.
|
||||||
func instatiateNetwork(
|
func instatiateNetwork(
|
||||||
|
ctx context.Context,
|
||||||
logger *mlog.Logger,
|
logger *mlog.Logger,
|
||||||
networkConfig daecommon.NetworkConfig,
|
networkConfig daecommon.NetworkConfig,
|
||||||
envBinDirPath string,
|
envBinDirPath string,
|
||||||
@ -195,6 +198,8 @@ func instatiateNetwork(
|
|||||||
runtimeDir toolkit.Dir,
|
runtimeDir toolkit.Dir,
|
||||||
opts *Opts,
|
opts *Opts,
|
||||||
) *network {
|
) *network {
|
||||||
|
ctx = context.WithoutCancel(ctx)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
return &network{
|
return &network{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
networkConfig: networkConfig,
|
networkConfig: networkConfig,
|
||||||
@ -202,7 +207,8 @@ func instatiateNetwork(
|
|||||||
stateDir: stateDir,
|
stateDir: stateDir,
|
||||||
runtimeDir: runtimeDir,
|
runtimeDir: runtimeDir,
|
||||||
opts: opts.withDefaults(),
|
opts: opts.withDefaults(),
|
||||||
shutdownCh: make(chan struct{}),
|
workerCtx: ctx,
|
||||||
|
workerCancel: cancel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,6 +250,7 @@ func Load(
|
|||||||
Network, error,
|
Network, error,
|
||||||
) {
|
) {
|
||||||
n := instatiateNetwork(
|
n := instatiateNetwork(
|
||||||
|
ctx,
|
||||||
logger,
|
logger,
|
||||||
networkConfig,
|
networkConfig,
|
||||||
envBinDirPath,
|
envBinDirPath,
|
||||||
@ -289,6 +296,7 @@ func Join(
|
|||||||
Network, error,
|
Network, error,
|
||||||
) {
|
) {
|
||||||
n := instatiateNetwork(
|
n := instatiateNetwork(
|
||||||
|
ctx,
|
||||||
logger,
|
logger,
|
||||||
networkConfig,
|
networkConfig,
|
||||||
envBinDirPath,
|
envBinDirPath,
|
||||||
@ -355,6 +363,7 @@ func Create(
|
|||||||
garageRPCSecret := toolkit.RandStr(32)
|
garageRPCSecret := toolkit.RandStr(32)
|
||||||
|
|
||||||
n := instatiateNetwork(
|
n := instatiateNetwork(
|
||||||
|
ctx,
|
||||||
logger,
|
logger,
|
||||||
networkConfig,
|
networkConfig,
|
||||||
envBinDirPath,
|
envBinDirPath,
|
||||||
@ -407,6 +416,38 @@ func (n *network) initializeDirs(mayExist bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *network) periodically(
|
||||||
|
logger *mlog.Logger,
|
||||||
|
fn func(context.Context) error,
|
||||||
|
period time.Duration,
|
||||||
|
) {
|
||||||
|
n.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer n.wg.Done()
|
||||||
|
|
||||||
|
ctx := mctx.Annotate(n.workerCtx, "period", period)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(period)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
logger.Info(ctx, "Starting background job runner")
|
||||||
|
defer logger.Info(ctx, "Stopping background job runner")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
logger.Info(ctx, "Background job running")
|
||||||
|
if err := fn(ctx); err != nil {
|
||||||
|
logger.Error(ctx, "Background job failed", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (n *network) initialize(
|
func (n *network) initialize(
|
||||||
ctx context.Context, prevBootstrap bootstrap.Bootstrap,
|
ctx context.Context, prevBootstrap bootstrap.Bootstrap,
|
||||||
) error {
|
) error {
|
||||||
@ -463,21 +504,11 @@ func (n *network) initialize(
|
|||||||
return fmt.Errorf("Reloading network bootstrap: %w", err)
|
return fmt.Errorf("Reloading network bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = context.WithoutCancel(ctx)
|
n.periodically(
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
n.logger.WithNamespace("reloadHosts"),
|
||||||
n.wg.Add(1)
|
n.reloadHosts,
|
||||||
go func() {
|
3*time.Minute,
|
||||||
defer n.wg.Done()
|
)
|
||||||
<-n.shutdownCh
|
|
||||||
cancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
n.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer n.wg.Done()
|
|
||||||
n.reloadLoop(ctx)
|
|
||||||
n.logger.Debug(ctx, "Daemon reload loop stopped")
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -575,25 +606,6 @@ func (n *network) reloadHosts(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) reloadLoop(ctx context.Context) {
|
|
||||||
const period = 3 * time.Minute
|
|
||||||
ticker := time.NewTicker(period)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := n.reloadHosts(ctx); err != nil {
|
|
||||||
n.logger.Error(ctx, "Attempting to reload", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns the bootstrap prior to the reload being applied.
|
// returns the bootstrap prior to the reload being applied.
|
||||||
func (n *network) reload(
|
func (n *network) reload(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -966,10 +978,11 @@ func (n *network) GetNetworkCreationParams(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) Shutdown() error {
|
func (n *network) Shutdown() error {
|
||||||
close(n.shutdownCh)
|
n.workerCancel()
|
||||||
n.wg.Wait()
|
n.wg.Wait()
|
||||||
|
|
||||||
if n.children != nil {
|
if n.children != nil {
|
||||||
|
n.logger.Info(context.Background(), "Shutting down children")
|
||||||
n.children.Shutdown()
|
n.children.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user