Refactor Daemon into Children and DaemonRestarter into Daemon

This commit is contained in:
Brian Picciano 2024-07-06 15:36:48 +02:00
parent 179059fd3d
commit 81368821b7
20 changed files with 474 additions and 427 deletions

View File

@ -166,25 +166,33 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("merging daemon config into bootstrap data: %w", err) return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
} }
daemonInst, err := daemon.New( children, err := daemon.NewChildren(
ctx, ctx,
logger.WithNamespace("daemon"), logger.WithNamespace("daemon"),
daemonConfig, daemonConfig,
hostBootstrap, hostBootstrap,
envBinDirPath, envBinDirPath,
&daemon.Opts{ &daemon.Opts{
// SkipHostBootstrapPush is required, because the global bucket
// hasn't actually been initialized yet, so there's nowhere to
// push to.
SkipHostBootstrapPush: true,
// NOTE both stdout and stderr are sent to stderr, so that the // NOTE both stdout and stderr are sent to stderr, so that the
// user can pipe the resulting admin.json to stdout. // user can pipe the resulting admin.json to stdout.
Stdout: os.Stderr, Stdout: os.Stderr,
}, },
) )
if err != nil { if err != nil {
return fmt.Errorf("initializing daemon: %w", err) return fmt.Errorf("initializing children: %w", err)
}
defer func() {
logger.Info(ctx, "Shutting down child processes")
if err := children.Shutdown(); err != nil {
logger.Error(ctx, "Failed to shut down children cleanly, there may be zombie children leftover", err)
}
}()
logger.Info(ctx, "Applying garage layout")
if err := daemon.GarageApplyLayout(
ctx, logger, hostBootstrap, daemonConfig,
); err != nil {
return fmt.Errorf("applying garage layout: %w", err)
} }
logger.Info(ctx, "initializing garage shared global bucket") logger.Info(ctx, "initializing garage shared global bucket")
@ -199,10 +207,6 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("initializing garage shared global bucket: %w", err) return fmt.Errorf("initializing garage shared global bucket: %w", err)
} }
if err := daemonInst.Shutdown(); err != nil {
return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err)
}
hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds
// rewrite the bootstrap now that the global bucket creds have been // rewrite the bootstrap now that the global bucket creds have been
@ -211,7 +215,7 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("writing bootstrap file: %w", err) return fmt.Errorf("writing bootstrap file: %w", err)
} }
logger.Info(ctx, "cluster initialized successfully, writing admin.json to stdout") logger.Info(ctx, "Network initialized successfully, writing admin.json to stdout")
adm := admin.Admin{ adm := admin.Admin{
CreationParams: adminCreationParams, CreationParams: adminCreationParams,

View File

@ -127,7 +127,7 @@ var subCmdDaemon = subCmd{
return fmt.Errorf("merging daemon config into bootstrap data: %w", err) return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
} }
daemonInst := daemon.NewDaemonRestarter( daemonInst := daemon.NewDaemon(
logger, daemonConfig, envBinDirPath, hostBootstrap, nil, logger, daemonConfig, envBinDirPath, hostBootstrap, nil,
) )
defer func() { defer func() {

View File

@ -39,9 +39,14 @@ func NewGarageAdminClient(
) )
} }
func (d *daemon) waitForGarage(ctx context.Context) error { func waitForGarage(
ctx context.Context,
logger *mlog.Logger,
daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
) error {
allocs := d.config.Storage.Allocations allocs := daemonConfig.Storage.Allocations
// if this host doesn't have any allocations specified then fall back to // if this host doesn't have any allocations specified then fall back to
// waiting for nebula // waiting for nebula
@ -49,23 +54,23 @@ func (d *daemon) waitForGarage(ctx context.Context) error {
return nil return nil
} }
adminClientLogger := garageAdminClientLogger(d.logger) adminClientLogger := garageAdminClientLogger(logger)
for _, alloc := range allocs { for _, alloc := range allocs {
adminAddr := net.JoinHostPort( adminAddr := net.JoinHostPort(
d.hostBootstrap.ThisHost().IP().String(), hostBootstrap.ThisHost().IP().String(),
strconv.Itoa(alloc.AdminPort), strconv.Itoa(alloc.AdminPort),
) )
adminClient := garage.NewAdminClient( adminClient := garage.NewAdminClient(
adminClientLogger, adminClientLogger,
adminAddr, adminAddr,
d.hostBootstrap.Garage.AdminToken, hostBootstrap.Garage.AdminToken,
) )
ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr) ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr)
d.logger.Debug(ctx, "waiting for garage instance to start") logger.Debug(ctx, "waiting for garage instance to start")
if err := adminClient.Wait(ctx); err != nil { if err := adminClient.Wait(ctx); err != nil {
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
@ -139,9 +144,10 @@ func garageWriteChildConfig(
} }
func garagePmuxProcConfigs( func garagePmuxProcConfigs(
logger *mlog.Logger,
runtimeDirPath, binDirPath string, runtimeDirPath, binDirPath string,
hostBootstrap bootstrap.Bootstrap,
daemonConfig Config, daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
) ( ) (
[]pmuxlib.ProcessConfig, error, []pmuxlib.ProcessConfig, error,
) { ) {
@ -162,7 +168,7 @@ func garagePmuxProcConfigs(
Cmd: filepath.Join(binDirPath, "garage"), Cmd: filepath.Join(binDirPath, "garage"),
Args: []string{"-c", childConfigPath, "server"}, Args: []string{"-c", childConfigPath, "server"},
StartAfterFunc: func(ctx context.Context) error { StartAfterFunc: func(ctx context.Context) error {
return waitForNebula(ctx, hostBootstrap) return waitForNebula(ctx, logger, hostBootstrap)
}, },
}) })
} }
@ -170,7 +176,9 @@ func garagePmuxProcConfigs(
return pmuxProcConfigs, nil return pmuxProcConfigs, nil
} }
func garageApplyLayout( // TODO don't expose this publicly once cluster creation is done via Daemon
// interface.
func GarageApplyLayout(
ctx context.Context, ctx context.Context,
logger *mlog.Logger, logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap, hostBootstrap bootstrap.Bootstrap,

View File

@ -9,6 +9,8 @@ import (
"path/filepath" "path/filepath"
"code.betamike.com/micropelago/pmux/pmuxlib" "code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
) )
@ -17,28 +19,37 @@ import (
// its source. If this succeeds we can assume that at the very least the nebula // its source. If this succeeds we can assume that at the very least the nebula
// interface has been initialized. // interface has been initialized.
func waitForNebula( func waitForNebula(
ctx context.Context, hostBootstrap bootstrap.Bootstrap, ctx context.Context, logger *mlog.Logger, hostBootstrap bootstrap.Bootstrap,
) error { ) error {
var (
ip = hostBootstrap.ThisHost().IP()
lUDPAddr = &net.UDPAddr{IP: ip, Port: 0}
rUDPAddr = &net.UDPAddr{IP: ip, Port: 45535}
)
ip := hostBootstrap.ThisHost().IP() ctx = mctx.Annotate(ctx, "lUDPAddr", lUDPAddr, "rUDPAddr", rUDPAddr)
lUdpAddr := &net.UDPAddr{IP: ip, Port: 0} until(
rUdpAddr := &net.UDPAddr{IP: ip, Port: 45535} ctx,
logger,
return until(ctx, func(context.Context) error { "Creating UDP socket from nebula addr",
conn, err := net.DialUDP("udp", lUdpAddr, rUdpAddr) func(context.Context) error {
conn, err := net.DialUDP("udp", lUDPAddr, rUDPAddr)
if err != nil { if err != nil {
return err return err
} }
conn.Close() conn.Close()
return nil return nil
}) },
)
return ctx.Err()
} }
func nebulaPmuxProcConfig( func nebulaPmuxProcConfig(
runtimeDirPath, binDirPath string, runtimeDirPath, binDirPath string,
hostBootstrap bootstrap.Bootstrap,
daemonConfig Config, daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
) ( ) (
pmuxlib.ProcessConfig, error, pmuxlib.ProcessConfig, error,
) { ) {

View File

@ -3,26 +3,29 @@ package daemon
import ( import (
"context" "context"
"fmt" "fmt"
"isle/bootstrap"
"code.betamike.com/micropelago/pmux/pmuxlib" "code.betamike.com/micropelago/pmux/pmuxlib"
) )
func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) { func (c *Children) newPmuxConfig(
binDirPath string, daemonConfig Config, hostBootstrap bootstrap.Bootstrap,
) (pmuxlib.Config, error) {
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig( nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
d.opts.EnvVars.RuntimeDirPath, c.opts.EnvVars.RuntimeDirPath,
d.binDirPath, binDirPath,
d.hostBootstrap, daemonConfig,
d.config, hostBootstrap,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err) return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err)
} }
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig( dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
d.opts.EnvVars.RuntimeDirPath, c.opts.EnvVars.RuntimeDirPath,
d.binDirPath, binDirPath,
d.hostBootstrap, hostBootstrap,
d.config, daemonConfig,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf( return pmuxlib.Config{}, fmt.Errorf(
@ -31,7 +34,11 @@ func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) {
} }
garagePmuxProcConfigs, err := garagePmuxProcConfigs( garagePmuxProcConfigs, err := garagePmuxProcConfigs(
d.opts.EnvVars.RuntimeDirPath, d.binDirPath, d.hostBootstrap, d.config, c.logger,
c.opts.EnvVars.RuntimeDirPath,
binDirPath,
daemonConfig,
hostBootstrap,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf( return pmuxlib.Config{}, fmt.Errorf(
@ -50,46 +57,21 @@ func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) {
}, nil }, nil
} }
func (d *daemon) postPmuxInit(ctx context.Context) error { func (c *Children) postPmuxInit(
d.logger.Info(ctx, "waiting for nebula VPN to come online") ctx context.Context,
if err := waitForNebula(ctx, d.hostBootstrap); err != nil { daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
) error {
c.logger.Info(ctx, "Waiting for nebula VPN to come online")
if err := waitForNebula(ctx, c.logger, hostBootstrap); err != nil {
return fmt.Errorf("waiting for nebula to start: %w", err) return fmt.Errorf("waiting for nebula to start: %w", err)
} }
d.logger.Info(ctx, "waiting for garage instances to come online") c.logger.Info(ctx, "Waiting for garage instances to come online")
if err := d.waitForGarage(ctx); err != nil { err := waitForGarage(ctx, c.logger, daemonConfig, hostBootstrap)
if err != nil {
return fmt.Errorf("waiting for garage to start: %w", err) return fmt.Errorf("waiting for garage to start: %w", err)
} }
if len(d.config.Storage.Allocations) > 0 {
err := until(ctx, func(ctx context.Context) error {
err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config)
if err != nil {
d.logger.Error(ctx, "applying garage layout", err)
return err
}
return nil
})
if err != nil {
return fmt.Errorf("applying garage layout: %w", err)
}
}
if !d.opts.SkipHostBootstrapPush {
if err := until(ctx, func(ctx context.Context) error {
if err := d.putGarageBoostrapHost(ctx); err != nil {
d.logger.Error(ctx, "updating host info in garage", err)
return err
}
return nil
}); err != nil {
return fmt.Errorf("updating host info in garage: %w", err)
}
}
return nil return nil
} }

86
go/daemon/children.go Normal file
View File

@ -0,0 +1,86 @@
package daemon
import (
"context"
"fmt"
"isle/bootstrap"
"code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// Children manages all child processes of a network. Child processes are
// comprised of:
// - nebula
// - dnsmasq
// - garage (0 or more, depending on configured storage allocations)
type Children struct {
logger *mlog.Logger
opts Opts
pmuxCancelFn context.CancelFunc
pmuxStoppedCh chan struct{}
}
// NewChildren initialized and returns a Children instance. If initialization
// fails an error is returned.
func NewChildren(
ctx context.Context,
logger *mlog.Logger,
daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
binDirPath string,
opts *Opts,
) (
*Children, error,
) {
opts = opts.withDefaults()
pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())
c := &Children{
logger: logger,
opts: *opts,
pmuxCancelFn: pmuxCancelFn,
pmuxStoppedCh: make(chan struct{}),
}
pmuxConfig, err := c.newPmuxConfig(binDirPath, daemonConfig, hostBootstrap)
if err != nil {
return nil, fmt.Errorf("generating pmux config: %w", err)
}
go func() {
defer close(c.pmuxStoppedCh)
pmuxlib.Run(pmuxCtx, c.opts.Stdout, c.opts.Stderr, pmuxConfig)
c.logger.Debug(pmuxCtx, "pmux stopped")
}()
initErr := c.postPmuxInit(ctx, daemonConfig, hostBootstrap)
if initErr != nil {
logger.Warn(ctx, "failed to initialize Children, shutting down child processes", err)
if err := c.Shutdown(); err != nil {
panic(fmt.Sprintf(
"failed to shut down child processes after initialization"+
" error, there may be zombie children leftover."+
" Original error: %v",
initErr,
))
}
return nil, initErr
}
return c, nil
}
// Shutdown blocks until all child processes have gracefully shut themselves
// down.
//
// If this returns an error then it's possible that child processes are
// still running and are no longer managed.
func (c *Children) Shutdown() error {
c.pmuxCancelFn()
<-c.pmuxStoppedCh
return nil
}

View File

@ -1,15 +1,18 @@
// Package daemon implements the isle daemon, which is a long-running service // Package daemon implements the isle daemon, which is a long-running service
// managing all isle background tasks and sub-processes for a single cluster. // managing all isle background tasks and sub-processes for a single network.
package daemon package daemon
import ( import (
"bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"isle/bootstrap" "isle/bootstrap"
"os" "os"
"sync"
"time"
"code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog" "dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
@ -36,10 +39,6 @@ type Daemon interface {
// Opts are optional parameters which can be passed in when initializing a new // Opts are optional parameters which can be passed in when initializing a new
// Daemon instance. A nil Opts is equivalent to a zero value. // Daemon instance. A nil Opts is equivalent to a zero value.
type Opts struct { type Opts struct {
// SkipHostBootstrapPush, if set, will cause the Daemon to not push the
// bootstrap to garage upon a successful initialization.
SkipHostBootstrapPush bool
// Stdout and Stderr are what the associated outputs from child processes // Stdout and Stderr are what the associated outputs from child processes
// will be directed to. // will be directed to.
Stdout, Stderr io.Writer Stdout, Stderr io.Writer
@ -67,73 +66,280 @@ func (o *Opts) withDefaults() *Opts {
return o return o
} }
const (
daemonStateInitializing = iota
daemonStateOk
daemonStateRestarting
daemonStateShutdown
)
type daemon struct { type daemon struct {
logger *mlog.Logger logger *mlog.Logger
config Config daemonConfig Config
hostBootstrap bootstrap.Bootstrap envBinDirPath string
binDirPath string opts *Opts
opts Opts
pmuxCancelFn context.CancelFunc l sync.Mutex
pmuxStoppedCh chan struct{} state int
children *Children
currBootstrap bootstrap.Bootstrap
cancelFn context.CancelFunc
stoppedCh chan struct{}
} }
// New initialized and returns a Daemon. If initialization fails an error is // NewDaemon initializes and returns a Daemon instance which will manage all
// returned. // child processes and state required by the isle service, as well as an HTTP
func New( // socket over which RPC calls will be served.
ctx context.Context, //
// Inner Children instance(s) will be wrapped such that they will be
// automatically shutdown and re-created whenever there's changes in the network
// which require the configuration to be changed (e.g. a new nebula lighthouse).
// During such an inner restart all methods will return ErrRestarting, except
// Shutdown which will block until the currently executing restart is finished
// and then shutdown cleanly from there.
//
// While still starting up the Daemon for the first time all methods will return
// ErrInitializing, except Shutdown which will block until initialization is
// canceled.
//
// TODO make daemon smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemon(
logger *mlog.Logger, logger *mlog.Logger,
config Config, daemonConfig Config,
hostBootstrap bootstrap.Bootstrap, envBinDirPath string,
binDirPath string, currBootstrap bootstrap.Bootstrap,
opts *Opts, opts *Opts,
) ( ) Daemon {
Daemon, error, ctx, cancelFn := context.WithCancel(context.Background())
) {
opts = opts.withDefaults()
pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())
d := &daemon{ d := &daemon{
logger: logger, logger: logger,
config: config, daemonConfig: daemonConfig,
hostBootstrap: hostBootstrap, envBinDirPath: envBinDirPath,
binDirPath: binDirPath, opts: opts.withDefaults(),
opts: *opts, currBootstrap: currBootstrap,
pmuxCancelFn: pmuxCancelFn, cancelFn: cancelFn,
pmuxStoppedCh: make(chan struct{}), stoppedCh: make(chan struct{}),
}
pmuxConfig, err := d.newPmuxConfig()
if err != nil {
return nil, fmt.Errorf("generating pmux config: %w", err)
} }
go func() { go func() {
defer close(d.pmuxStoppedCh) d.restartLoop(ctx)
pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig) d.logger.Debug(ctx, "DaemonRestarter stopped")
d.logger.Debug(pmuxCtx, "pmux stopped") close(d.stoppedCh)
}() }()
if initErr := d.postPmuxInit(ctx); initErr != nil { return d
logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err) }
if err := d.Shutdown(); err != nil {
panic(fmt.Sprintf( func withInnerChildren[Res any](
"failed to shut down child processes after initialization"+ d *daemon, fn func(*Children) (Res, error),
" error, there may be zombie children leftover."+ ) (Res, error) {
" Original error: %v", var zero Res
initErr, d.l.Lock()
)) children, state := d.children, d.state
d.l.Unlock()
switch state {
case daemonStateInitializing:
return zero, ErrInitializing
case daemonStateOk:
return fn(children)
case daemonStateRestarting:
return zero, ErrRestarting
case daemonStateShutdown:
return zero, errors.New("already shutdown")
default:
panic(fmt.Sprintf("unknown state %d", d.state))
}
}
// creates a new bootstrap file using available information from the network. If
// the new bootstrap file is different than the existing one, the existing one
// is overwritten and true is returned.
func (d *daemon) checkBootstrap(
ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, bool, error,
) {
thisHost := hostBootstrap.ThisHost()
newHosts, err := d.getGarageBootstrapHosts(ctx)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
} }
return nil, initErr // the daemon's view of this host's bootstrap info takes precedence over
// whatever is in garage
newHosts[thisHost.Name] = thisHost
newHostsHash, err := bootstrap.HostsHash(newHosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
} }
return d, nil currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
}
if bytes.Equal(newHostsHash, currHostsHash) {
return hostBootstrap, false, nil
}
hostBootstrap.Hosts = newHosts
return hostBootstrap, true, nil
}
// blocks until a new bootstrap is available or context is canceled
func (d *daemon) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return bootstrap.Bootstrap{}
case <-ticker.C:
d.logger.Info(ctx, "Checking for changes to bootstrap")
newBootstrap, changed, err := d.checkBootstrap(
ctx, d.currBootstrap,
)
if err != nil {
d.logger.Error(ctx, "Checking bootstrap for changes failed", err)
continue
} else if !changed {
continue
}
err = writeBootstrapToStateDir(d.opts.EnvVars.StateDirPath, newBootstrap)
if err != nil {
d.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
continue
}
return newBootstrap
}
}
}
func (d *daemon) restartLoop(ctx context.Context) {
defer func() {
d.l.Lock()
d.state = daemonStateShutdown
children := d.children
d.l.Unlock()
if children != nil {
if err := children.Shutdown(); err != nil {
d.logger.Fatal(ctx, "Failed to cleanly shutdown daemon children, there may be orphaned child processes", err)
}
}
}()
wait := func(d time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(d):
return true
}
}
for {
if ctx.Err() != nil {
return
}
d.logger.Info(ctx, "Creating new daemon")
children, err := NewChildren(
ctx,
d.logger.WithNamespace("daemon"),
d.daemonConfig,
d.currBootstrap,
d.envBinDirPath,
d.opts,
)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
d.logger.Error(ctx, "failed to initialize daemon", err)
if !wait(1 * time.Second) {
return
}
continue
}
d.l.Lock()
d.children = children
d.state = daemonStateOk
d.l.Unlock()
if len(d.daemonConfig.Storage.Allocations) > 0 {
if !until(
ctx,
d.logger,
"Applying garage layout",
func(ctx context.Context) error {
return GarageApplyLayout(
ctx, d.logger, d.currBootstrap, d.daemonConfig,
)
},
) {
return
}
}
if !until(
ctx,
d.logger,
"Updating host info in garage",
func(ctx context.Context) error {
return d.putGarageBoostrapHost(ctx)
},
) {
return
}
newBootstrap := d.watchForChanges(ctx)
if ctx.Err() != nil {
return
}
d.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
d.l.Lock()
d.currBootstrap = newBootstrap
d.state = daemonStateRestarting
d.l.Unlock()
d.logger.Info(ctx, "Shutting down previous child processes")
if err := d.children.Shutdown(); err != nil {
d.logger.Fatal(ctx, "Failed to cleanly shutdown children, there may be orphaned child processes", err)
}
}
}
func (d *daemon) GetGarageBootstrapHosts(
ctx context.Context,
) (
map[string]bootstrap.Host, error,
) {
return withInnerChildren(d, func(*Children) (map[string]bootstrap.Host, error) {
return d.getGarageBootstrapHosts(ctx)
})
} }
func (d *daemon) Shutdown() error { func (d *daemon) Shutdown() error {
d.pmuxCancelFn() d.cancelFn()
<-d.pmuxStoppedCh <-d.stoppedCh
return nil return nil
} }

View File

@ -1,262 +0,0 @@
package daemon
import (
"bytes"
"context"
"errors"
"fmt"
"isle/bootstrap"
"sync"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
const (
daemonRestarterStateInitializing = iota
daemonRestarterStateOk
daemonRestarterStateRestarting
daemonRestarterStateShutdown
)
type daemonRestarter struct {
logger *mlog.Logger
daemonConfig Config
envBinDirPath string
opts *Opts
l sync.Mutex
state int
inner Daemon
currBootstrap bootstrap.Bootstrap
cancelFn context.CancelFunc
stoppedCh chan struct{}
}
// NewDaemonRestarter will wrap a Daemon such that it will be automatically
// shutdown and re-created whenever there's changes in the cluster which require
// the configuration to be changed (e.g. a new nebula lighthouse).
//
// While still starting up the daemon for the first time all methods will return
// ErrInitializing, except Shutdown which will block until initialization is
// canceled.
//
// During a restart all methods will return ErrRestarting, except Shutdown which
// will block until the currently executing restart is finished and then
// shutdown cleanly from there.
//
// TODO make daemonRestarter smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemonRestarter(
logger *mlog.Logger,
daemonConfig Config,
envBinDirPath string,
currBootstrap bootstrap.Bootstrap,
opts *Opts,
) Daemon {
ctx, cancelFn := context.WithCancel(context.Background())
dr := &daemonRestarter{
logger: logger,
daemonConfig: daemonConfig,
envBinDirPath: envBinDirPath,
opts: opts.withDefaults(),
currBootstrap: currBootstrap,
cancelFn: cancelFn,
stoppedCh: make(chan struct{}),
}
go func() {
dr.restartLoop(ctx)
dr.logger.Debug(ctx, "DaemonRestarter stopped")
close(dr.stoppedCh)
}()
return dr
}
func withInnerDaemon[Res any](
dr *daemonRestarter, fn func(Daemon) (Res, error),
) (Res, error) {
var zero Res
dr.l.Lock()
inner, state := dr.inner, dr.state
dr.l.Unlock()
switch state {
case daemonRestarterStateInitializing:
return zero, ErrInitializing
case daemonRestarterStateOk:
return fn(inner)
case daemonRestarterStateRestarting:
return zero, ErrRestarting
case daemonRestarterStateShutdown:
return zero, errors.New("already shutdown")
default:
panic(fmt.Sprintf("unknown state %d", dr.state))
}
}
// creates a new bootstrap file using available information from the network. If
// the new bootstrap file is different than the existing one, the existing one
// is overwritten and true is returned.
func (dr *daemonRestarter) checkBootstrap(
ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, bool, error,
) {
thisHost := hostBootstrap.ThisHost()
newHosts, err := dr.inner.GetGarageBootstrapHosts(ctx)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
}
// the daemon's view of this host's bootstrap info takes precedence over
// whatever is in garage
newHosts[thisHost.Name] = thisHost
newHostsHash, err := bootstrap.HostsHash(newHosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
}
currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
}
if bytes.Equal(newHostsHash, currHostsHash) {
return hostBootstrap, false, nil
}
hostBootstrap.Hosts = newHosts
return hostBootstrap, true, nil
}
// blocks until a new bootstrap is available or context is canceled
func (dr *daemonRestarter) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return bootstrap.Bootstrap{}
case <-ticker.C:
dr.logger.Info(ctx, "Checking for changes to bootstrap")
newBootstrap, changed, err := dr.checkBootstrap(
ctx, dr.currBootstrap,
)
if err != nil {
dr.logger.Error(ctx, "Checking bootstrap for changes failed", err)
continue
} else if !changed {
continue
}
err = writeBootstrapToStateDir(dr.opts.EnvVars.StateDirPath, newBootstrap)
if err != nil {
dr.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
continue
}
return newBootstrap
}
}
}
func (dr *daemonRestarter) restartLoop(ctx context.Context) {
defer func() {
dr.l.Lock()
dr.state = daemonRestarterStateShutdown
inner := dr.inner
dr.l.Unlock()
if inner != nil {
if err := inner.Shutdown(); err != nil {
dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
}
}
}()
wait := func(d time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(d):
return true
}
}
for {
if ctx.Err() != nil {
return
}
dr.logger.Info(ctx, "Creating new daemon")
daemonInst, err := New(
ctx,
dr.logger.WithNamespace("daemon"),
dr.daemonConfig,
dr.currBootstrap,
dr.envBinDirPath,
dr.opts,
)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
dr.logger.Error(ctx, "failed to initialize daemon", err)
if !wait(1 * time.Second) {
return
}
continue
}
dr.l.Lock()
dr.inner = daemonInst
dr.state = daemonRestarterStateOk
dr.l.Unlock()
newBootstrap := dr.watchForChanges(ctx)
if ctx.Err() != nil {
return
}
dr.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
dr.l.Lock()
dr.currBootstrap = newBootstrap
dr.state = daemonRestarterStateRestarting
dr.l.Unlock()
dr.logger.Info(ctx, "Shutting down previous daemon")
if err := dr.inner.Shutdown(); err != nil {
dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
}
}
}
func (dr *daemonRestarter) GetGarageBootstrapHosts(
ctx context.Context,
) (
map[string]bootstrap.Host, error,
) {
return withInnerDaemon(dr, func(inner Daemon) (map[string]bootstrap.Host, error) {
return inner.GetGarageBootstrapHosts(ctx)
})
}
func (dr *daemonRestarter) Shutdown() error {
dr.cancelFn()
<-dr.stoppedCh
return nil
}

View File

@ -3,11 +3,11 @@ package daemon
import "isle/daemon/jsonrpc2" import "isle/daemon/jsonrpc2"
var ( var (
// ErrInitializing is returned when a cluster is unavailable due to still // ErrInitializing is returned when a network is unavailable due to still
// being initialized. // being initialized.
ErrInitializing = jsonrpc2.NewError(1, "Cluster is being initialized") ErrInitializing = jsonrpc2.NewError(1, "Network is being initialized")
// ErrRestarting is returned when a cluster is unavailable due to being // ErrRestarting is returned when a network is unavailable due to being
// restarted. // restarted.
ErrRestarting = jsonrpc2.NewError(2, "Cluster is being restarted") ErrRestarting = jsonrpc2.NewError(2, "Network is being restarted")
) )

View File

@ -24,7 +24,7 @@ const (
// it. // it.
func (d *daemon) putGarageBoostrapHost(ctx context.Context) error { func (d *daemon) putGarageBoostrapHost(ctx context.Context) error {
var ( var (
b = d.hostBootstrap b = d.currBootstrap
host = b.ThisHost() host = b.ThisHost()
client = b.GlobalBucketS3APIClient() client = b.GlobalBucketS3APIClient()
) )
@ -65,13 +65,13 @@ func (d *daemon) putGarageBoostrapHost(ctx context.Context) error {
return nil return nil
} }
func (d *daemon) GetGarageBootstrapHosts( func (d *daemon) getGarageBootstrapHosts(
ctx context.Context, ctx context.Context,
) ( ) (
map[string]bootstrap.Host, error, map[string]bootstrap.Host, error,
) { ) {
var ( var (
b = d.hostBootstrap b = d.currBootstrap
client = b.GlobalBucketS3APIClient() client = b.GlobalBucketS3APIClient()
hosts = map[string]bootstrap.Host{} hosts = map[string]bootstrap.Host{}
@ -106,13 +106,13 @@ func (d *daemon) GetGarageBootstrapHosts(
obj.Close() obj.Close()
if err != nil { if err != nil {
d.logger.Warn(ctx, "object contains invalid json", err) d.logger.Warn(ctx, "Object contains invalid json", err)
continue continue
} }
host, err := authedHost.Unwrap(b.CAPublicCredentials) host, err := authedHost.Unwrap(b.CAPublicCredentials)
if err != nil { if err != nil {
d.logger.Warn(ctx, "host could not be authenticated", err) d.logger.Warn(ctx, "Host could not be authenticated", err)
} }
hosts[host.Name] = host hosts[host.Name] = host

View File

@ -3,16 +3,28 @@ package daemon
import ( import (
"context" "context"
"time" "time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
func until(ctx context.Context, fn func(context.Context) error) error { // until keeps trying fn until it returns nil, returning true. If the context is
// canceled then until returns false.
func until(
ctx context.Context,
logger *mlog.Logger,
descr string,
fn func(context.Context) error,
) bool {
for { for {
if err := fn(ctx); err == nil { logger.Info(ctx, descr)
return nil err := fn(ctx)
if err == nil {
return true
} else if ctxErr := ctx.Err(); ctxErr != nil { } else if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr return false
} }
logger.Warn(ctx, descr+" failed, retrying in one second", err)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }

View File

@ -26,7 +26,7 @@ func NewRPC(daemon Daemon) *RPC {
return &RPC{daemon} return &RPC{daemon}
} }
// GetHosts returns all hosts known to the cluster, sorted by their name. // GetHosts returns all hosts known to the network, sorted by their name.
func (r *RPC) GetHosts( func (r *RPC) GetHosts(
ctx context.Context, req struct{}, ctx context.Context, req struct{},
) ( ) (

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
[ "$(cat a/meta/isle/rpc_port)" = "3900" ] [ "$(cat a/meta/isle/rpc_port)" = "3900" ]
[ "$(cat b/meta/isle/rpc_port)" = "3910" ] [ "$(cat b/meta/isle/rpc_port)" = "3910" ]

View File

@ -1,8 +1,8 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
adminBS="$XDG_STATE_HOME"/isle/bootstrap.json adminBS="$XDG_STATE_HOME"/isle/bootstrap.json
bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-cluster.sh bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-network.sh
[ "$(jq -r <"$bs" '.AdminCreationParams')" = "$(jq -r <admin.json '.CreationParams')" ] [ "$(jq -r <"$bs" '.AdminCreationParams')" = "$(jq -r <admin.json '.CreationParams')" ]
[ "$(jq -r <"$bs" '.SignedHostAssigned.Body.Name')" = "secondus" ] [ "$(jq -r <"$bs" '.SignedHostAssigned.Body.Name')" = "secondus" ]

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
function assert_a { function assert_a {
want_ip="$1" want_ip="$1"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
function do_tests { function do_tests {
status="$(isle garage cli status | tail -n+3)" status="$(isle garage cli status | tail -n+3)"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
function do_tests { function do_tests {
files="$(isle garage mc -- tree --json garage)" files="$(isle garage mc -- tree --json garage)"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh # shellcheck source=../../utils/with-1-data-1-empty-node-network.sh
source "$UTILS"/with-1-data-1-empty-node-cluster.sh source "$UTILS"/with-1-data-1-empty-node-network.sh
as_primus as_primus
hosts="$(isle hosts list)" hosts="$(isle hosts list)"

View File

@ -92,8 +92,8 @@ for file in $test_files; do
done done
# Clean up any shared running clusters. Each cleanup script is responsible for # Clean up any shared running networks. Each cleanup script is responsible for
# figuring out if its shared cluster was actually instantiated during any tests. # figuring out if its shared network was actually instantiated during any tests.
if [ -e "$ROOT_TMPDIR/cleanup-pids" ]; then if [ -e "$ROOT_TMPDIR/cleanup-pids" ]; then
echo "Cleaning up running pids" echo "Cleaning up running pids"

View File

@ -25,7 +25,7 @@ as_primus
secondus_bootstrap="$(pwd)/secondus-bootstrap.json" secondus_bootstrap="$(pwd)/secondus-bootstrap.json"
if [ ! -d "$XDG_RUNTIME_DIR/isle" ]; then if [ ! -d "$XDG_RUNTIME_DIR/isle" ]; then
echo "Initializing shared single node cluster" echo "Initializing shared single node network"
mkdir a mkdir a
mkdir b mkdir b
@ -62,7 +62,7 @@ EOF
pid="$!" pid="$!"
echo "Waiting for primus daemon (process $pid) to initialize" echo "Waiting for primus daemon (process $pid) to initialize"
$SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-cluster/primus" $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-network/primus"
while ! isle hosts list >/dev/null; do sleep 1; done while ! isle hosts list >/dev/null; do sleep 1; done
@ -86,7 +86,7 @@ EOF
pid="$!" pid="$!"
echo "Waiting for secondus daemon (process $!) to initialize" echo "Waiting for secondus daemon (process $!) to initialize"
$SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-cluster/secondus" $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-network/secondus"
while ! isle hosts list >/dev/null; do sleep 1; done while ! isle hosts list >/dev/null; do sleep 1; done
) )
fi fi