Compare commits

..

No commits in common. "81368821b7aa64d9373eb4b68d1d5beb9850ada3" and "05e91cd657a91f43d50501727f9e961c7345eada" have entirely different histories.

24 changed files with 445 additions and 503 deletions

View File

@ -166,33 +166,25 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("merging daemon config into bootstrap data: %w", err) return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
} }
children, err := daemon.NewChildren( daemonInst, err := daemon.New(
ctx, ctx,
logger.WithNamespace("daemon"), logger.WithNamespace("daemon"),
daemonConfig, daemonConfig,
hostBootstrap, hostBootstrap,
envBinDirPath, envBinDirPath,
&daemon.Opts{ &daemon.Opts{
// SkipHostBootstrapPush is required, because the global bucket
// hasn't actually been initialized yet, so there's nowhere to
// push to.
SkipHostBootstrapPush: true,
// NOTE both stdout and stderr are sent to stderr, so that the // NOTE both stdout and stderr are sent to stderr, so that the
// user can pipe the resulting admin.json to stdout. // user can pipe the resulting admin.json to stdout.
Stdout: os.Stderr, Stdout: os.Stderr,
}, },
) )
if err != nil { if err != nil {
return fmt.Errorf("initializing children: %w", err) return fmt.Errorf("initializing daemon: %w", err)
}
defer func() {
logger.Info(ctx, "Shutting down child processes")
if err := children.Shutdown(); err != nil {
logger.Error(ctx, "Failed to shut down children cleanly, there may be zombie children leftover", err)
}
}()
logger.Info(ctx, "Applying garage layout")
if err := daemon.GarageApplyLayout(
ctx, logger, hostBootstrap, daemonConfig,
); err != nil {
return fmt.Errorf("applying garage layout: %w", err)
} }
logger.Info(ctx, "initializing garage shared global bucket") logger.Info(ctx, "initializing garage shared global bucket")
@ -207,6 +199,10 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("initializing garage shared global bucket: %w", err) return fmt.Errorf("initializing garage shared global bucket: %w", err)
} }
if err := daemonInst.Shutdown(); err != nil {
return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err)
}
hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds
// rewrite the bootstrap now that the global bucket creds have been // rewrite the bootstrap now that the global bucket creds have been
@ -215,7 +211,7 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("writing bootstrap file: %w", err) return fmt.Errorf("writing bootstrap file: %w", err)
} }
logger.Info(ctx, "Network initialized successfully, writing admin.json to stdout") logger.Info(ctx, "cluster initialized successfully, writing admin.json to stdout")
adm := admin.Admin{ adm := admin.Admin{
CreationParams: adminCreationParams, CreationParams: adminCreationParams,

View File

@ -127,7 +127,7 @@ var subCmdDaemon = subCmd{
return fmt.Errorf("merging daemon config into bootstrap data: %w", err) return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
} }
daemonInst := daemon.NewDaemon( daemonInst := daemon.NewDaemonRestarter(
logger, daemonConfig, envBinDirPath, hostBootstrap, nil, logger, daemonConfig, envBinDirPath, hostBootstrap, nil,
) )
defer func() { defer func() {

View File

@ -66,11 +66,12 @@ func newHTTPServer(
) ( ) (
*http.Server, error, *http.Server, error,
) { ) {
socketPath := daemon.HTTPSocketPath() l, err := net.Listen("unix", daemonEnvVars.HTTPSocketPath)
l, err := net.Listen("unix", socketPath)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"failed to listen on socket %q: %w", socketPath, err, "failed to listen on socket %q: %w",
daemonEnvVars.HTTPSocketPath,
err,
) )
} }

View File

@ -3,7 +3,6 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"isle/daemon"
"isle/daemon/jsonrpc2" "isle/daemon/jsonrpc2"
"os" "os"
"strings" "strings"
@ -111,7 +110,7 @@ func (ctx subCmdCtx) doSubCmd(subCmds ...subCmd) error {
} }
daemonRCPClient := jsonrpc2.NewUnixHTTPClient( daemonRCPClient := jsonrpc2.NewUnixHTTPClient(
daemon.HTTPSocketPath(), daemonHTTPRPCPath, daemonEnvVars.HTTPSocketPath, daemonHTTPRPCPath,
) )
err := subCmd.do(subCmdCtx{ err := subCmd.do(subCmdCtx{

View File

@ -39,14 +39,9 @@ func NewGarageAdminClient(
) )
} }
func waitForGarage( func (d *daemon) waitForGarage(ctx context.Context) error {
ctx context.Context,
logger *mlog.Logger,
daemonConfig Config,
hostBootstrap bootstrap.Bootstrap,
) error {
allocs := daemonConfig.Storage.Allocations allocs := d.config.Storage.Allocations
// if this host doesn't have any allocations specified then fall back to // if this host doesn't have any allocations specified then fall back to
// waiting for nebula // waiting for nebula
@ -54,23 +49,23 @@ func waitForGarage(
return nil return nil
} }
adminClientLogger := garageAdminClientLogger(logger) adminClientLogger := garageAdminClientLogger(d.logger)
for _, alloc := range allocs { for _, alloc := range allocs {
adminAddr := net.JoinHostPort( adminAddr := net.JoinHostPort(
hostBootstrap.ThisHost().IP().String(), d.hostBootstrap.ThisHost().IP().String(),
strconv.Itoa(alloc.AdminPort), strconv.Itoa(alloc.AdminPort),
) )
adminClient := garage.NewAdminClient( adminClient := garage.NewAdminClient(
adminClientLogger, adminClientLogger,
adminAddr, adminAddr,
hostBootstrap.Garage.AdminToken, d.hostBootstrap.Garage.AdminToken,
) )
ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr) ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr)
logger.Debug(ctx, "waiting for garage instance to start") d.logger.Debug(ctx, "waiting for garage instance to start")
if err := adminClient.Wait(ctx); err != nil { if err := adminClient.Wait(ctx); err != nil {
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
@ -144,10 +139,9 @@ func garageWriteChildConfig(
} }
func garagePmuxProcConfigs( func garagePmuxProcConfigs(
logger *mlog.Logger,
runtimeDirPath, binDirPath string, runtimeDirPath, binDirPath string,
daemonConfig Config,
hostBootstrap bootstrap.Bootstrap, hostBootstrap bootstrap.Bootstrap,
daemonConfig Config,
) ( ) (
[]pmuxlib.ProcessConfig, error, []pmuxlib.ProcessConfig, error,
) { ) {
@ -168,7 +162,7 @@ func garagePmuxProcConfigs(
Cmd: filepath.Join(binDirPath, "garage"), Cmd: filepath.Join(binDirPath, "garage"),
Args: []string{"-c", childConfigPath, "server"}, Args: []string{"-c", childConfigPath, "server"},
StartAfterFunc: func(ctx context.Context) error { StartAfterFunc: func(ctx context.Context) error {
return waitForNebula(ctx, logger, hostBootstrap) return waitForNebula(ctx, hostBootstrap)
}, },
}) })
} }
@ -176,9 +170,7 @@ func garagePmuxProcConfigs(
return pmuxProcConfigs, nil return pmuxProcConfigs, nil
} }
// TODO don't expose this publicly once cluster creation is done via Daemon func garageApplyLayout(
// interface.
func GarageApplyLayout(
ctx context.Context, ctx context.Context,
logger *mlog.Logger, logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap, hostBootstrap bootstrap.Bootstrap,

View File

@ -9,8 +9,6 @@ import (
"path/filepath" "path/filepath"
"code.betamike.com/micropelago/pmux/pmuxlib" "code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
) )
@ -19,37 +17,28 @@ import (
// its source. If this succeeds we can assume that at the very least the nebula // its source. If this succeeds we can assume that at the very least the nebula
// interface has been initialized. // interface has been initialized.
func waitForNebula( func waitForNebula(
ctx context.Context, logger *mlog.Logger, hostBootstrap bootstrap.Bootstrap, ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) error { ) error {
var (
ip = hostBootstrap.ThisHost().IP()
lUDPAddr = &net.UDPAddr{IP: ip, Port: 0}
rUDPAddr = &net.UDPAddr{IP: ip, Port: 45535}
)
ctx = mctx.Annotate(ctx, "lUDPAddr", lUDPAddr, "rUDPAddr", rUDPAddr) ip := hostBootstrap.ThisHost().IP()
until( lUdpAddr := &net.UDPAddr{IP: ip, Port: 0}
ctx, rUdpAddr := &net.UDPAddr{IP: ip, Port: 45535}
logger,
"Creating UDP socket from nebula addr", return until(ctx, func(context.Context) error {
func(context.Context) error { conn, err := net.DialUDP("udp", lUdpAddr, rUdpAddr)
conn, err := net.DialUDP("udp", lUDPAddr, rUDPAddr)
if err != nil { if err != nil {
return err return err
} }
conn.Close() conn.Close()
return nil return nil
}, })
)
return ctx.Err()
} }
func nebulaPmuxProcConfig( func nebulaPmuxProcConfig(
runtimeDirPath, binDirPath string, runtimeDirPath, binDirPath string,
daemonConfig Config,
hostBootstrap bootstrap.Bootstrap, hostBootstrap bootstrap.Bootstrap,
daemonConfig Config,
) ( ) (
pmuxlib.ProcessConfig, error, pmuxlib.ProcessConfig, error,
) { ) {

View File

@ -3,29 +3,26 @@ package daemon
import ( import (
"context" "context"
"fmt" "fmt"
"isle/bootstrap"
"code.betamike.com/micropelago/pmux/pmuxlib" "code.betamike.com/micropelago/pmux/pmuxlib"
) )
func (c *Children) newPmuxConfig( func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) {
binDirPath string, daemonConfig Config, hostBootstrap bootstrap.Bootstrap,
) (pmuxlib.Config, error) {
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig( nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
c.opts.EnvVars.RuntimeDirPath, d.opts.EnvVars.RuntimeDirPath,
binDirPath, d.binDirPath,
daemonConfig, d.hostBootstrap,
hostBootstrap, d.config,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err) return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err)
} }
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig( dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
c.opts.EnvVars.RuntimeDirPath, d.opts.EnvVars.RuntimeDirPath,
binDirPath, d.binDirPath,
hostBootstrap, d.hostBootstrap,
daemonConfig, d.config,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf( return pmuxlib.Config{}, fmt.Errorf(
@ -34,11 +31,7 @@ func (c *Children) newPmuxConfig(
} }
garagePmuxProcConfigs, err := garagePmuxProcConfigs( garagePmuxProcConfigs, err := garagePmuxProcConfigs(
c.logger, d.opts.EnvVars.RuntimeDirPath, d.binDirPath, d.hostBootstrap, d.config,
c.opts.EnvVars.RuntimeDirPath,
binDirPath,
daemonConfig,
hostBootstrap,
) )
if err != nil { if err != nil {
return pmuxlib.Config{}, fmt.Errorf( return pmuxlib.Config{}, fmt.Errorf(
@ -57,21 +50,46 @@ func (c *Children) newPmuxConfig(
}, nil }, nil
} }
func (c *Children) postPmuxInit( func (d *daemon) postPmuxInit(ctx context.Context) error {
ctx context.Context, d.logger.Info(ctx, "waiting for nebula VPN to come online")
daemonConfig Config, if err := waitForNebula(ctx, d.hostBootstrap); err != nil {
hostBootstrap bootstrap.Bootstrap,
) error {
c.logger.Info(ctx, "Waiting for nebula VPN to come online")
if err := waitForNebula(ctx, c.logger, hostBootstrap); err != nil {
return fmt.Errorf("waiting for nebula to start: %w", err) return fmt.Errorf("waiting for nebula to start: %w", err)
} }
c.logger.Info(ctx, "Waiting for garage instances to come online") d.logger.Info(ctx, "waiting for garage instances to come online")
err := waitForGarage(ctx, c.logger, daemonConfig, hostBootstrap) if err := d.waitForGarage(ctx); err != nil {
if err != nil {
return fmt.Errorf("waiting for garage to start: %w", err) return fmt.Errorf("waiting for garage to start: %w", err)
} }
if len(d.config.Storage.Allocations) > 0 {
err := until(ctx, func(ctx context.Context) error {
err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config)
if err != nil {
d.logger.Error(ctx, "applying garage layout", err)
return err
}
return nil
})
if err != nil {
return fmt.Errorf("applying garage layout: %w", err)
}
}
if !d.opts.SkipHostBootstrapPush {
if err := until(ctx, func(ctx context.Context) error {
if err := d.putGarageBoostrapHost(ctx); err != nil {
d.logger.Error(ctx, "updating host info in garage", err)
return err
}
return nil
}); err != nil {
return fmt.Errorf("updating host info in garage: %w", err)
}
}
return nil return nil
} }

View File

@ -1,86 +0,0 @@
package daemon
import (
"context"
"fmt"
"isle/bootstrap"
"code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// Children owns the set of child processes which back a single network:
//   - nebula
//   - dnsmasq
//   - garage (zero or more, depending on configured storage allocations)
type Children struct {
	logger *mlog.Logger
	opts   Opts

	// pmuxCancelFn cancels the context which pmux is run under, and
	// pmuxStoppedCh is closed once pmux has returned.
	pmuxCancelFn  context.CancelFunc
	pmuxStoppedCh chan struct{}
}
// NewChildren initializes and returns a Children instance. If initialization
// fails an error is returned.
//
// The child processes are run under their own background context rather than
// ctx; ctx only bounds the initialization which happens after they have been
// started (postPmuxInit). If that initialization fails the child processes are
// shut down again before the error is returned. If even that shutdown fails
// NewChildren panics, since child processes may have been left running
// unmanaged.
func NewChildren(
	ctx context.Context,
	logger *mlog.Logger,
	daemonConfig Config,
	hostBootstrap bootstrap.Bootstrap,
	binDirPath string,
	opts *Opts,
) (
	*Children, error,
) {
	opts = opts.withDefaults()

	pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())

	c := &Children{
		logger:        logger,
		opts:          *opts,
		pmuxCancelFn:  pmuxCancelFn,
		pmuxStoppedCh: make(chan struct{}),
	}

	pmuxConfig, err := c.newPmuxConfig(binDirPath, daemonConfig, hostBootstrap)
	if err != nil {
		// pmux was never started and c is being discarded, so nothing else
		// would ever cancel pmuxCtx; release it here.
		pmuxCancelFn()
		return nil, fmt.Errorf("generating pmux config: %w", err)
	}

	go func() {
		defer close(c.pmuxStoppedCh)
		pmuxlib.Run(pmuxCtx, c.opts.Stdout, c.opts.Stderr, pmuxConfig)
		c.logger.Debug(pmuxCtx, "pmux stopped")
	}()

	if initErr := c.postPmuxInit(ctx, daemonConfig, hostBootstrap); initErr != nil {
		// Log the initialization error itself; err is necessarily nil by
		// this point.
		logger.Warn(ctx, "failed to initialize Children, shutting down child processes", initErr)
		if shutdownErr := c.Shutdown(); shutdownErr != nil {
			panic(fmt.Sprintf(
				"failed to shut down child processes after initialization"+
					" error, there may be zombie children leftover."+
					" Original error: %v. Shutdown error: %v",
				initErr, shutdownErr,
			))
		}
		return nil, initErr
	}

	return c, nil
}
// Shutdown signals the child processes to stop, by canceling the context pmux
// is running under, and then blocks until pmux has returned.
//
// If this returns an error then it's possible that child processes are
// still running and are no longer managed.
func (c *Children) Shutdown() error {
	c.pmuxCancelFn()
	<-c.pmuxStoppedCh
	return nil
}

View File

@ -1,18 +1,15 @@
// Package daemon implements the isle daemon, which is a long-running service // Package daemon implements the isle daemon, which is a long-running service
// managing all isle background tasks and sub-processes for a single network. // managing all isle background tasks and sub-processes for a single cluster.
package daemon package daemon
import ( import (
"bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"isle/bootstrap" "isle/bootstrap"
"os" "os"
"sync"
"time"
"code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog" "dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
@ -39,6 +36,10 @@ type Daemon interface {
// Opts are optional parameters which can be passed in when initializing a new // Opts are optional parameters which can be passed in when initializing a new
// Daemon instance. A nil Opts is equivalent to a zero value. // Daemon instance. A nil Opts is equivalent to a zero value.
type Opts struct { type Opts struct {
// SkipHostBootstrapPush, if set, will cause the Daemon to not push the
// bootstrap to garage upon a successful initialization.
SkipHostBootstrapPush bool
// Stdout and Stderr are what the associated outputs from child processes // Stdout and Stderr are what the associated outputs from child processes
// will be directed to. // will be directed to.
Stdout, Stderr io.Writer Stdout, Stderr io.Writer
@ -66,280 +67,73 @@ func (o *Opts) withDefaults() *Opts {
return o return o
} }
const (
daemonStateInitializing = iota
daemonStateOk
daemonStateRestarting
daemonStateShutdown
)
type daemon struct { type daemon struct {
logger *mlog.Logger logger *mlog.Logger
daemonConfig Config config Config
envBinDirPath string hostBootstrap bootstrap.Bootstrap
opts *Opts binDirPath string
opts Opts
l sync.Mutex pmuxCancelFn context.CancelFunc
state int pmuxStoppedCh chan struct{}
children *Children
currBootstrap bootstrap.Bootstrap
cancelFn context.CancelFunc
stoppedCh chan struct{}
} }
// NewDaemon initializes and returns a Daemon instance which will manage all // New initialized and returns a Daemon. If initialization fails an error is
// child processes and state required by the isle service, as well as an HTTP // returned.
// socket over which RPC calls will be served. func New(
// ctx context.Context,
// Inner Children instance(s) will be wrapped such that they will be
// automatically shutdown and re-created whenever there's changes in the network
// which require the configuration to be changed (e.g. a new nebula lighthouse).
// During such an inner restart all methods will return ErrRestarting, except
// Shutdown which will block until the currently executing restart is finished
// and then shutdown cleanly from there.
//
// While still starting up the Daemon for the first time all methods will return
// ErrInitializing, except Shutdown which will block until initialization is
// canceled.
//
// TODO make daemon smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemon(
logger *mlog.Logger, logger *mlog.Logger,
daemonConfig Config, config Config,
envBinDirPath string, hostBootstrap bootstrap.Bootstrap,
currBootstrap bootstrap.Bootstrap, binDirPath string,
opts *Opts, opts *Opts,
) Daemon { ) (
ctx, cancelFn := context.WithCancel(context.Background()) Daemon, error,
) {
opts = opts.withDefaults()
pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())
d := &daemon{ d := &daemon{
logger: logger, logger: logger,
daemonConfig: daemonConfig, config: config,
envBinDirPath: envBinDirPath, hostBootstrap: hostBootstrap,
opts: opts.withDefaults(), binDirPath: binDirPath,
currBootstrap: currBootstrap, opts: *opts,
cancelFn: cancelFn, pmuxCancelFn: pmuxCancelFn,
stoppedCh: make(chan struct{}), pmuxStoppedCh: make(chan struct{}),
}
pmuxConfig, err := d.newPmuxConfig()
if err != nil {
return nil, fmt.Errorf("generating pmux config: %w", err)
} }
go func() { go func() {
d.restartLoop(ctx) defer close(d.pmuxStoppedCh)
d.logger.Debug(ctx, "DaemonRestarter stopped") pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig)
close(d.stoppedCh) d.logger.Debug(pmuxCtx, "pmux stopped")
}() }()
return d if initErr := d.postPmuxInit(ctx); initErr != nil {
} logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err)
if err := d.Shutdown(); err != nil {
func withInnerChildren[Res any]( panic(fmt.Sprintf(
d *daemon, fn func(*Children) (Res, error), "failed to shut down child processes after initialization"+
) (Res, error) { " error, there may be zombie children leftover."+
var zero Res " Original error: %v",
d.l.Lock() initErr,
children, state := d.children, d.state ))
d.l.Unlock()
switch state {
case daemonStateInitializing:
return zero, ErrInitializing
case daemonStateOk:
return fn(children)
case daemonStateRestarting:
return zero, ErrRestarting
case daemonStateShutdown:
return zero, errors.New("already shutdown")
default:
panic(fmt.Sprintf("unknown state %d", d.state))
}
}
// creates a new bootstrap file using available information from the network. If
// the new bootstrap file is different than the existing one, the existing one
// is overwritten and true is returned.
func (d *daemon) checkBootstrap(
ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, bool, error,
) {
thisHost := hostBootstrap.ThisHost()
newHosts, err := d.getGarageBootstrapHosts(ctx)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
} }
// the daemon's view of this host's bootstrap info takes precedence over return nil, initErr
// whatever is in garage
newHosts[thisHost.Name] = thisHost
newHostsHash, err := bootstrap.HostsHash(newHosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
} }
currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts) return d, nil
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
}
if bytes.Equal(newHostsHash, currHostsHash) {
return hostBootstrap, false, nil
}
hostBootstrap.Hosts = newHosts
return hostBootstrap, true, nil
}
// blocks until a new bootstrap is available or context is canceled
func (d *daemon) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return bootstrap.Bootstrap{}
case <-ticker.C:
d.logger.Info(ctx, "Checking for changes to bootstrap")
newBootstrap, changed, err := d.checkBootstrap(
ctx, d.currBootstrap,
)
if err != nil {
d.logger.Error(ctx, "Checking bootstrap for changes failed", err)
continue
} else if !changed {
continue
}
err = writeBootstrapToStateDir(d.opts.EnvVars.StateDirPath, newBootstrap)
if err != nil {
d.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
continue
}
return newBootstrap
}
}
}
func (d *daemon) restartLoop(ctx context.Context) {
defer func() {
d.l.Lock()
d.state = daemonStateShutdown
children := d.children
d.l.Unlock()
if children != nil {
if err := children.Shutdown(); err != nil {
d.logger.Fatal(ctx, "Failed to cleanly shutdown daemon children, there may be orphaned child processes", err)
}
}
}()
wait := func(d time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(d):
return true
}
}
for {
if ctx.Err() != nil {
return
}
d.logger.Info(ctx, "Creating new daemon")
children, err := NewChildren(
ctx,
d.logger.WithNamespace("daemon"),
d.daemonConfig,
d.currBootstrap,
d.envBinDirPath,
d.opts,
)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
d.logger.Error(ctx, "failed to initialize daemon", err)
if !wait(1 * time.Second) {
return
}
continue
}
d.l.Lock()
d.children = children
d.state = daemonStateOk
d.l.Unlock()
if len(d.daemonConfig.Storage.Allocations) > 0 {
if !until(
ctx,
d.logger,
"Applying garage layout",
func(ctx context.Context) error {
return GarageApplyLayout(
ctx, d.logger, d.currBootstrap, d.daemonConfig,
)
},
) {
return
}
}
if !until(
ctx,
d.logger,
"Updating host info in garage",
func(ctx context.Context) error {
return d.putGarageBoostrapHost(ctx)
},
) {
return
}
newBootstrap := d.watchForChanges(ctx)
if ctx.Err() != nil {
return
}
d.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
d.l.Lock()
d.currBootstrap = newBootstrap
d.state = daemonStateRestarting
d.l.Unlock()
d.logger.Info(ctx, "Shutting down previous child processes")
if err := d.children.Shutdown(); err != nil {
d.logger.Fatal(ctx, "Failed to cleanly shutdown children, there may be orphaned child processes", err)
}
}
}
func (d *daemon) GetGarageBootstrapHosts(
ctx context.Context,
) (
map[string]bootstrap.Host, error,
) {
return withInnerChildren(d, func(*Children) (map[string]bootstrap.Host, error) {
return d.getGarageBootstrapHosts(ctx)
})
} }
func (d *daemon) Shutdown() error { func (d *daemon) Shutdown() error {
d.cancelFn() d.pmuxCancelFn()
<-d.stoppedCh <-d.pmuxStoppedCh
return nil return nil
} }

View File

@ -0,0 +1,262 @@
package daemon
import (
"bytes"
"context"
"errors"
"fmt"
"isle/bootstrap"
"sync"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// States which a daemonRestarter moves through. The zero value of the state
// field corresponds to daemonRestarterStateInitializing.
const (
	// No inner Daemon has been successfully created yet.
	daemonRestarterStateInitializing = iota

	// An inner Daemon is available and calls are forwarded to it.
	daemonRestarterStateOk

	// The bootstrap has changed and the inner Daemon is being replaced.
	daemonRestarterStateRestarting

	// The restart loop has exited.
	daemonRestarterStateShutdown
)
// daemonRestarter is the Daemon returned by NewDaemonRestarter. It wraps an
// inner Daemon, which is shut down and re-created by restartLoop whenever the
// bootstrap changes.
type daemonRestarter struct {
	logger        *mlog.Logger
	daemonConfig  Config
	envBinDirPath string
	opts          *Opts

	// l is held whenever state, inner, or currBootstrap are written to after
	// construction.
	l             sync.Mutex
	state         int // one of the daemonRestarterState* constants
	inner         Daemon
	currBootstrap bootstrap.Bootstrap

	// cancelFn cancels the context restartLoop runs under, and stoppedCh is
	// closed once restartLoop has returned.
	cancelFn  context.CancelFunc
	stoppedCh chan struct{}
}
// NewDaemonRestarter returns a Daemon which wraps an inner Daemon, such that
// the inner Daemon is automatically shut down and re-created whenever there
// are changes in the cluster which require the configuration to be changed
// (e.g. a new nebula lighthouse).
//
// Until the inner Daemon has started up for the first time all methods will
// return ErrInitializing, except Shutdown which will block until
// initialization is canceled.
//
// While a restart is in progress all methods will return ErrRestarting, except
// Shutdown which will block until the currently executing restart is finished
// and then shutdown cleanly from there.
//
// TODO make daemonRestarter smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemonRestarter(
	logger *mlog.Logger,
	daemonConfig Config,
	envBinDirPath string,
	currBootstrap bootstrap.Bootstrap,
	opts *Opts,
) Daemon {
	// The restart loop gets its own context, which is only ever canceled via
	// Shutdown.
	loopCtx, cancelFn := context.WithCancel(context.Background())

	dr := new(daemonRestarter)
	dr.logger = logger
	dr.daemonConfig = daemonConfig
	dr.envBinDirPath = envBinDirPath
	dr.opts = opts.withDefaults()
	dr.currBootstrap = currBootstrap
	dr.cancelFn = cancelFn
	dr.stoppedCh = make(chan struct{})

	go func() {
		dr.restartLoop(loopCtx)
		dr.logger.Debug(loopCtx, "DaemonRestarter stopped")
		close(dr.stoppedCh)
	}()

	return dr
}
// withInnerDaemon calls fn with the daemonRestarter's current inner Daemon and
// returns its result, but only if the daemonRestarter is in the Ok state.
// Otherwise fn is not called and the zero value of Res is returned alongside:
//   - ErrInitializing, if no inner Daemon has been started yet.
//   - ErrRestarting, if the inner Daemon is being replaced.
//   - a generic "already shutdown" error, if the restart loop has exited.
//
// It panics if the state is not one of the daemonRestarterState* constants.
func withInnerDaemon[Res any](
	dr *daemonRestarter, fn func(Daemon) (Res, error),
) (Res, error) {
	var zero Res

	// Snapshot both fields together under the lock, and release it before
	// calling fn so that fn does not run with the lock held.
	dr.l.Lock()
	inner, state := dr.inner, dr.state
	dr.l.Unlock()

	switch state {
	case daemonRestarterStateInitializing:
		return zero, ErrInitializing
	case daemonRestarterStateOk:
		return fn(inner)
	case daemonRestarterStateRestarting:
		return zero, ErrRestarting
	case daemonRestarterStateShutdown:
		return zero, errors.New("already shutdown")
	default:
		// Report the snapshot which was actually switched on; dr.state must
		// not be read here since the lock is no longer held.
		panic(fmt.Sprintf("unknown state %d", state))
	}
}
// checkBootstrap compares the hosts in hostBootstrap against the hosts
// currently available from the inner Daemon (via GetGarageBootstrapHosts),
// with this host's own entry from hostBootstrap always taking precedence.
//
// If the two sets of hosts hash to the same value then hostBootstrap is
// returned as-is along with false. Otherwise a copy of hostBootstrap with its
// Hosts replaced by the new set is returned along with true.
func (dr *daemonRestarter) checkBootstrap(
	ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
	bootstrap.Bootstrap, bool, error,
) {
	var none bootstrap.Bootstrap

	self := hostBootstrap.ThisHost()

	garageHosts, err := dr.inner.GetGarageBootstrapHosts(ctx)
	if err != nil {
		return none, false, fmt.Errorf("getting hosts from garage: %w", err)
	}

	// the daemon's view of this host's bootstrap info takes precedence over
	// whatever is in garage
	garageHosts[self.Name] = self

	garageHash, err := bootstrap.HostsHash(garageHosts)
	if err != nil {
		return none, false, fmt.Errorf("calculating hash of new hosts: %w", err)
	}

	existingHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
	if err != nil {
		return none, false, fmt.Errorf("calculating hash of current hosts: %w", err)
	}

	if !bytes.Equal(garageHash, existingHash) {
		hostBootstrap.Hosts = garageHosts
		return hostBootstrap, true, nil
	}

	return hostBootstrap, false, nil
}
// watchForChanges checks every 3 minutes whether the bootstrap has changed,
// blocking until either it has or ctx is canceled.
//
// A changed bootstrap is written to the state directory before being
// returned. Failures to check or to write are logged, and the check is tried
// again on the next tick. If ctx is canceled the zero Bootstrap is returned.
func (dr *daemonRestarter) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
	ticker := time.NewTicker(3 * time.Minute)
	defer ticker.Stop()

	for {
		// Wait for the next tick, bailing if the context gets canceled
		// first.
		select {
		case <-ctx.Done():
			return bootstrap.Bootstrap{}
		case <-ticker.C:
		}

		dr.logger.Info(ctx, "Checking for changes to bootstrap")

		updated, changed, err := dr.checkBootstrap(ctx, dr.currBootstrap)
		switch {
		case err != nil:
			dr.logger.Error(ctx, "Checking bootstrap for changes failed", err)
			continue
		case !changed:
			continue
		}

		if err := writeBootstrapToStateDir(
			dr.opts.EnvVars.StateDirPath, updated,
		); err != nil {
			dr.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
			continue
		}

		return updated
	}
}
// restartLoop runs until ctx is canceled, doing the following on each
// iteration:
//
//  1. Create a new inner Daemon from the current bootstrap, retrying once per
//     second if creation fails.
//  2. Move to the Ok state and block until watchForChanges reports a changed
//     bootstrap.
//  3. Move to the Restarting state, adopt the new bootstrap, and shut down
//     the inner Daemon.
//
// When the loop exits the state is set to Shutdown, and any inner Daemon which
// is still running is shut down.
func (dr *daemonRestarter) restartLoop(ctx context.Context) {
	defer func() {
		dr.l.Lock()
		dr.state = daemonRestarterStateShutdown
		inner := dr.inner
		dr.l.Unlock()

		// inner is nil if no Daemon was ever created, or if the last one was
		// already shut down as part of a restart.
		if inner != nil {
			if err := inner.Shutdown(); err != nil {
				dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
			}
		}
	}()

	// wait blocks for the given duration, returning false if ctx was canceled
	// in the meantime.
	wait := func(d time.Duration) bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(d):
			return true
		}
	}

	for {
		if ctx.Err() != nil {
			return
		}

		dr.logger.Info(ctx, "Creating new daemon")
		daemonInst, err := New(
			ctx,
			dr.logger.WithNamespace("daemon"),
			dr.daemonConfig,
			dr.currBootstrap,
			dr.envBinDirPath,
			dr.opts,
		)
		if errors.Is(err, context.Canceled) {
			return
		} else if err != nil {
			dr.logger.Error(ctx, "failed to initialize daemon", err)
			if !wait(1 * time.Second) {
				return
			}
			continue
		}

		dr.l.Lock()
		dr.inner = daemonInst
		dr.state = daemonRestarterStateOk
		dr.l.Unlock()

		newBootstrap := dr.watchForChanges(ctx)
		if ctx.Err() != nil {
			return
		}

		dr.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
		dr.l.Lock()
		dr.currBootstrap = newBootstrap
		dr.state = daemonRestarterStateRestarting
		// Drop the reference to the daemon which is about to be shut down.
		// Otherwise, if the loop exits before a replacement is installed,
		// the deferred cleanup above would call Shutdown on it a second
		// time.
		dr.inner = nil
		dr.l.Unlock()

		dr.logger.Info(ctx, "Shutting down previous daemon")
		if err := daemonInst.Shutdown(); err != nil {
			dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
		}
	}
}
// GetGarageBootstrapHosts forwards the call to the inner Daemon. If the
// daemonRestarter is not currently in the Ok state then the corresponding
// error from withInnerDaemon is returned instead.
func (dr *daemonRestarter) GetGarageBootstrapHosts(
	ctx context.Context,
) (
	map[string]bootstrap.Host, error,
) {
	forward := func(inner Daemon) (map[string]bootstrap.Host, error) {
		return inner.GetGarageBootstrapHosts(ctx)
	}

	return withInnerDaemon(dr, forward)
}
// Shutdown cancels the context which the restart loop is running under, and
// blocks until the restart loop has fully returned. It always returns nil.
func (dr *daemonRestarter) Shutdown() error {
	dr.cancelFn()
	<-dr.stoppedCh
	return nil
}

View File

@ -13,18 +13,15 @@ import (
"github.com/adrg/xdg" "github.com/adrg/xdg"
) )
// DEPRECATED
//
// EnvVars are variables which are derived based on the environment which the // EnvVars are variables which are derived based on the environment which the
// process is running in. // process is running in.
//
// TODO EnvVars should be private to this package.
type EnvVars struct { type EnvVars struct {
RuntimeDirPath string RuntimeDirPath string // TODO should be private to this package
StateDirPath string StateDirPath string // TODO should be private to this package
HTTPSocketPath string
} }
func getDefaultHTTPSocketDirPath() string { func getRPCSocketDirPath() string {
path, err := firstExistingDir( path, err := firstExistingDir(
"/run", "/run",
"/var/run", "/var/run",
@ -32,27 +29,12 @@ func getDefaultHTTPSocketDirPath() string {
"/dev/shm", "/dev/shm",
) )
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to find directory for HTTP socket: %v", err)) panic(fmt.Sprintf("Failed to find directory for RPC socket: %v", err))
} }
return path return path
} }
// HTTPSocketPath returns the path to the daemon's HTTP socket which is used for
// RPC and other functionality.
var HTTPSocketPath = sync.OnceValue(func() string {
return envOr(
"ISLE_DAEMON_HTTP_SOCKET_PATH",
func() string {
return filepath.Join(
getDefaultHTTPSocketDirPath(), "isle-daemon.sock",
)
},
)
})
// DEPRECATED
//
// GetEnvVars will return the EnvVars of the current processes, as determined by // GetEnvVars will return the EnvVars of the current processes, as determined by
// the process's environment. // the process's environment.
var GetEnvVars = sync.OnceValue(func() (v EnvVars) { var GetEnvVars = sync.OnceValue(func() (v EnvVars) {
@ -69,6 +51,13 @@ var GetEnvVars = sync.OnceValue(func() (v EnvVars) {
func() string { return filepath.Join(xdg.StateHome, "isle") }, func() string { return filepath.Join(xdg.StateHome, "isle") },
) )
v.HTTPSocketPath = envOr(
"ISLE_SOCKET_PATH",
func() string {
return filepath.Join(getRPCSocketDirPath(), "isle-daemon.sock")
},
)
return return
}) })

View File

@ -3,11 +3,11 @@ package daemon
import "isle/daemon/jsonrpc2" import "isle/daemon/jsonrpc2"
var ( var (
// ErrInitializing is returned when a network is unavailable due to still // ErrInitializing is returned when a cluster is unavailable due to still
// being initialized. // being initialized.
ErrInitializing = jsonrpc2.NewError(1, "Network is being initialized") ErrInitializing = jsonrpc2.NewError(1, "Cluster is being initialized")
// ErrRestarting is returned when a network is unavailable due to being // ErrRestarting is returned when a cluster is unavailable due to being
// restarted. // restarted.
ErrRestarting = jsonrpc2.NewError(2, "Network is being restarted") ErrRestarting = jsonrpc2.NewError(2, "Cluster is being restarted")
) )

View File

@ -24,7 +24,7 @@ const (
// it. // it.
func (d *daemon) putGarageBoostrapHost(ctx context.Context) error { func (d *daemon) putGarageBoostrapHost(ctx context.Context) error {
var ( var (
b = d.currBootstrap b = d.hostBootstrap
host = b.ThisHost() host = b.ThisHost()
client = b.GlobalBucketS3APIClient() client = b.GlobalBucketS3APIClient()
) )
@ -65,13 +65,13 @@ func (d *daemon) putGarageBoostrapHost(ctx context.Context) error {
return nil return nil
} }
func (d *daemon) getGarageBootstrapHosts( func (d *daemon) GetGarageBootstrapHosts(
ctx context.Context, ctx context.Context,
) ( ) (
map[string]bootstrap.Host, error, map[string]bootstrap.Host, error,
) { ) {
var ( var (
b = d.currBootstrap b = d.hostBootstrap
client = b.GlobalBucketS3APIClient() client = b.GlobalBucketS3APIClient()
hosts = map[string]bootstrap.Host{} hosts = map[string]bootstrap.Host{}
@ -106,13 +106,13 @@ func (d *daemon) getGarageBootstrapHosts(
obj.Close() obj.Close()
if err != nil { if err != nil {
d.logger.Warn(ctx, "Object contains invalid json", err) d.logger.Warn(ctx, "object contains invalid json", err)
continue continue
} }
host, err := authedHost.Unwrap(b.CAPublicCredentials) host, err := authedHost.Unwrap(b.CAPublicCredentials)
if err != nil { if err != nil {
d.logger.Warn(ctx, "Host could not be authenticated", err) d.logger.Warn(ctx, "host could not be authenticated", err)
} }
hosts[host.Name] = host hosts[host.Name] = host

View File

@ -3,28 +3,16 @@ package daemon
import ( import (
"context" "context"
"time" "time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
// until keeps trying fn until it returns nil, returning true. If the context is func until(ctx context.Context, fn func(context.Context) error) error {
// canceled then until returns false.
func until(
ctx context.Context,
logger *mlog.Logger,
descr string,
fn func(context.Context) error,
) bool {
for { for {
logger.Info(ctx, descr) if err := fn(ctx); err == nil {
err := fn(ctx) return nil
if err == nil {
return true
} else if ctxErr := ctx.Err(); ctxErr != nil { } else if ctxErr := ctx.Err(); ctxErr != nil {
return false return ctxErr
} }
logger.Warn(ctx, descr+" failed, retrying in one second", err)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }

View File

@ -26,7 +26,7 @@ func NewRPC(daemon Daemon) *RPC {
return &RPC{daemon} return &RPC{daemon}
} }
// GetHosts returns all hosts known to the network, sorted by their name. // GetHosts returns all hosts known to the cluster, sorted by their name.
func (r *RPC) GetHosts( func (r *RPC) GetHosts(
ctx context.Context, req struct{}, ctx context.Context, req struct{},
) ( ) (

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
[ "$(cat a/meta/isle/rpc_port)" = "3900" ] [ "$(cat a/meta/isle/rpc_port)" = "3900" ]
[ "$(cat b/meta/isle/rpc_port)" = "3910" ] [ "$(cat b/meta/isle/rpc_port)" = "3910" ]

View File

@ -1,8 +1,8 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
adminBS="$XDG_STATE_HOME"/isle/bootstrap.json adminBS="$XDG_STATE_HOME"/isle/bootstrap.json
bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-network.sh bs="$secondus_bootstrap" # set in with-1-data-1-empty-node-cluster.sh
[ "$(jq -r <"$bs" '.AdminCreationParams')" = "$(jq -r <admin.json '.CreationParams')" ] [ "$(jq -r <"$bs" '.AdminCreationParams')" = "$(jq -r <admin.json '.CreationParams')" ]
[ "$(jq -r <"$bs" '.SignedHostAssigned.Body.Name')" = "secondus" ] [ "$(jq -r <"$bs" '.SignedHostAssigned.Body.Name')" = "secondus" ]

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
function assert_a { function assert_a {
want_ip="$1" want_ip="$1"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
function do_tests { function do_tests {
status="$(isle garage cli status | tail -n+3)" status="$(isle garage cli status | tail -n+3)"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
function do_tests { function do_tests {
files="$(isle garage mc -- tree --json garage)" files="$(isle garage mc -- tree --json garage)"

View File

@ -1,5 +1,5 @@
# shellcheck source=../../utils/with-1-data-1-empty-node-network.sh # shellcheck source=../../utils/with-1-data-1-empty-node-cluster.sh
source "$UTILS"/with-1-data-1-empty-node-network.sh source "$UTILS"/with-1-data-1-empty-node-cluster.sh
as_primus as_primus
hosts="$(isle hosts list)" hosts="$(isle hosts list)"

View File

@ -92,8 +92,8 @@ for file in $test_files; do
done done
# Clean up any shared running networks. Each cleanup script is responsible for # Clean up any shared running clusters. Each cleanup script is responsible for
# figuring out if its shared network was actually instantiated during any tests. # figuring out if its shared cluster was actually instantiated during any tests.
if [ -e "$ROOT_TMPDIR/cleanup-pids" ]; then if [ -e "$ROOT_TMPDIR/cleanup-pids" ]; then
echo "Cleaning up running pids" echo "Cleaning up running pids"

View File

@ -12,6 +12,6 @@ cat <<EOF
export TMPDIR="$TMPDIR" export TMPDIR="$TMPDIR"
export XDG_RUNTIME_DIR="$XDG_RUNTIME_DIR" export XDG_RUNTIME_DIR="$XDG_RUNTIME_DIR"
export XDG_STATE_HOME="$XDG_STATE_HOME" export XDG_STATE_HOME="$XDG_STATE_HOME"
export ISLE_DAEMON_HTTP_SOCKET_PATH="$ROOT_TMPDIR/$base-daemon.sock" export ISLE_SOCKET_PATH="$ROOT_TMPDIR/$base-daemon.sock"
cd "$TMPDIR" cd "$TMPDIR"
EOF EOF

View File

@ -25,7 +25,7 @@ as_primus
secondus_bootstrap="$(pwd)/secondus-bootstrap.json" secondus_bootstrap="$(pwd)/secondus-bootstrap.json"
if [ ! -d "$XDG_RUNTIME_DIR/isle" ]; then if [ ! -d "$XDG_RUNTIME_DIR/isle" ]; then
echo "Initializing shared single node network" echo "Initializing shared single node cluster"
mkdir a mkdir a
mkdir b mkdir b
@ -62,7 +62,7 @@ EOF
pid="$!" pid="$!"
echo "Waiting for primus daemon (process $pid) to initialize" echo "Waiting for primus daemon (process $pid) to initialize"
$SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-network/primus" $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-cluster/primus"
while ! isle hosts list >/dev/null; do sleep 1; done while ! isle hosts list >/dev/null; do sleep 1; done
@ -86,7 +86,7 @@ EOF
pid="$!" pid="$!"
echo "Waiting for secondus daemon (process $!) to initialize" echo "Waiting for secondus daemon (process $!) to initialize"
$SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-network/secondus" $SHELL "$UTILS/register-cleanup.sh" "$pid" "1-data-1-empty-node-cluster/secondus"
while ! isle hosts list >/dev/null; do sleep 1; done while ! isle hosts list >/dev/null; do sleep 1; done
) )
fi fi