diff --git a/go/cmd/entrypoint/admin.go b/go/cmd/entrypoint/admin.go index ca3262b..cc56e73 100644 --- a/go/cmd/entrypoint/admin.go +++ b/go/cmd/entrypoint/admin.go @@ -166,25 +166,33 @@ var subCmdAdminCreateNetwork = subCmd{ return fmt.Errorf("merging daemon config into bootstrap data: %w", err) } - daemonInst, err := daemon.New( + children, err := daemon.NewChildren( ctx, logger.WithNamespace("daemon"), daemonConfig, hostBootstrap, envBinDirPath, &daemon.Opts{ - // SkipHostBootstrapPush is required, because the global bucket - // hasn't actually been initialized yet, so there's nowhere to - // push to. - SkipHostBootstrapPush: true, - // NOTE both stdout and stderr are sent to stderr, so that the // user can pipe the resulting admin.json to stdout. Stdout: os.Stderr, }, ) if err != nil { - return fmt.Errorf("initializing daemon: %w", err) + return fmt.Errorf("initializing children: %w", err) + } + defer func() { + logger.Info(ctx, "Shutting down child processes") + if err := children.Shutdown(); err != nil { + logger.Error(ctx, "Failed to shut down children cleanly, there may be zombie children leftover", err) + } + }() + + logger.Info(ctx, "Applying garage layout") + if err := daemon.GarageApplyLayout( + ctx, logger, hostBootstrap, daemonConfig, + ); err != nil { + return fmt.Errorf("applying garage layout: %w", err) } logger.Info(ctx, "initializing garage shared global bucket") @@ -199,10 +207,6 @@ var subCmdAdminCreateNetwork = subCmd{ return fmt.Errorf("initializing garage shared global bucket: %w", err) } - if err := daemonInst.Shutdown(); err != nil { - return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err) - } - hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds // rewrite the bootstrap now that the global bucket creds have been @@ -211,7 +215,7 @@ var subCmdAdminCreateNetwork = subCmd{ return fmt.Errorf("writing bootstrap file: %w", err) } - logger.Info(ctx, "cluster initialized successfully, writing admin.json to stdout") + logger.Info(ctx, "Network initialized successfully, writing admin.json to stdout") adm := admin.Admin{ CreationParams: adminCreationParams, diff --git a/go/cmd/entrypoint/daemon.go b/go/cmd/entrypoint/daemon.go index 32f1c91..d9db84f 100644 --- a/go/cmd/entrypoint/daemon.go +++ b/go/cmd/entrypoint/daemon.go @@ -127,7 +127,7 @@ var subCmdDaemon = subCmd{ return fmt.Errorf("merging daemon config into bootstrap data: %w", err) } - daemonInst := daemon.NewDaemonRestarter( + daemonInst := daemon.NewDaemon( logger, daemonConfig, envBinDirPath, hostBootstrap, nil, ) defer func() { diff --git a/go/daemon/child_garage.go b/go/daemon/child_garage.go index 298980e..5abc2a3 100644 --- a/go/daemon/child_garage.go +++ b/go/daemon/child_garage.go @@ -39,9 +39,14 @@ func NewGarageAdminClient( ) } -func (d *daemon) waitForGarage(ctx context.Context) error { +func waitForGarage( + ctx context.Context, + logger *mlog.Logger, + daemonConfig Config, + hostBootstrap bootstrap.Bootstrap, +) error { - allocs := d.config.Storage.Allocations + allocs := daemonConfig.Storage.Allocations // if this host doesn't have any allocations specified then fall back to // waiting for nebula @@ -49,23 +54,23 @@ func (d *daemon) waitForGarage(ctx context.Context) error { return nil } - adminClientLogger := garageAdminClientLogger(d.logger) + adminClientLogger := garageAdminClientLogger(logger) for _, alloc := range allocs { adminAddr := net.JoinHostPort( - d.hostBootstrap.ThisHost().IP().String(), + hostBootstrap.ThisHost().IP().String(), strconv.Itoa(alloc.AdminPort), ) adminClient := garage.NewAdminClient( adminClientLogger, adminAddr, - d.hostBootstrap.Garage.AdminToken, + hostBootstrap.Garage.AdminToken, ) ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr) - d.logger.Debug(ctx, "waiting for garage instance to start") + logger.Debug(ctx, "waiting for garage instance to start") if err := adminClient.Wait(ctx); err != nil { return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) @@ -139,9 +144,10 @@ func garageWriteChildConfig( } func garagePmuxProcConfigs( + logger *mlog.Logger, runtimeDirPath, binDirPath string, - hostBootstrap bootstrap.Bootstrap, daemonConfig Config, + hostBootstrap bootstrap.Bootstrap, ) ( []pmuxlib.ProcessConfig, error, ) { @@ -162,7 +168,7 @@ func garagePmuxProcConfigs( Cmd: filepath.Join(binDirPath, "garage"), Args: []string{"-c", childConfigPath, "server"}, StartAfterFunc: func(ctx context.Context) error { - return waitForNebula(ctx, hostBootstrap) + return waitForNebula(ctx, logger, hostBootstrap) }, }) } @@ -170,7 +176,9 @@ func garagePmuxProcConfigs( return pmuxProcConfigs, nil } -func garageApplyLayout( +// TODO don't expose this publicly once cluster creation is done via Daemon +// interface. +func GarageApplyLayout( ctx context.Context, logger *mlog.Logger, hostBootstrap bootstrap.Bootstrap, diff --git a/go/daemon/child_nebula.go b/go/daemon/child_nebula.go index d0f6770..bcc58d4 100644 --- a/go/daemon/child_nebula.go +++ b/go/daemon/child_nebula.go @@ -9,6 +9,8 @@ import ( "path/filepath" "code.betamike.com/micropelago/pmux/pmuxlib" + "dev.mediocregopher.com/mediocre-go-lib.git/mctx" + "dev.mediocregopher.com/mediocre-go-lib.git/mlog" "github.com/slackhq/nebula/cert" ) @@ -17,28 +19,37 @@ import ( // its source. If this succeeds we can assume that at the very least the nebula // interface has been initialized. func waitForNebula( - ctx context.Context, hostBootstrap bootstrap.Bootstrap, + ctx context.Context, logger *mlog.Logger, hostBootstrap bootstrap.Bootstrap, ) error { + var ( + ip = hostBootstrap.ThisHost().IP() + lUDPAddr = &net.UDPAddr{IP: ip, Port: 0} + rUDPAddr = &net.UDPAddr{IP: ip, Port: 45535} + ) - ip := hostBootstrap.ThisHost().IP() + ctx = mctx.Annotate(ctx, "lUDPAddr", lUDPAddr, "rUDPAddr", rUDPAddr) - lUdpAddr := &net.UDPAddr{IP: ip, Port: 0} - rUdpAddr := &net.UDPAddr{IP: ip, Port: 45535} + until( + ctx, + logger, + "Creating UDP socket from nebula addr", + func(context.Context) error { + conn, err := net.DialUDP("udp", lUDPAddr, rUDPAddr) + if err != nil { + return err + } + conn.Close() + return nil + }, + ) - return until(ctx, func(context.Context) error { - conn, err := net.DialUDP("udp", lUdpAddr, rUdpAddr) - if err != nil { - return err - } - conn.Close() - return nil - }) + return ctx.Err() } func nebulaPmuxProcConfig( runtimeDirPath, binDirPath string, - hostBootstrap bootstrap.Bootstrap, daemonConfig Config, + hostBootstrap bootstrap.Bootstrap, ) ( pmuxlib.ProcessConfig, error, ) { diff --git a/go/daemon/child_pmux.go b/go/daemon/child_pmux.go index 9270961..0a5b702 100644 --- a/go/daemon/child_pmux.go +++ b/go/daemon/child_pmux.go @@ -3,26 +3,29 @@ package daemon import ( "context" "fmt" + "isle/bootstrap" "code.betamike.com/micropelago/pmux/pmuxlib" ) -func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) { +func (c *Children) newPmuxConfig( + binDirPath string, daemonConfig Config, hostBootstrap bootstrap.Bootstrap, +) (pmuxlib.Config, error) { nebulaPmuxProcConfig, err := nebulaPmuxProcConfig( - d.opts.EnvVars.RuntimeDirPath, - d.binDirPath, - d.hostBootstrap, - d.config, + c.opts.EnvVars.RuntimeDirPath, + binDirPath, + daemonConfig, + hostBootstrap, ) if err != nil { return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err) } dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig( - d.opts.EnvVars.RuntimeDirPath, - d.binDirPath, - d.hostBootstrap, - d.config, + c.opts.EnvVars.RuntimeDirPath, + binDirPath, + hostBootstrap, + daemonConfig, ) if err != nil { return pmuxlib.Config{}, fmt.Errorf( @@ -31,7 +34,11 @@ func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) { } garagePmuxProcConfigs, err := garagePmuxProcConfigs( - d.opts.EnvVars.RuntimeDirPath, d.binDirPath, d.hostBootstrap, d.config, + c.logger, + c.opts.EnvVars.RuntimeDirPath, + binDirPath, + daemonConfig, + hostBootstrap, ) if err != nil { return pmuxlib.Config{}, fmt.Errorf( @@ -50,46 +57,21 @@ func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) { }, nil } -func (d *daemon) postPmuxInit(ctx context.Context) error { - d.logger.Info(ctx, "waiting for nebula VPN to come online") - if err := waitForNebula(ctx, d.hostBootstrap); err != nil { +func (c *Children) postPmuxInit( + ctx context.Context, + daemonConfig Config, + hostBootstrap bootstrap.Bootstrap, +) error { + c.logger.Info(ctx, "Waiting for nebula VPN to come online") + if err := waitForNebula(ctx, c.logger, hostBootstrap); err != nil { return fmt.Errorf("waiting for nebula to start: %w", err) } - d.logger.Info(ctx, "waiting for garage instances to come online") - if err := d.waitForGarage(ctx); err != nil { + c.logger.Info(ctx, "Waiting for garage instances to come online") + err := waitForGarage(ctx, c.logger, daemonConfig, hostBootstrap) + if err != nil { return fmt.Errorf("waiting for garage to start: %w", err) } - if len(d.config.Storage.Allocations) > 0 { - - err := until(ctx, func(ctx context.Context) error { - err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config) - if err != nil { - d.logger.Error(ctx, "applying garage layout", err) - return err - } - - return nil - }) - - if err != nil { - return fmt.Errorf("applying garage layout: %w", err) - } - } - - if !d.opts.SkipHostBootstrapPush { - if err := until(ctx, func(ctx context.Context) error { - if err := d.putGarageBoostrapHost(ctx); err != nil { - d.logger.Error(ctx, "updating host info in garage", err) - return err - } - - return nil - }); err != nil { - return fmt.Errorf("updating host info in garage: %w", err) - } - } - return nil } diff --git a/go/daemon/children.go b/go/daemon/children.go new file mode 100644 index 0000000..2df1712 --- /dev/null +++ b/go/daemon/children.go @@ -0,0 +1,86 @@ +package daemon + +import ( + "context" + "fmt" + "isle/bootstrap" + + "code.betamike.com/micropelago/pmux/pmuxlib" + "dev.mediocregopher.com/mediocre-go-lib.git/mlog" +) + +// Children manages all child processes of a network. Child processes are +// comprised of: +// - nebula +// - dnsmasq +// - garage (0 or more, depending on configured storage allocations) +type Children struct { + logger *mlog.Logger + opts Opts + + pmuxCancelFn context.CancelFunc + pmuxStoppedCh chan struct{} +} + +// NewChildren initialized and returns a Children instance. If initialization +// fails an error is returned. +func NewChildren( + ctx context.Context, + logger *mlog.Logger, + daemonConfig Config, + hostBootstrap bootstrap.Bootstrap, + binDirPath string, + opts *Opts, +) ( + *Children, error, +) { + opts = opts.withDefaults() + + pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background()) + + c := &Children{ + logger: logger, + opts: *opts, + pmuxCancelFn: pmuxCancelFn, + pmuxStoppedCh: make(chan struct{}), + } + + pmuxConfig, err := c.newPmuxConfig(binDirPath, daemonConfig, hostBootstrap) + if err != nil { + return nil, fmt.Errorf("generating pmux config: %w", err) + } + + go func() { + defer close(c.pmuxStoppedCh) + pmuxlib.Run(pmuxCtx, c.opts.Stdout, c.opts.Stderr, pmuxConfig) + c.logger.Debug(pmuxCtx, "pmux stopped") + }() + + initErr := c.postPmuxInit(ctx, daemonConfig, hostBootstrap) + if initErr != nil { + logger.Warn(ctx, "failed to initialize Children, shutting down child processes", err) + if err := c.Shutdown(); err != nil { + panic(fmt.Sprintf( + "failed to shut down child processes after initialization"+ + " error, there may be zombie children leftover."+ + " Original error: %v", + initErr, + )) + } + + return nil, initErr + } + + return c, nil +} + +// Shutdown blocks until all child processes have gracefully shut themselves +// down. +// +// If this returns an error then it's possible that child processes are +// still running and are no longer managed. +func (c *Children) Shutdown() error { + c.pmuxCancelFn() + <-c.pmuxStoppedCh + return nil +} diff --git a/go/daemon/daemon.go b/go/daemon/daemon.go index e736099..123a9ee 100644 --- a/go/daemon/daemon.go +++ b/go/daemon/daemon.go @@ -1,15 +1,18 @@ // Package daemon implements the isle daemon, which is a long-running service -// managing all isle background tasks and sub-processes for a single cluster. +// managing all isle background tasks and sub-processes for a single network. package daemon import ( + "bytes" "context" + "errors" "fmt" "io" "isle/bootstrap" "os" + "sync" + "time" - "code.betamike.com/micropelago/pmux/pmuxlib" "dev.mediocregopher.com/mediocre-go-lib.git/mlog" ) @@ -36,10 +39,6 @@ type Daemon interface { // Opts are optional parameters which can be passed in when initializing a new // Daemon instance. A nil Opts is equivalent to a zero value. type Opts struct { - // SkipHostBootstrapPush, if set, will cause the Daemon to not push the - // bootstrap to garage upon a successful initialization. - SkipHostBootstrapPush bool - // Stdout and Stderr are what the associated outputs from child processes // will be directed to. Stdout, Stderr io.Writer @@ -67,73 +66,280 @@ func (o *Opts) withDefaults() *Opts { return o } +const ( + daemonStateInitializing = iota + daemonStateOk + daemonStateRestarting + daemonStateShutdown +) + type daemon struct { logger *mlog.Logger - config Config - hostBootstrap bootstrap.Bootstrap - binDirPath string - opts Opts + daemonConfig Config + envBinDirPath string + opts *Opts - pmuxCancelFn context.CancelFunc - pmuxStoppedCh chan struct{} + l sync.Mutex + state int + children *Children + currBootstrap bootstrap.Bootstrap + + cancelFn context.CancelFunc + stoppedCh chan struct{} } -// New initialized and returns a Daemon. If initialization fails an error is -// returned. -func New( - ctx context.Context, +// NewDaemon initializes and returns a Daemon instance which will manage all +// child processes and state required by the isle service, as well as an HTTP +// socket over which RPC calls will be served. +// +// Inner Children instance(s) will be wrapped such that they will be +// automatically shutdown and re-created whenever there's changes in the network +// which require the configuration to be changed (e.g. a new nebula lighthouse). +// During such an inner restart all methods will return ErrRestarting, except +// Shutdown which will block until the currently executing restart is finished +// and then shutdown cleanly from there. +// +// While still starting up the Daemon for the first time all methods will return +// ErrInitializing, except Shutdown which will block until initialization is +// canceled. +// +// TODO make daemon smarter, it currently restarts on _any_ change, but +// it should restart itself only when there's something actually requiring a +// restart. +func NewDaemon( logger *mlog.Logger, - config Config, - hostBootstrap bootstrap.Bootstrap, - binDirPath string, + daemonConfig Config, + envBinDirPath string, + currBootstrap bootstrap.Bootstrap, opts *Opts, -) ( - Daemon, error, -) { - opts = opts.withDefaults() - - pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background()) +) Daemon { + ctx, cancelFn := context.WithCancel(context.Background()) d := &daemon{ logger: logger, - config: config, - hostBootstrap: hostBootstrap, - binDirPath: binDirPath, - opts: *opts, - pmuxCancelFn: pmuxCancelFn, - pmuxStoppedCh: make(chan struct{}), - } - - pmuxConfig, err := d.newPmuxConfig() - if err != nil { - return nil, fmt.Errorf("generating pmux config: %w", err) + daemonConfig: daemonConfig, + envBinDirPath: envBinDirPath, + opts: opts.withDefaults(), + currBootstrap: currBootstrap, + cancelFn: cancelFn, + stoppedCh: make(chan struct{}), } go func() { - defer close(d.pmuxStoppedCh) - pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig) - d.logger.Debug(pmuxCtx, "pmux stopped") + d.restartLoop(ctx) + d.logger.Debug(ctx, "DaemonRestarter stopped") + close(d.stoppedCh) }() - if initErr := d.postPmuxInit(ctx); initErr != nil { - logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err) - if err := d.Shutdown(); err != nil { - panic(fmt.Sprintf( - "failed to shut down child processes after initialization"+ - " error, there may be zombie children leftover."+ - " Original error: %v", - initErr, - )) - } + return d +} - return nil, initErr +func withInnerChildren[Res any]( + d *daemon, fn func(*Children) (Res, error), +) (Res, error) { + var zero Res + d.l.Lock() + children, state := d.children, d.state + d.l.Unlock() + + switch state { + case daemonStateInitializing: + return zero, ErrInitializing + case daemonStateOk: + return fn(children) + case daemonStateRestarting: + return zero, ErrRestarting + case daemonStateShutdown: + return zero, errors.New("already shutdown") + default: + panic(fmt.Sprintf("unknown state %d", d.state)) + } +} + +// creates a new bootstrap file using available information from the network. If +// the new bootstrap file is different than the existing one, the existing one +// is overwritten and true is returned. +func (d *daemon) checkBootstrap( + ctx context.Context, hostBootstrap bootstrap.Bootstrap, +) ( + bootstrap.Bootstrap, bool, error, +) { + + thisHost := hostBootstrap.ThisHost() + + newHosts, err := d.getGarageBootstrapHosts(ctx) + if err != nil { + return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err) } - return d, nil + // the daemon's view of this host's bootstrap info takes precedence over + // whatever is in garage + newHosts[thisHost.Name] = thisHost + + newHostsHash, err := bootstrap.HostsHash(newHosts) + if err != nil { + return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err) + } + + currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts) + if err != nil { + return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err) + } + + if bytes.Equal(newHostsHash, currHostsHash) { + return hostBootstrap, false, nil + } + + hostBootstrap.Hosts = newHosts + + return hostBootstrap, true, nil +} + +// blocks until a new bootstrap is available or context is canceled +func (d *daemon) watchForChanges(ctx context.Context) bootstrap.Bootstrap { + + ticker := time.NewTicker(3 * time.Minute) + defer ticker.Stop() + + for { + select { + + case <-ctx.Done(): + return bootstrap.Bootstrap{} + + case <-ticker.C: + + d.logger.Info(ctx, "Checking for changes to bootstrap") + + newBootstrap, changed, err := d.checkBootstrap( + ctx, d.currBootstrap, + ) + if err != nil { + d.logger.Error(ctx, "Checking bootstrap for changes failed", err) + continue + } else if !changed { + continue + } + + err = writeBootstrapToStateDir(d.opts.EnvVars.StateDirPath, newBootstrap) + if err != nil { + d.logger.Error(ctx, "Writing new bootstrap to disk failed", err) + continue + } + + return newBootstrap + } + } +} + +func (d *daemon) restartLoop(ctx context.Context) { + defer func() { + d.l.Lock() + d.state = daemonStateShutdown + children := d.children + d.l.Unlock() + + if children != nil { + if err := children.Shutdown(); err != nil { + d.logger.Fatal(ctx, "Failed to cleanly shutdown daemon children, there may be orphaned child processes", err) + } + } + }() + + wait := func(d time.Duration) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(d): + return true + } + } + + for { + if ctx.Err() != nil { + return + } + + d.logger.Info(ctx, "Creating new daemon") + children, err := NewChildren( + ctx, + d.logger.WithNamespace("daemon"), + d.daemonConfig, + d.currBootstrap, + d.envBinDirPath, + d.opts, + ) + if errors.Is(err, context.Canceled) { + return + } else if err != nil { + d.logger.Error(ctx, "failed to initialize daemon", err) + if !wait(1 * time.Second) { + return + } + continue + } + + d.l.Lock() + d.children = children + d.state = daemonStateOk + d.l.Unlock() + + if len(d.daemonConfig.Storage.Allocations) > 0 { + if !until( + ctx, + d.logger, + "Applying garage layout", + func(ctx context.Context) error { + return GarageApplyLayout( + ctx, d.logger, d.currBootstrap, d.daemonConfig, + ) + }, + ) { + return + } + } + + if !until( + ctx, + d.logger, + "Updating host info in garage", + func(ctx context.Context) error { + return d.putGarageBoostrapHost(ctx) + }, + ) { + return + } + + newBootstrap := d.watchForChanges(ctx) + if ctx.Err() != nil { + return + } + + d.logger.Info(ctx, "Bootstrap has changed, will restart daemon") + d.l.Lock() + d.currBootstrap = newBootstrap + d.state = daemonStateRestarting + d.l.Unlock() + + d.logger.Info(ctx, "Shutting down previous child processes") + if err := d.children.Shutdown(); err != nil { + d.logger.Fatal(ctx, "Failed to cleanly shutdown children, there may be orphaned child processes", err) + } + } +} + +func (d *daemon) GetGarageBootstrapHosts( + ctx context.Context, +) ( + map[string]bootstrap.Host, error, +) { + return withInnerChildren(d, func(*Children) (map[string]bootstrap.Host, error) { + return d.getGarageBootstrapHosts(ctx) + }) } func (d *daemon) Shutdown() error { - d.pmuxCancelFn() - <-d.pmuxStoppedCh + d.cancelFn() + <-d.stoppedCh return nil } diff --git a/go/daemon/daemon_restarter.go b/go/daemon/daemon_restarter.go deleted file mode 100644 index 54ac7f8..0000000 --- a/go/daemon/daemon_restarter.go +++ /dev/null @@ -1,262 +0,0 @@ -package daemon - -import ( - "bytes" - "context" - "errors" - "fmt" - "isle/bootstrap" - "sync" - "time" - - "dev.mediocregopher.com/mediocre-go-lib.git/mlog" -) - -const ( - daemonRestarterStateInitializing = iota - daemonRestarterStateOk - daemonRestarterStateRestarting - daemonRestarterStateShutdown -) - -type daemonRestarter struct { - logger *mlog.Logger - daemonConfig Config - envBinDirPath string - opts *Opts - - l sync.Mutex - state int - inner Daemon - currBootstrap bootstrap.Bootstrap - - cancelFn context.CancelFunc - stoppedCh chan struct{} -} - -// NewDaemonRestarter will wrap a Daemon such that it will be automatically -// shutdown and re-created whenever there's changes in the cluster which require -// the configuration to be changed (e.g. a new nebula lighthouse). -// -// While still starting up the daemon for the first time all methods will return -// ErrInitializing, except Shutdown which will block until initialization is -// canceled. -// -// During a restart all methods will return ErrRestarting, except Shutdown which -// will block until the currently executing restart is finished and then -// shutdown cleanly from there. -// -// TODO make daemonRestarter smarter, it currently restarts on _any_ change, but -// it should restart itself only when there's something actually requiring a -// restart. -func NewDaemonRestarter( - logger *mlog.Logger, - daemonConfig Config, - envBinDirPath string, - currBootstrap bootstrap.Bootstrap, - opts *Opts, -) Daemon { - ctx, cancelFn := context.WithCancel(context.Background()) - - dr := &daemonRestarter{ - logger: logger, - daemonConfig: daemonConfig, - envBinDirPath: envBinDirPath, - opts: opts.withDefaults(), - currBootstrap: currBootstrap, - cancelFn: cancelFn, - stoppedCh: make(chan struct{}), - } - - go func() { - dr.restartLoop(ctx) - dr.logger.Debug(ctx, "DaemonRestarter stopped") - close(dr.stoppedCh) - }() - - return dr -} - -func withInnerDaemon[Res any]( - dr *daemonRestarter, fn func(Daemon) (Res, error), -) (Res, error) { - var zero Res - dr.l.Lock() - inner, state := dr.inner, dr.state - dr.l.Unlock() - - switch state { - case daemonRestarterStateInitializing: - return zero, ErrInitializing - case daemonRestarterStateOk: - return fn(inner) - case daemonRestarterStateRestarting: - return zero, ErrRestarting - case daemonRestarterStateShutdown: - return zero, errors.New("already shutdown") - default: - panic(fmt.Sprintf("unknown state %d", dr.state)) - } -} - -// creates a new bootstrap file using available information from the network. If -// the new bootstrap file is different than the existing one, the existing one -// is overwritten and true is returned. -func (dr *daemonRestarter) checkBootstrap( - ctx context.Context, hostBootstrap bootstrap.Bootstrap, -) ( - bootstrap.Bootstrap, bool, error, -) { - - thisHost := hostBootstrap.ThisHost() - - newHosts, err := dr.inner.GetGarageBootstrapHosts(ctx) - if err != nil { - return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err) - } - - // the daemon's view of this host's bootstrap info takes precedence over - // whatever is in garage - newHosts[thisHost.Name] = thisHost - - newHostsHash, err := bootstrap.HostsHash(newHosts) - if err != nil { - return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err) - } - - currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts) - if err != nil { - return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err) - } - - if bytes.Equal(newHostsHash, currHostsHash) { - return hostBootstrap, false, nil - } - - hostBootstrap.Hosts = newHosts - - return hostBootstrap, true, nil -} - -// blocks until a new bootstrap is available or context is canceled -func (dr *daemonRestarter) watchForChanges(ctx context.Context) bootstrap.Bootstrap { - - ticker := time.NewTicker(3 * time.Minute) - defer ticker.Stop() - - for { - select { - - case <-ctx.Done(): - return bootstrap.Bootstrap{} - - case <-ticker.C: - - dr.logger.Info(ctx, "Checking for changes to bootstrap") - - newBootstrap, changed, err := dr.checkBootstrap( - ctx, dr.currBootstrap, - ) - if err != nil { - dr.logger.Error(ctx, "Checking bootstrap for changes failed", err) - continue - } else if !changed { - continue - } - - err = writeBootstrapToStateDir(dr.opts.EnvVars.StateDirPath, newBootstrap) - if err != nil { - dr.logger.Error(ctx, "Writing new bootstrap to disk failed", err) - continue - } - - return newBootstrap - } - } -} - -func (dr *daemonRestarter) restartLoop(ctx context.Context) { - defer func() { - dr.l.Lock() - dr.state = daemonRestarterStateShutdown - inner := dr.inner - dr.l.Unlock() - - if inner != nil { - if err := inner.Shutdown(); err != nil { - dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err) - } - } - }() - - wait := func(d time.Duration) bool { - select { - case <-ctx.Done(): - return false - case <-time.After(d): - return true - } - } - - for { - if ctx.Err() != nil { - return - } - - dr.logger.Info(ctx, "Creating new daemon") - daemonInst, err := New( - ctx, - dr.logger.WithNamespace("daemon"), - dr.daemonConfig, - dr.currBootstrap, - dr.envBinDirPath, - dr.opts, - ) - if errors.Is(err, context.Canceled) { - return - } else if err != nil { - dr.logger.Error(ctx, "failed to initialize daemon", err) - if !wait(1 * time.Second) { - return - } - continue - } - - dr.l.Lock() - dr.inner = daemonInst - dr.state = daemonRestarterStateOk - dr.l.Unlock() - - newBootstrap := dr.watchForChanges(ctx) - if ctx.Err() != nil { - return - } - - dr.logger.Info(ctx, "Bootstrap has changed, will restart daemon") - dr.l.Lock() - dr.currBootstrap = newBootstrap - dr.state = daemonRestarterStateRestarting - dr.l.Unlock() - - dr.logger.Info(ctx, "Shutting down previous daemon") - if err := dr.inner.Shutdown(); err != nil { - dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err) - } - } -} - -func (dr *daemonRestarter) GetGarageBootstrapHosts( - ctx context.Context, -) ( - map[string]bootstrap.Host, error, -) { - return withInnerDaemon(dr, func(inner Daemon) (map[string]bootstrap.Host, error) { - return inner.GetGarageBootstrapHosts(ctx) - }) -} - -func (dr *daemonRestarter) Shutdown() error { - dr.cancelFn() - <-dr.stoppedCh - return nil -} diff --git a/go/daemon/errors.go b/go/daemon/errors.go index 6370216..a8ea305 100644 --- a/go/daemon/errors.go +++ b/go/daemon/errors.go @@ -3,11 +3,11 @@ package daemon import "isle/daemon/jsonrpc2" var ( - // ErrInitializing is returned when a cluster is unavailable due to still + // ErrInitializing is returned when a network is unavailable due to still // being initialized. - ErrInitializing = jsonrpc2.NewError(1, "Cluster is being initialized") + ErrInitializing = jsonrpc2.NewError(1, "Network is being initialized") - // ErrRestarting is returned when a cluster is unavailable due to being + // ErrRestarting is returned when a network is unavailable due to being // restarted. - ErrRestarting = jsonrpc2.NewError(2, "Cluster is being restarted") + ErrRestarting = jsonrpc2.NewError(2, "Network is being restarted") ) diff --git a/go/daemon/global_bucket.go b/go/daemon/global_bucket.go index 7fda231..bea0602 100644 --- a/go/daemon/global_bucket.go +++ b/go/daemon/global_bucket.go @@ -24,7 +24,7 @@ const ( // it. func (d *daemon) putGarageBoostrapHost(ctx context.Context) error { var ( - b = d.hostBootstrap + b = d.currBootstrap host = b.ThisHost() client = b.GlobalBucketS3APIClient() ) @@ -65,13 +65,13 @@ func (d *daemon) putGarageBoostrapHost(ctx context.Context) error { return nil } -func (d *daemon) GetGarageBootstrapHosts( +func (d *daemon) getGarageBootstrapHosts( ctx context.Context, ) ( map[string]bootstrap.Host, error, ) { var ( - b = d.hostBootstrap + b = d.currBootstrap client = b.GlobalBucketS3APIClient() hosts = map[string]bootstrap.Host{} @@ -106,13 +106,13 @@ func (d *daemon) GetGarageBootstrapHosts( obj.Close() if err != nil { - d.logger.Warn(ctx, "object contains invalid json", err) + d.logger.Warn(ctx, "Object contains invalid json", err) continue } host, err := authedHost.Unwrap(b.CAPublicCredentials) if err != nil { - d.logger.Warn(ctx, "host could not be authenticated", err) + d.logger.Warn(ctx, "Host could not be authenticated", err) } hosts[host.Name] = host diff --git a/go/daemon/jigs.go b/go/daemon/jigs.go index e9dafb1..3e46ce2 100644 --- a/go/daemon/jigs.go +++ b/go/daemon/jigs.go @@ -3,16 +3,28 @@ package daemon import ( "context" "time" + + "dev.mediocregopher.com/mediocre-go-lib.git/mlog" ) -func until(ctx context.Context, fn func(context.Context) error) error { +// until keeps trying fn until it returns nil, returning true. If the context is +// canceled then until returns false. +func until( + ctx context.Context, + logger *mlog.Logger, + descr string, + fn func(context.Context) error, +) bool { for { - if err := fn(ctx); err == nil { - return nil + logger.Info(ctx, descr) + err := fn(ctx) + if err == nil { + return true } else if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr + return false } + logger.Warn(ctx, descr+" failed, retrying in one second", err) time.Sleep(1 * time.Second) } } diff --git a/go/daemon/rpc.go b/go/daemon/rpc.go index d41325f..5d2d411 100644 --- a/go/daemon/rpc.go +++ b/go/daemon/rpc.go @@ -26,7 +26,7 @@ func NewRPC(daemon Daemon) *RPC { return &RPC{daemon} } -// GetHosts returns all hosts known to the cluster, sorted by their name. +// GetHosts returns all hosts known to the network, sorted by their name. func (r *RPC) GetHosts( ctx context.Context, req struct{}, ) ( diff --git a/tests/cases/admin/01-create-network.sh b/tests/cases/admin/01-create-network.sh index a73cb72..1f5d3b6 100644 --- a/tests/cases/admin/01-create-network.sh +++ b/tests/cases/admin/01-create-network.sh @@ -1,5 +1,5 @@ -# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh -source "$UTILS"/with-1-data-1-empty-node-cluster.sh +# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh +source "$UTILS"/with-1-data-1-empty-node-network.sh [ "$(cat a/meta/isle/rpc_port)" = "3900" ] [ "$(cat b/meta/isle/rpc_port)" = "3910" ] diff --git a/tests/cases/admin/02-create-bootstrap.sh b/tests/cases/admin/02-create-bootstrap.sh index 7dac613..a60bcd3 100644 --- a/tests/cases/admin/02-create-bootstrap.sh +++ b/tests/cases/admin/02-create-bootstrap.sh @@ -1,8 +1,8 @@ -# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh -source "$UTILS"/with-1-data-1-empty-node-cluster.sh +# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh +source "$UTILS"/with-1-data-1-empty-node-network.sh adminBS="$XDG_STATE_HOME"/isle/bootstrap.json -bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-cluster.sh +bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-network.sh [ "$(jq -r <"$bs" '.AdminCreationParams')" = "$(jq -r /dev/null; do sleep 1; done @@ -86,7 +86,7 @@ EOF pid="$!" echo "Waiting for secondus daemon (process $!) to initialize" - $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-cluster/secondus" + $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-network/secondus" while ! isle hosts list >/dev/null; do sleep 1; done ) fi