From a8893e4fc6b3bbd5a720363e2807d2faee6b19c8 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Mon, 17 Jun 2024 20:51:02 +0200 Subject: [PATCH] Move daemon sub-process logic into daemon package --- go/cmd/entrypoint/admin.go | 84 +++--- go/cmd/entrypoint/daemon.go | 94 ++----- go/cmd/entrypoint/daemon_util.go | 22 +- go/cmd/entrypoint/garage_util.go | 206 +-------------- go/cmd/entrypoint/main.go | 3 +- .../child_dnsmasq.go} | 15 +- go/daemon/child_garage.go | 206 +++++++++++++++ .../nebula_util.go => daemon/child_nebula.go} | 16 +- go/daemon/config.go | 83 +++++- go/daemon/daemon.go | 239 +++++++++++++----- go/daemon/jigs.go | 18 ++ 11 files changed, 560 insertions(+), 426 deletions(-) rename go/{cmd/entrypoint/dnsmasq_util.go => daemon/child_dnsmasq.go} (85%) create mode 100644 go/daemon/child_garage.go rename go/{cmd/entrypoint/nebula_util.go => daemon/child_nebula.go} (90%) create mode 100644 go/daemon/jigs.go diff --git a/go/cmd/entrypoint/admin.go b/go/cmd/entrypoint/admin.go index 5577c36..e1ff7eb 100644 --- a/go/cmd/entrypoint/admin.go +++ b/go/cmd/entrypoint/admin.go @@ -1,7 +1,6 @@ package main import ( - "context" "crypto/rand" "encoding/hex" "errors" @@ -15,7 +14,6 @@ import ( "os" "strings" - "code.betamike.com/micropelago/pmux/pmuxlib" "github.com/mediocregopher/mediocre-go-lib/v2/mlog" ) @@ -164,54 +162,30 @@ var subCmdAdminCreateNetwork = subCmd{ return fmt.Errorf("initializing bootstrap data: %w", err) } - if hostBootstrap, daemonConfig, err = coalesceDaemonConfigAndBootstrap(hostBootstrap, daemonConfig); err != nil { + if hostBootstrap, err = coalesceDaemonConfigAndBootstrap(hostBootstrap, daemonConfig); err != nil { return fmt.Errorf("merging daemon config into bootstrap data: %w", err) } - nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(hostBootstrap, daemonConfig) + daemonInst, err := daemon.New( + ctx, + logger.WithNamespace("daemon"), + daemonConfig, + hostBootstrap, + envRuntimeDirPath, + envBinDirPath, + &daemon.Opts{ + // SkipHostBootstrapPush is required, because the global bucket + // hasn't actually been initialized yet, so there's nowhere to + // push to. + SkipHostBootstrapPush: true, + + // NOTE both stdout and stderr are sent to stderr, so that the + // user can pipe the resulting admin.json to stdout. + Stdout: os.Stderr, + }, + ) if err != nil { - return fmt.Errorf("generating nebula config: %w", err) - } - - garagePmuxProcConfigs, err := garagePmuxProcConfigs(hostBootstrap, daemonConfig) - if err != nil { - return fmt.Errorf("generating garage configs: %w", err) - } - - pmuxConfig := pmuxlib.Config{ - Processes: append( - []pmuxlib.ProcessConfig{ - nebulaPmuxProcConfig, - }, - garagePmuxProcConfigs..., - ), - } - - ctx, cancel := context.WithCancel(ctx) - pmuxDoneCh := make(chan struct{}) - - logger.Info(ctx, "starting child processes") - go func() { - // NOTE both stdout and stderr are sent to stderr, so that the user - // can pipe the resulting admin.json to stdout. - pmuxlib.Run(ctx, os.Stderr, os.Stderr, pmuxConfig) - close(pmuxDoneCh) - }() - - defer func() { - cancel() - logger.Info(ctx, "waiting for child processes to exit") - <-pmuxDoneCh - }() - - logger.Info(ctx, "waiting for garage instances to come online") - if err := waitForGarageAndNebula(ctx, logger, hostBootstrap, daemonConfig); err != nil { - return fmt.Errorf("waiting for garage to start up: %w", err) - } - - logger.Info(ctx, "applying initial garage layout") - if err := garageApplyLayout(ctx, logger, hostBootstrap, daemonConfig); err != nil { - return fmt.Errorf("applying initial garage layout: %w", err) + return fmt.Errorf("initializing daemon: %w", err) } logger.Info(ctx, "initializing garage shared global bucket") @@ -219,6 +193,17 @@ var subCmdAdminCreateNetwork = subCmd{ ctx, logger, hostBootstrap, daemonConfig, ) + if cErr := (garage.AdminClientError{}); errors.As(err, &cErr) && cErr.StatusCode == 409 { + return fmt.Errorf("shared global bucket has already been created, are the storage allocations from a previously initialized isle being used?") + + } else if err != nil { + return fmt.Errorf("initializing garage shared global bucket: %w", err) + } + + if err := daemonInst.Shutdown(ctx); err != nil { + return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err) + } + hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds // rewrite the bootstrap now that the global bucket creds have been @@ -227,13 +212,6 @@ var subCmdAdminCreateNetwork = subCmd{ return fmt.Errorf("writing bootstrap file: %w", err) } - if cErr := (garage.AdminClientError{}); errors.As(err, &cErr) && cErr.StatusCode == 409 { - return fmt.Errorf("shared global bucket has already been created, are the storage allocations from a previously initialized isle being used?") - - } else if err != nil { - return fmt.Errorf("initializing garage shared global bucket: %w", err) - } - logger.Info(ctx, "cluster initialized successfully, writing admin.json to stdout") adm := admin.Admin{ diff --git a/go/cmd/entrypoint/daemon.go b/go/cmd/entrypoint/daemon.go index 3901613..3abc490 100644 --- a/go/cmd/entrypoint/daemon.go +++ b/go/cmd/entrypoint/daemon.go @@ -7,13 +7,11 @@ import ( "fmt" "io/fs" "os" - "sync" "time" "isle/bootstrap" "isle/daemon" - "code.betamike.com/micropelago/pmux/pmuxlib" "github.com/mediocregopher/mediocre-go-lib/v2/mctx" "github.com/mediocregopher/mediocre-go-lib/v2/mlog" ) @@ -94,77 +92,27 @@ func runDaemonPmuxOnce( ) ( bootstrap.Bootstrap, error, ) { - - nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(hostBootstrap, daemonConfig) + daemonInst, err := daemon.New( + ctx, + logger.WithNamespace("daemon"), + daemonConfig, + hostBootstrap, + envRuntimeDirPath, + envBinDirPath, + nil, + ) if err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("generating nebula config: %w", err) + return bootstrap.Bootstrap{}, fmt.Errorf("initializing daemon: %w", err) } - - dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(hostBootstrap, daemonConfig) - if err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("generating dnsmasq config: %w", err) - } - - garagePmuxProcConfigs, err := garagePmuxProcConfigs(hostBootstrap, daemonConfig) - if err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("generating garage children configs: %w", err) - } - - pmuxConfig := pmuxlib.Config{ - Processes: append( - []pmuxlib.ProcessConfig{ - nebulaPmuxProcConfig, - dnsmasqPmuxProcConfig, - }, - garagePmuxProcConfigs..., - ), - } - - var wg sync.WaitGroup - defer wg.Wait() - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - wg.Add(1) - go func() { - defer wg.Done() - pmuxlib.Run(ctx, os.Stdout, os.Stderr, pmuxConfig) + defer func() { + // context.Background() is deliberate here. At this point the entire + // process is shutting down, so whatever owns the process should decide + // when it's been too long. + if err := daemonInst.Shutdown(context.Background()); err != nil { + logger.Error(ctx, "failed to cleanly shutdown daemon", err) + } }() - if err := waitForGarageAndNebula(ctx, logger, hostBootstrap, daemonConfig); err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("waiting for nebula/garage to start up: %w", err) - } - - if len(daemonConfig.Storage.Allocations) > 0 { - - err := doOnce(ctx, func(ctx context.Context) error { - if err := garageApplyLayout(ctx, logger, hostBootstrap, daemonConfig); err != nil { - logger.Error(ctx, "applying garage layout", err) - return err - } - - return nil - }) - - if err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("applying garage layout: %w", err) - } - } - - err = doOnce(ctx, func(ctx context.Context) error { - if err := hostBootstrap.PutGarageBoostrapHost(ctx); err != nil { - logger.Error(ctx, "updating host info in garage", err) - return err - } - - return nil - }) - - if err != nil { - return bootstrap.Bootstrap{}, fmt.Errorf("updating host info in garage: %w", err) - } - ticker := time.NewTicker(3 * time.Minute) defer ticker.Stop() @@ -176,7 +124,7 @@ func runDaemonPmuxOnce( case <-ticker.C: - fmt.Fprintln(os.Stderr, "checking for changes to bootstrap") + logger.Info(ctx, "checking for changes to bootstrap") var ( changed bool @@ -301,9 +249,9 @@ var subCmdDaemon = subCmd{ // we update this Host's data using whatever configuration has been // provided by the daemon config. This way the daemon has the most // up-to-date possible bootstrap. This updated bootstrap will later get - // updated in garage using bootstrap.PutGarageBoostrapHost, so other - // hosts will see it as well. - if hostBootstrap, daemonConfig, err = coalesceDaemonConfigAndBootstrap(hostBootstrap, daemonConfig); err != nil { + // updated in garage as a background daemon task, so other hosts will + // see it as well. + if hostBootstrap, err = coalesceDaemonConfigAndBootstrap(hostBootstrap, daemonConfig); err != nil { return fmt.Errorf("merging daemon config into bootstrap data: %w", err) } diff --git a/go/cmd/entrypoint/daemon_util.go b/go/cmd/entrypoint/daemon_util.go index a85e8b4..17588a9 100644 --- a/go/cmd/entrypoint/daemon_util.go +++ b/go/cmd/entrypoint/daemon_util.go @@ -1,19 +1,17 @@ package main import ( - "context" "fmt" "isle/bootstrap" "isle/daemon" "isle/garage/garagesrv" - "time" ) func coalesceDaemonConfigAndBootstrap( hostBootstrap bootstrap.Bootstrap, daemonConfig daemon.Config, ) ( - bootstrap.Bootstrap, daemon.Config, error, + bootstrap.Bootstrap, error, ) { host := bootstrap.Host{ @@ -31,7 +29,7 @@ func coalesceDaemonConfigAndBootstrap( id, rpcPort, err := garagesrv.InitAlloc(alloc.MetaPath, alloc.RPCPort) if err != nil { - return bootstrap.Bootstrap{}, daemon.Config{}, fmt.Errorf("initializing alloc at %q: %w", alloc.MetaPath, err) + return bootstrap.Bootstrap{}, fmt.Errorf("initializing alloc at %q: %w", alloc.MetaPath, err) } host.Garage.Instances = append(host.Garage.Instances, bootstrap.GarageHostInstance{ @@ -47,20 +45,8 @@ func coalesceDaemonConfigAndBootstrap( hostBootstrap.Hosts[host.Name] = host if err := writeBootstrapToDataDir(hostBootstrap); err != nil { - return bootstrap.Bootstrap{}, daemon.Config{}, fmt.Errorf("writing bootstrap file: %w", err) + return bootstrap.Bootstrap{}, fmt.Errorf("writing bootstrap file: %w", err) } - return hostBootstrap, daemonConfig, nil -} - -func doOnce(ctx context.Context, fn func(context.Context) error) error { - for { - if err := fn(ctx); err == nil { - return nil - } else if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr - } - - time.Sleep(1 * time.Second) - } + return hostBootstrap, nil } diff --git a/go/cmd/entrypoint/garage_util.go b/go/cmd/entrypoint/garage_util.go index a96b688..6c63865 100644 --- a/go/cmd/entrypoint/garage_util.go +++ b/go/cmd/entrypoint/garage_util.go @@ -6,177 +6,10 @@ import ( "isle/bootstrap" "isle/daemon" "isle/garage" - "isle/garage/garagesrv" - "net" - "path/filepath" - "strconv" - "code.betamike.com/micropelago/pmux/pmuxlib" - "github.com/mediocregopher/mediocre-go-lib/v2/mctx" "github.com/mediocregopher/mediocre-go-lib/v2/mlog" ) -func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger { - return logger.WithNamespace("garageAdminClient") -} - -// newGarageAdminClient will return an AdminClient for a local garage instance, -// or it will _panic_ if there is no local instance configured. -func newGarageAdminClient( - logger *mlog.Logger, - hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, -) *garage.AdminClient { - - thisHost := hostBootstrap.ThisHost() - - return garage.NewAdminClient( - garageAdminClientLogger(logger), - net.JoinHostPort( - thisHost.IP().String(), - strconv.Itoa(daemonConfig.Storage.Allocations[0].AdminPort), - ), - hostBootstrap.Garage.AdminToken, - ) -} - -func waitForGarageAndNebula( - ctx context.Context, - logger *mlog.Logger, - hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, -) error { - - if err := waitForNebula(ctx, hostBootstrap); err != nil { - return fmt.Errorf("waiting for nebula to start: %w", err) - } - - allocs := daemonConfig.Storage.Allocations - - // if this host doesn't have any allocations specified then fall back to - // waiting for nebula - if len(allocs) == 0 { - return nil - } - - adminClientLogger := garageAdminClientLogger(logger) - - for _, alloc := range allocs { - - adminAddr := net.JoinHostPort( - hostBootstrap.ThisHost().IP().String(), - strconv.Itoa(alloc.AdminPort), - ) - - adminClient := garage.NewAdminClient( - adminClientLogger, - adminAddr, - hostBootstrap.Garage.AdminToken, - ) - - ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr) - logger.Debug(ctx, "wating for garage instance to start") - - if err := adminClient.Wait(ctx); err != nil { - return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) - } - } - - return nil - -} - -// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which -// corresponds with the given alloc from the daemon config. This will panic if -// no associated instance can be found. -// -// This assumes that coalesceDaemonConfigAndBootstrap has already been called. -func bootstrapGarageHostForAlloc( - host bootstrap.Host, - alloc daemon.ConfigStorageAllocation, -) bootstrap.GarageHostInstance { - - for _, inst := range host.Garage.Instances { - if inst.RPCPort == alloc.RPCPort { - return inst - } - } - - panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc)) -} - -func garageWriteChildConfig( - hostBootstrap bootstrap.Bootstrap, - alloc daemon.ConfigStorageAllocation, -) ( - string, error, -) { - - thisHost := hostBootstrap.ThisHost() - id := bootstrapGarageHostForAlloc(thisHost, alloc).ID - - peer := garage.LocalPeer{ - RemotePeer: garage.RemotePeer{ - ID: id, - IP: thisHost.IP().String(), - RPCPort: alloc.RPCPort, - S3APIPort: alloc.S3APIPort, - }, - AdminPort: alloc.AdminPort, - } - - garageTomlPath := filepath.Join( - envRuntimeDirPath, fmt.Sprintf("garage-%d.toml", alloc.RPCPort), - ) - - err := garagesrv.WriteGarageTomlFile(garageTomlPath, garagesrv.GarageTomlData{ - MetaPath: alloc.MetaPath, - DataPath: alloc.DataPath, - - RPCSecret: hostBootstrap.Garage.RPCSecret, - AdminToken: hostBootstrap.Garage.AdminToken, - - LocalPeer: peer, - BootstrapPeers: hostBootstrap.GaragePeers(), - }) - - if err != nil { - return "", fmt.Errorf("creating garage.toml file at %q: %w", garageTomlPath, err) - } - - return garageTomlPath, nil -} - -func garagePmuxProcConfigs( - hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, -) ( - []pmuxlib.ProcessConfig, error, -) { - - var pmuxProcConfigs []pmuxlib.ProcessConfig - - for _, alloc := range daemonConfig.Storage.Allocations { - - childConfigPath, err := garageWriteChildConfig(hostBootstrap, alloc) - - if err != nil { - return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err) - } - - pmuxProcConfigs = append(pmuxProcConfigs, pmuxlib.ProcessConfig{ - Name: fmt.Sprintf("garage-%d", alloc.RPCPort), - Cmd: binPath("garage"), - Args: []string{"-c", childConfigPath, "server"}, - StartAfterFunc: func(ctx context.Context) error { - return waitForNebula(ctx, hostBootstrap) - }, - }) - } - - return pmuxProcConfigs, nil -} - func garageInitializeGlobalBucket( ctx context.Context, logger *mlog.Logger, @@ -185,7 +18,9 @@ func garageInitializeGlobalBucket( ) ( garage.S3APICredentials, error, ) { - adminClient := newGarageAdminClient(logger, hostBootstrap, daemonConfig) + adminClient := daemon.NewGarageAdminClient( + logger, hostBootstrap, daemonConfig, + ) creds, err := adminClient.CreateS3APICredentials( ctx, garage.GlobalBucketS3APICredentialsName, @@ -213,38 +48,3 @@ func garageInitializeGlobalBucket( return creds, nil } - -func garageApplyLayout( - ctx context.Context, - logger *mlog.Logger, - hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, -) error { - - var ( - adminClient = newGarageAdminClient(logger, hostBootstrap, daemonConfig) - thisHost = hostBootstrap.ThisHost() - hostName = thisHost.Name - allocs = daemonConfig.Storage.Allocations - peers = make([]garage.PeerLayout, len(allocs)) - ) - - for i, alloc := range allocs { - - id := bootstrapGarageHostForAlloc(thisHost, alloc).ID - - zone := hostName - if alloc.Zone != "" { - zone = alloc.Zone - } - - peers[i] = garage.PeerLayout{ - ID: id, - Capacity: alloc.Capacity * 1_000_000_000, - Zone: zone, - Tags: []string{}, - } - } - - return adminClient.ApplyLayout(ctx, peers) -} diff --git a/go/cmd/entrypoint/main.go b/go/cmd/entrypoint/main.go index 52ed00d..111d037 100644 --- a/go/cmd/entrypoint/main.go +++ b/go/cmd/entrypoint/main.go @@ -44,10 +44,11 @@ var ( "STATE_DIRECTORY", filepath.Join(xdg.StateHome, "isle"), ) + envBinDirPath = filepath.Join(envAppDirPath, "bin") ) func binPath(name string) string { - return filepath.Join(envAppDirPath, "bin", name) + return filepath.Join(envBinDirPath, name) } func main() { diff --git a/go/cmd/entrypoint/dnsmasq_util.go b/go/daemon/child_dnsmasq.go similarity index 85% rename from go/cmd/entrypoint/dnsmasq_util.go rename to go/daemon/child_dnsmasq.go index 9752011..766c2bf 100644 --- a/go/cmd/entrypoint/dnsmasq_util.go +++ b/go/daemon/child_dnsmasq.go @@ -1,10 +1,9 @@ -package main +package daemon import ( - "isle/bootstrap" - "isle/daemon" - "isle/dnsmasq" "fmt" + "isle/bootstrap" + "isle/dnsmasq" "path/filepath" "sort" @@ -12,13 +11,13 @@ import ( ) func dnsmasqPmuxProcConfig( + runtimeDirPath, binDirPath string, hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, + daemonConfig Config, ) ( pmuxlib.ProcessConfig, error, ) { - - confPath := filepath.Join(envRuntimeDirPath, "dnsmasq.conf") + confPath := filepath.Join(runtimeDirPath, "dnsmasq.conf") hostsSlice := make([]dnsmasq.ConfDataHost, 0, len(hostBootstrap.Hosts)) for _, host := range hostBootstrap.Hosts { @@ -45,7 +44,7 @@ func dnsmasqPmuxProcConfig( return pmuxlib.ProcessConfig{ Name: "dnsmasq", - Cmd: binPath("dnsmasq"), + Cmd: filepath.Join(binDirPath, "dnsmasq"), Args: []string{"-d", "-C", confPath}, }, nil } diff --git a/go/daemon/child_garage.go b/go/daemon/child_garage.go new file mode 100644 index 0000000..538b026 --- /dev/null +++ b/go/daemon/child_garage.go @@ -0,0 +1,206 @@ +package daemon + +import ( + "context" + "fmt" + "isle/bootstrap" + "isle/garage" + "isle/garage/garagesrv" + "net" + "path/filepath" + "strconv" + + "code.betamike.com/micropelago/pmux/pmuxlib" + "github.com/mediocregopher/mediocre-go-lib/v2/mctx" + "github.com/mediocregopher/mediocre-go-lib/v2/mlog" +) + +func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger { + return logger.WithNamespace("garageAdminClient") +} + +// NewGarageAdminClient will return an AdminClient for a local garage instance, +// or it will _panic_ if there is no local instance configured. +func NewGarageAdminClient( + logger *mlog.Logger, + hostBootstrap bootstrap.Bootstrap, + config Config, +) *garage.AdminClient { + + thisHost := hostBootstrap.ThisHost() + + return garage.NewAdminClient( + garageAdminClientLogger(logger), + net.JoinHostPort( + thisHost.IP().String(), + strconv.Itoa(config.Storage.Allocations[0].AdminPort), + ), + hostBootstrap.Garage.AdminToken, + ) +} + +func (d *daemon) waitForGarage(ctx context.Context) error { + + allocs := d.config.Storage.Allocations + + // if this host doesn't have any allocations specified then fall back to + // waiting for nebula + if len(allocs) == 0 { + return nil + } + + adminClientLogger := garageAdminClientLogger(d.logger) + + for _, alloc := range allocs { + + adminAddr := net.JoinHostPort( + d.hostBootstrap.ThisHost().IP().String(), + strconv.Itoa(alloc.AdminPort), + ) + + adminClient := garage.NewAdminClient( + adminClientLogger, + adminAddr, + d.hostBootstrap.Garage.AdminToken, + ) + + ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr) + d.logger.Debug(ctx, "waiting for garage instance to start") + + if err := adminClient.Wait(ctx); err != nil { + return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) + } + } + + return nil + +} + +// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which +// corresponds with the given alloc from the daemon config. This will panic if +// no associated instance can be found. +// +// This assumes that coalesceDaemonConfigAndBootstrap has already been called. +func bootstrapGarageHostForAlloc( + host bootstrap.Host, + alloc ConfigStorageAllocation, +) bootstrap.GarageHostInstance { + + for _, inst := range host.Garage.Instances { + if inst.RPCPort == alloc.RPCPort { + return inst + } + } + + panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc)) +} + +func garageWriteChildConfig( + runtimeDirPath string, + hostBootstrap bootstrap.Bootstrap, + alloc ConfigStorageAllocation, +) ( + string, error, +) { + + thisHost := hostBootstrap.ThisHost() + id := bootstrapGarageHostForAlloc(thisHost, alloc).ID + + peer := garage.LocalPeer{ + RemotePeer: garage.RemotePeer{ + ID: id, + IP: thisHost.IP().String(), + RPCPort: alloc.RPCPort, + S3APIPort: alloc.S3APIPort, + }, + AdminPort: alloc.AdminPort, + } + + garageTomlPath := filepath.Join( + runtimeDirPath, fmt.Sprintf("garage-%d.toml", alloc.RPCPort), + ) + + err := garagesrv.WriteGarageTomlFile(garageTomlPath, garagesrv.GarageTomlData{ + MetaPath: alloc.MetaPath, + DataPath: alloc.DataPath, + + RPCSecret: hostBootstrap.Garage.RPCSecret, + AdminToken: hostBootstrap.Garage.AdminToken, + + LocalPeer: peer, + BootstrapPeers: hostBootstrap.GaragePeers(), + }) + + if err != nil { + return "", fmt.Errorf("creating garage.toml file at %q: %w", garageTomlPath, err) + } + + return garageTomlPath, nil +} + +func garagePmuxProcConfigs( + runtimeDirPath, binDirPath string, + hostBootstrap bootstrap.Bootstrap, + daemonConfig Config, +) ( + []pmuxlib.ProcessConfig, error, +) { + + var pmuxProcConfigs []pmuxlib.ProcessConfig + + for _, alloc := range daemonConfig.Storage.Allocations { + + childConfigPath, err := garageWriteChildConfig( + runtimeDirPath, hostBootstrap, alloc, + ) + if err != nil { + return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err) + } + + pmuxProcConfigs = append(pmuxProcConfigs, pmuxlib.ProcessConfig{ + Name: fmt.Sprintf("garage-%d", alloc.RPCPort), + Cmd: filepath.Join(binDirPath, "garage"), + Args: []string{"-c", childConfigPath, "server"}, + StartAfterFunc: func(ctx context.Context) error { + return waitForNebula(ctx, hostBootstrap) + }, + }) + } + + return pmuxProcConfigs, nil +} + +func garageApplyLayout( + ctx context.Context, + logger *mlog.Logger, + hostBootstrap bootstrap.Bootstrap, + config Config, +) error { + + var ( + adminClient = NewGarageAdminClient(logger, hostBootstrap, config) + thisHost = hostBootstrap.ThisHost() + hostName = thisHost.Name + allocs = config.Storage.Allocations + peers = make([]garage.PeerLayout, len(allocs)) + ) + + for i, alloc := range allocs { + + id := bootstrapGarageHostForAlloc(thisHost, alloc).ID + + zone := hostName + if alloc.Zone != "" { + zone = alloc.Zone + } + + peers[i] = garage.PeerLayout{ + ID: id, + Capacity: alloc.Capacity * 1_000_000_000, + Zone: zone, + Tags: []string{}, + } + } + + return adminClient.ApplyLayout(ctx, peers) +} diff --git a/go/cmd/entrypoint/nebula_util.go b/go/daemon/child_nebula.go similarity index 90% rename from go/cmd/entrypoint/nebula_util.go rename to go/daemon/child_nebula.go index ffbcd1b..b9f2b7a 100644 --- a/go/cmd/entrypoint/nebula_util.go +++ b/go/daemon/child_nebula.go @@ -1,10 +1,9 @@ -package main +package daemon import ( "context" "fmt" "isle/bootstrap" - "isle/daemon" "isle/yamlutil" "net" "path/filepath" @@ -17,14 +16,16 @@ import ( // this by attempting to create a UDP connection which has the nebula IP set as // its source. If this succeeds we can assume that at the very least the nebula // interface has been initialized. -func waitForNebula(ctx context.Context, hostBootstrap bootstrap.Bootstrap) error { +func waitForNebula( + ctx context.Context, hostBootstrap bootstrap.Bootstrap, +) error { ip := hostBootstrap.ThisHost().IP() lUdpAddr := &net.UDPAddr{IP: ip, Port: 0} rUdpAddr := &net.UDPAddr{IP: ip, Port: 45535} - return doOnce(ctx, func(context.Context) error { + return until(ctx, func(context.Context) error { conn, err := net.DialUDP("udp", lUdpAddr, rUdpAddr) if err != nil { return err @@ -35,8 +36,9 @@ func waitForNebula(ctx context.Context, hostBootstrap bootstrap.Bootstrap) error } func nebulaPmuxProcConfig( + runtimeDirPath, binDirPath string, hostBootstrap bootstrap.Bootstrap, - daemonConfig daemon.Config, + daemonConfig Config, ) ( pmuxlib.ProcessConfig, error, ) { @@ -122,7 +124,7 @@ func nebulaPmuxProcConfig( } } - nebulaYmlPath := filepath.Join(envRuntimeDirPath, "nebula.yml") + nebulaYmlPath := filepath.Join(runtimeDirPath, "nebula.yml") if err := yamlutil.WriteYamlFile(config, nebulaYmlPath, 0440); err != nil { return pmuxlib.ProcessConfig{}, fmt.Errorf("writing nebula.yml to %q: %w", nebulaYmlPath, err) @@ -130,7 +132,7 @@ func nebulaPmuxProcConfig( return pmuxlib.ProcessConfig{ Name: "nebula", - Cmd: binPath("nebula"), + Cmd: filepath.Join(binDirPath, "nebula"), Args: []string{"-config", nebulaYmlPath}, }, nil } diff --git a/go/daemon/config.go b/go/daemon/config.go index a021112..8f49cac 100644 --- a/go/daemon/config.go +++ b/go/daemon/config.go @@ -1,6 +1,20 @@ package daemon -import "strconv" +import ( + "fmt" + "io" + "isle/yamlutil" + "os" + "path/filepath" + "strconv" + + "github.com/imdario/mergo" + "gopkg.in/yaml.v3" +) + +func defaultConfigPath(appDirPath string) string { + return filepath.Join(appDirPath, "etc", "daemon.yml") +} type ConfigTun struct { Device string `yaml:"device"` @@ -100,3 +114,70 @@ func (c *Config) fillDefaults() { firewallGarageInbound..., ) } + +// CopyDefaultConfig copies the daemon config file embedded in the AppDir into +// the given io.Writer. +func CopyDefaultConfig(into io.Writer, appDirPath string) error { + + defaultConfigPath := defaultConfigPath(appDirPath) + + f, err := os.Open(defaultConfigPath) + if err != nil { + return fmt.Errorf("opening daemon config at %q: %w", defaultConfigPath, err) + } + + defer f.Close() + + if _, err := io.Copy(into, f); err != nil { + return fmt.Errorf("copying daemon config from %q: %w", defaultConfigPath, err) + } + + return nil +} + +// LoadConfig loads the daemon config from userConfigPath, merges it with +// the default found in the appDirPath, and returns the result. +// +// If userConfigPath is not given then the default is loaded and returned. +func LoadConfig( + appDirPath, userConfigPath string, +) ( + Config, error, +) { + + defaultConfigPath := defaultConfigPath(appDirPath) + + var fullDaemon map[string]interface{} + + if err := yamlutil.LoadYamlFile(&fullDaemon, defaultConfigPath); err != nil { + return Config{}, fmt.Errorf("parsing default daemon config file: %w", err) + } + + if userConfigPath != "" { + + var daemonConfig map[string]interface{} + if err := yamlutil.LoadYamlFile(&daemonConfig, userConfigPath); err != nil { + return Config{}, fmt.Errorf("parsing %q: %w", userConfigPath, err) + } + + err := mergo.Merge(&fullDaemon, daemonConfig, mergo.WithOverride) + if err != nil { + return Config{}, fmt.Errorf("merging contents of file %q: %w", userConfigPath, err) + } + } + + fullDaemonB, err := yaml.Marshal(fullDaemon) + + if err != nil { + return Config{}, fmt.Errorf("yaml marshaling: %w", err) + } + + var config Config + if err := yaml.Unmarshal(fullDaemonB, &config); err != nil { + return Config{}, fmt.Errorf("yaml unmarshaling back into Config struct: %w", err) + } + + config.fillDefaults() + + return config, nil +} diff --git a/go/daemon/daemon.go b/go/daemon/daemon.go index 4379798..2610f6c 100644 --- a/go/daemon/daemon.go +++ b/go/daemon/daemon.go @@ -1,85 +1,200 @@ -// Package daemon contains types and functions related specifically to the -// isle daemon. +// Package daemon implements the isle daemon, which is a long-running service +// managing all isle background tasks and sub-processes for a single cluster. package daemon import ( + "context" "fmt" "io" - "isle/yamlutil" + "isle/bootstrap" "os" - "path/filepath" + "time" - "github.com/imdario/mergo" - "gopkg.in/yaml.v3" + "code.betamike.com/micropelago/pmux/pmuxlib" + "github.com/mediocregopher/mediocre-go-lib/v2/mlog" ) -func defaultConfigPath(appDirPath string) string { - return filepath.Join(appDirPath, "etc", "daemon.yml") +type daemon struct { + logger *mlog.Logger + config Config + hostBootstrap bootstrap.Bootstrap + opts Opts + + pmuxCancelFn context.CancelFunc + pmuxStoppedCh chan struct{} } -// CopyDefaultConfig copies the daemon config file embedded in the AppDir into -// the given io.Writer. -func CopyDefaultConfig(into io.Writer, appDirPath string) error { +// Daemon presents all functionality required for client frontends to interact +// with isle, typically via the unix socket. +type Daemon interface { - defaultConfigPath := defaultConfigPath(appDirPath) + // Shutdown blocks until all resources held or created by the daemon, + // including child processes it has started, have been cleaned up, or until + // the context is canceled. + // + // If this returns an error then it's possible that child processes are + // still running and are no longer managed. + Shutdown(context.Context) error +} - f, err := os.Open(defaultConfigPath) - if err != nil { - return fmt.Errorf("opening daemon config at %q: %w", defaultConfigPath, err) +// Opts are optional parameters which can be passed in when initializing a new +// Daemon instance. A nil Opts is equivalent to a zero value. +type Opts struct { + // SkipHostBootstrapPush, if set, will cause the Daemon to not push the + // bootstrap to garage upon a successful initialization. + SkipHostBootstrapPush bool + + // Stdout and Stderr are what the associated outputs from child processes + // will be directed to. + Stdout, Stderr io.Writer +} + +func (o *Opts) withDefaults() *Opts { + if o == nil { + o = new(Opts) } - defer f.Close() + if o.Stdout == nil { + o.Stdout = os.Stdout + } - if _, err := io.Copy(into, f); err != nil { - return fmt.Errorf("copying daemon config from %q: %w", defaultConfigPath, err) + if o.Stderr == nil { + o.Stderr = os.Stderr + } + + return o +} + +// New initialized and returns a Daemon. If initialization fails an error is +// returned. +func New( + ctx context.Context, + logger *mlog.Logger, + config Config, + hostBootstrap bootstrap.Bootstrap, + runtimeDirPath, binDirPath string, + opts *Opts, +) ( + Daemon, error, +) { + opts = opts.withDefaults() + + nebulaPmuxProcConfig, err := nebulaPmuxProcConfig( + runtimeDirPath, binDirPath, hostBootstrap, config, + ) + if err != nil { + return nil, fmt.Errorf("generating nebula config: %w", err) + } + + dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig( + runtimeDirPath, binDirPath, hostBootstrap, config, + ) + if err != nil { + return nil, fmt.Errorf("generating dnsmasq config: %w", err) + } + + garagePmuxProcConfigs, err := garagePmuxProcConfigs( + runtimeDirPath, binDirPath, hostBootstrap, config, + ) + if err != nil { + return nil, fmt.Errorf("generating garage children configs: %w", err) + } + + pmuxConfig := pmuxlib.Config{ + Processes: append( + []pmuxlib.ProcessConfig{ + nebulaPmuxProcConfig, + dnsmasqPmuxProcConfig, + }, + garagePmuxProcConfigs..., + ), + } + + pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background()) + + d := &daemon{ + logger: logger, + config: config, + hostBootstrap: hostBootstrap, + opts: *opts, + pmuxCancelFn: pmuxCancelFn, + pmuxStoppedCh: make(chan struct{}), + } + + go func() { + defer close(d.pmuxStoppedCh) + pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig) + }() + + if initErr := d.postPmuxInit(ctx); initErr != nil { + logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + if err := d.Shutdown(shutdownCtx); err != nil { + panic(fmt.Sprintf( + "failed to shut down child processes after initialization"+ + " error, there may be zombie children leftover."+ + " Original error: %v", + initErr, + )) + } + + return nil, initErr + } + + return d, nil +} + +func (d *daemon) postPmuxInit(ctx context.Context) error { + d.logger.Info(ctx, "waiting for nebula VPN to come online") + if err := waitForNebula(ctx, d.hostBootstrap); err != nil { + return fmt.Errorf("waiting for nebula to start: %w", err) + } + + d.logger.Info(ctx, "waiting for garage instances to come online") + if err := d.waitForGarage(ctx); err != nil { + return fmt.Errorf("waiting for garage to start: %w", err) + } + + if len(d.config.Storage.Allocations) > 0 { + + err := until(ctx, func(ctx context.Context) error { + err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config) + if err != nil { + d.logger.Error(ctx, "applying garage layout", err) + return err + } + + return nil + }) + + if err != nil { + return fmt.Errorf("applying garage layout: %w", err) + } + } + + if !d.opts.SkipHostBootstrapPush { + if err := until(ctx, func(ctx context.Context) error { + if err := d.hostBootstrap.PutGarageBoostrapHost(ctx); err != nil { + d.logger.Error(ctx, "updating host info in garage", err) + return err + } + + return nil + }); err != nil { + return fmt.Errorf("updating host info in garage: %w", err) + } } return nil } -// LoadConfig loads the daemon config from userConfigPath, merges it with -// the default found in the appDirPath, and returns the result. -// -// If userConfigPath is not given then the default is loaded and returned. -func LoadConfig( - appDirPath, userConfigPath string, -) ( - Config, error, -) { - - defaultConfigPath := defaultConfigPath(appDirPath) - - var fullDaemon map[string]interface{} - - if err := yamlutil.LoadYamlFile(&fullDaemon, defaultConfigPath); err != nil { - return Config{}, fmt.Errorf("parsing default daemon config file: %w", err) +func (d *daemon) Shutdown(ctx context.Context) error { + d.pmuxCancelFn() + select { + case <-ctx.Done(): + return ctx.Err() + case <-d.pmuxStoppedCh: + return nil } - - if userConfigPath != "" { - - var daemonConfig map[string]interface{} - if err := yamlutil.LoadYamlFile(&daemonConfig, userConfigPath); err != nil { - return Config{}, fmt.Errorf("parsing %q: %w", userConfigPath, err) - } - - err := mergo.Merge(&fullDaemon, daemonConfig, mergo.WithOverride) - if err != nil { - return Config{}, fmt.Errorf("merging contents of file %q: %w", userConfigPath, err) - } - } - - fullDaemonB, err := yaml.Marshal(fullDaemon) - - if err != nil { - return Config{}, fmt.Errorf("yaml marshaling: %w", err) - } - - var config Config - if err := yaml.Unmarshal(fullDaemonB, &config); err != nil { - return Config{}, fmt.Errorf("yaml unmarshaling back into Config struct: %w", err) - } - - config.fillDefaults() - - return config, nil } diff --git a/go/daemon/jigs.go b/go/daemon/jigs.go new file mode 100644 index 0000000..e9dafb1 --- /dev/null +++ b/go/daemon/jigs.go @@ -0,0 +1,18 @@ +package daemon + +import ( + "context" + "time" +) + +func until(ctx context.Context, fn func(context.Context) error) error { + for { + if err := fn(ctx); err == nil { + return nil + } else if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + + time.Sleep(1 * time.Second) + } +}