isle/go/daemon/daemon_restarter.go

263 lines
6.2 KiB
Go
Raw Normal View History

package daemon
import (
"bytes"
"context"
"errors"
"fmt"
"isle/bootstrap"
"sync"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// Lifecycle states of a daemonRestarter. Initializing is the zero value, so a
// daemonRestarter whose state field was never assigned is in that state.
const (
	// No inner daemon has been successfully created yet.
	daemonRestarterStateInitializing = iota

	// An inner daemon is available and calls are forwarded to it.
	daemonRestarterStateOk

	// The bootstrap has changed and the inner daemon is being replaced.
	daemonRestarterStateRestarting

	// The restart loop is exiting or has exited.
	daemonRestarterStateShutdown
)
// daemonRestarter is the Daemon implementation returned by NewDaemonRestarter.
// It holds an inner Daemon which is replaced by restartLoop whenever the
// bootstrap changes.
type daemonRestarter struct {
	// Set once at construction. Apart from logger being used for the
	// restarter's own logging, these are the arguments handed to New every
	// time an inner daemon is created.
	logger        *mlog.Logger
	daemonConfig  Config
	envBinDirPath string
	opts          *Opts

	// l is held whenever state, inner or currBootstrap are written by
	// restartLoop, and whenever state and inner are read by withInnerDaemon.
	l sync.Mutex

	// state is one of the daemonRestarterState* constants.
	state int

	// inner is the Daemon that calls are forwarded to while state is
	// daemonRestarterStateOk.
	inner Daemon

	// currBootstrap is the bootstrap the next inner daemon will be created
	// with. It is replaced each time watchForChanges reports a change.
	currBootstrap bootstrap.Bootstrap

	// cancelFn cancels the context that restartLoop runs under.
	cancelFn context.CancelFunc

	// stoppedCh is closed once restartLoop has returned.
	stoppedCh chan struct{}
}
// NewDaemonRestarter returns a Daemon wrapping an inner Daemon, such that the
// inner Daemon is automatically shut down and re-created whenever there are
// changes in the cluster which require the configuration to be changed (e.g. a
// new nebula lighthouse).
//
// Until the inner Daemon has been started for the first time, all methods
// return ErrInitializing. The exception is Shutdown, which blocks until
// initialization has been canceled.
//
// While a restart is in progress, all methods return ErrRestarting. The
// exception is Shutdown, which blocks until the currently executing restart
// has finished and then shuts down cleanly from there.
//
// TODO make daemonRestarter smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemonRestarter(
	logger *mlog.Logger,
	daemonConfig Config,
	envBinDirPath string,
	currBootstrap bootstrap.Bootstrap,
	opts *Opts,
) Daemon {
	loopCtx, cancelLoop := context.WithCancel(context.Background())

	// state is left at its zero value, daemonRestarterStateInitializing.
	restarter := new(daemonRestarter)
	restarter.logger = logger
	restarter.daemonConfig = daemonConfig
	restarter.envBinDirPath = envBinDirPath
	restarter.opts = opts.withDefaults()
	restarter.currBootstrap = currBootstrap
	restarter.cancelFn = cancelLoop
	restarter.stoppedCh = make(chan struct{})

	// The loop runs until cancelLoop is called (via Shutdown); stoppedCh is
	// closed afterwards so that Shutdown can wait for it to finish.
	runLoop := func() {
		restarter.restartLoop(loopCtx)
		restarter.logger.Debug(loopCtx, "DaemonRestarter stopped")
		close(restarter.stoppedCh)
	}
	go runLoop()

	return restarter
}
// withInnerDaemon calls fn with dr's current inner Daemon and returns fn's
// results, provided that dr is in the Ok state.
//
// The state and inner Daemon are snapshotted together while holding dr.l; the
// lock is released before fn is invoked, so fn never runs with dr.l held.
//
// If dr is not in the Ok state then fn is not called, the zero value of Res is
// returned, and the error is:
//   - ErrInitializing if no inner Daemon has been created yet,
//   - ErrRestarting if the inner Daemon is being replaced,
//   - an "already shutdown" error if dr has been shut down.
//
// An unrecognized state is a programmer error and causes a panic.
func withInnerDaemon[Res any](
	dr *daemonRestarter, fn func(Daemon) (Res, error),
) (Res, error) {
	var zero Res

	dr.l.Lock()
	inner, state := dr.inner, dr.state
	dr.l.Unlock()

	switch state {
	case daemonRestarterStateInitializing:
		return zero, ErrInitializing

	case daemonRestarterStateOk:
		return fn(inner)

	case daemonRestarterStateRestarting:
		return zero, ErrRestarting

	case daemonRestarterStateShutdown:
		return zero, errors.New("already shutdown")

	default:
		// Report the snapshot which was actually switched on. Reading
		// dr.state here would be an unsynchronized access, and could
		// report a different value than the one which got us here.
		panic(fmt.Sprintf("unknown state %d", state))
	}
}
// checkBootstrap builds an up-to-date set of hosts using information available
// from the network and compares it against the hosts in hostBootstrap.
//
// The hosts are fetched from garage via the inner daemon, after which the
// entry for this host is overwritten with the one from hostBootstrap. The hash
// of the result is then compared to the hash of hostBootstrap.Hosts:
//   - If they are equal, hostBootstrap is returned as-is along with false.
//   - Otherwise hostBootstrap with its Hosts field replaced by the new hosts is
//     returned along with true.
//
// Nothing is written to disk by this method; persisting a changed bootstrap is
// left to the caller.
//
// An error is returned if the hosts can't be fetched or if either hash can't
// be calculated.
func (dr *daemonRestarter) checkBootstrap(
	ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
	bootstrap.Bootstrap, bool, error,
) {
	thisHost := hostBootstrap.ThisHost()

	newHosts, err := dr.inner.GetGarageBootstrapHosts(ctx)
	if err != nil {
		return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
	}

	// Assigning into a nil map panics, so make sure there's a map to write
	// this host into even if no hosts were returned.
	if newHosts == nil {
		newHosts = map[string]bootstrap.Host{}
	}

	// the daemon's view of this host's bootstrap info takes precedence over
	// whatever is in garage
	newHosts[thisHost.Name] = thisHost

	newHostsHash, err := bootstrap.HostsHash(newHosts)
	if err != nil {
		return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
	}

	currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
	if err != nil {
		return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
	}

	if bytes.Equal(newHostsHash, currHostsHash) {
		return hostBootstrap, false, nil
	}

	// hostBootstrap was passed by value, so this only replaces the Hosts
	// field of the local copy which gets returned.
	hostBootstrap.Hosts = newHosts
	return hostBootstrap, true, nil
}
// watchForChanges periodically checks whether the bootstrap has changed, and
// blocks until either a changed bootstrap has been written to the state
// directory or ctx is canceled.
//
// When a change has been found and persisted the new bootstrap is returned. If
// ctx is canceled first then an empty bootstrap is returned; callers tell the
// two cases apart by checking ctx.
//
// Failures to check for changes or to write the new bootstrap are logged, and
// the check is tried again on the next tick.
func (dr *daemonRestarter) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
	const checkInterval = 3 * time.Minute

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		// Wait for the next tick, bailing out if canceled in the meantime.
		select {
		case <-ctx.Done():
			return bootstrap.Bootstrap{}
		case <-ticker.C:
		}

		dr.logger.Info(ctx, "Checking for changes to bootstrap")

		updated, hasChanged, err := dr.checkBootstrap(ctx, dr.currBootstrap)
		if err != nil {
			dr.logger.Error(ctx, "Checking bootstrap for changes failed", err)
			continue
		}
		if !hasChanged {
			continue
		}

		stateDirPath := dr.opts.EnvVars.StateDirPath
		if err := writeBootstrapToStateDir(stateDirPath, updated); err != nil {
			dr.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
			continue
		}

		return updated
	}
}
// restartLoop creates the inner daemon, and then re-creates it every time
// watchForChanges reports a new bootstrap. It returns once ctx is canceled.
//
// If creating the inner daemon fails for a reason other than cancellation the
// error is logged and creation is retried after one second.
//
// On return the state is set to daemonRestarterStateShutdown and the inner
// daemon, if there currently is one, is shut down.
func (dr *daemonRestarter) restartLoop(ctx context.Context) {
	defer func() {
		dr.l.Lock()
		dr.state = daemonRestarterStateShutdown
		inner := dr.inner
		dr.l.Unlock()

		// inner is nil if no daemon was ever created, or if the previous
		// one was already shut down as part of a restart which never got
		// as far as creating its replacement.
		if inner == nil {
			return
		}

		if err := inner.Shutdown(); err != nil {
			dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
		}
	}()

	// wait blocks for the given duration, returning false if ctx was canceled
	// before the duration elapsed.
	wait := func(d time.Duration) bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(d):
			return true
		}
	}

	for {
		if ctx.Err() != nil {
			return
		}

		dr.logger.Info(ctx, "Creating new daemon")
		daemonInst, err := New(
			ctx,
			dr.logger.WithNamespace("daemon"),
			dr.daemonConfig,
			dr.currBootstrap,
			dr.envBinDirPath,
			dr.opts,
		)
		if errors.Is(err, context.Canceled) {
			return
		}
		if err != nil {
			dr.logger.Error(ctx, "failed to initialize daemon", err)
			if !wait(1 * time.Second) {
				return
			}
			continue
		}

		dr.l.Lock()
		dr.inner = daemonInst
		dr.state = daemonRestarterStateOk
		dr.l.Unlock()

		newBootstrap := dr.watchForChanges(ctx)
		if ctx.Err() != nil {
			// The deferred cleanup shuts down the running daemon.
			return
		}

		dr.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
		dr.l.Lock()
		dr.currBootstrap = newBootstrap
		dr.state = daemonRestarterStateRestarting
		// Drop the reference to the daemon which is about to be shut down.
		// Otherwise, if ctx gets canceled before a replacement has been
		// created, the deferred cleanup would call Shutdown on this same
		// daemon a second time.
		dr.inner = nil
		dr.l.Unlock()

		dr.logger.Info(ctx, "Shutting down previous daemon")
		if err := daemonInst.Shutdown(); err != nil {
			dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
		}
	}
}
// GetGarageBootstrapHosts forwards the call to the current inner daemon via
// withInnerDaemon. If the restarter is not in the Ok state the inner daemon is
// not called and the error produced by withInnerDaemon is returned instead.
func (dr *daemonRestarter) GetGarageBootstrapHosts(
	ctx context.Context,
) (
	map[string]bootstrap.Host, error,
) {
	forward := func(d Daemon) (map[string]bootstrap.Host, error) {
		return d.GetGarageBootstrapHosts(ctx)
	}

	return withInnerDaemon(dr, forward)
}
// Shutdown cancels the restart loop's context and blocks until the loop has
// fully stopped. It always returns nil.
func (dr *daemonRestarter) Shutdown() error {
	// Tell the restart loop to stop.
	dr.cancelFn()

	// stoppedCh is closed once the restart loop has returned.
	<-dr.stoppedCh

	return nil
}