Compare commits

...

2 Commits

17 changed files with 613 additions and 379 deletions

View File

@ -93,12 +93,18 @@ func New(
}, nil }, nil
} }
// FromReader reads a bootstrap file from the given io.Reader. // FromFile reads a bootstrap from a file at the given path. The HostAssigned
func FromReader(r io.Reader) (Bootstrap, error) { // field will automatically be unwrapped.
func FromFile(path string) (Bootstrap, error) {
f, err := os.Open(path)
if err != nil {
return Bootstrap{}, fmt.Errorf("opening file: %w", err)
}
defer f.Close()
var b Bootstrap var b Bootstrap
err := json.NewDecoder(r).Decode(&b) if err := json.NewDecoder(f).Decode(&b); err != nil {
if err != nil {
return Bootstrap{}, fmt.Errorf("decoding json: %w", err) return Bootstrap{}, fmt.Errorf("decoding json: %w", err)
} }
@ -106,19 +112,7 @@ func FromReader(r io.Reader) (Bootstrap, error) {
return Bootstrap{}, fmt.Errorf("unwrapping host assigned: %w", err) return Bootstrap{}, fmt.Errorf("unwrapping host assigned: %w", err)
} }
return b, err return b, nil
}
// FromFile reads a bootstrap from a file at the given path.
func FromFile(path string) (Bootstrap, error) {
f, err := os.Open(path)
if err != nil {
return Bootstrap{}, fmt.Errorf("opening file: %w", err)
}
defer f.Close()
return FromReader(f)
} }
// WriteTo writes the Bootstrap as a new bootstrap to the given io.Writer. // WriteTo writes the Bootstrap as a new bootstrap to the given io.Writer.

View File

@ -171,9 +171,7 @@ var subCmdAdminCreateNetwork = subCmd{
logger.WithNamespace("daemon"), logger.WithNamespace("daemon"),
daemonConfig, daemonConfig,
hostBootstrap, hostBootstrap,
envRuntimeDirPath,
envBinDirPath, envBinDirPath,
envStateDirPath,
&daemon.Opts{ &daemon.Opts{
// SkipHostBootstrapPush is required, because the global bucket // SkipHostBootstrapPush is required, because the global bucket
// hasn't actually been initialized yet, so there's nowhere to // hasn't actually been initialized yet, so there's nowhere to
@ -201,7 +199,7 @@ var subCmdAdminCreateNetwork = subCmd{
return fmt.Errorf("initializing garage shared global bucket: %w", err) return fmt.Errorf("initializing garage shared global bucket: %w", err)
} }
if err := daemonInst.Shutdown(ctx); err != nil { if err := daemonInst.Shutdown(); err != nil {
return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err) return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err)
} }

View File

@ -11,7 +11,7 @@ import (
func loadHostBootstrap() (bootstrap.Bootstrap, error) { func loadHostBootstrap() (bootstrap.Bootstrap, error) {
stateDirPath := bootstrap.StateDirPath(envStateDirPath) stateDirPath := bootstrap.StateDirPath(daemonEnvVars.StateDirPath)
hostBootstrap, err := bootstrap.FromFile(stateDirPath) hostBootstrap, err := bootstrap.FromFile(stateDirPath)
if errors.Is(err, fs.ErrNotExist) { if errors.Is(err, fs.ErrNotExist) {
@ -29,7 +29,7 @@ func loadHostBootstrap() (bootstrap.Bootstrap, error) {
func writeBootstrapToStateDir(hostBootstrap bootstrap.Bootstrap) error { func writeBootstrapToStateDir(hostBootstrap bootstrap.Bootstrap) error {
path := bootstrap.StateDirPath(envStateDirPath) path := bootstrap.StateDirPath(daemonEnvVars.StateDirPath)
dirPath := filepath.Dir(path) dirPath := filepath.Dir(path)
if err := os.MkdirAll(dirPath, 0700); err != nil { if err := os.MkdirAll(dirPath, 0700); err != nil {

View File

@ -1,13 +1,11 @@
package main package main
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io/fs" "io/fs"
"os" "os"
"time"
"isle/bootstrap" "isle/bootstrap"
"isle/daemon" "isle/daemon"
@ -16,153 +14,6 @@ import (
"dev.mediocregopher.com/mediocre-go-lib.git/mlog" "dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
// The daemon sub-command deals with starting an actual isle daemon
// process, which is required to be running for most other Isle
// functionality. The sub-command does the following:
//
// * Creates and locks the runtime directory.
//
// * Creates the data directory and copies the appdir bootstrap file into there,
// if it's not already there.
//
// * Merges daemon configuration into the bootstrap configuration, and rewrites
// the bootstrap file.
//
// * Sets up environment variables that all other sub-processes then use, based
// on the runtime dir.
//
// * Dynamically creates the root pmux config and runs pmux.
//
// * (On exit) cleans up the runtime directory.
// creates a new bootstrap file using available information from the network. If
// the new bootstrap file is different than the existing one, the existing one
// is overwritten and true is returned.
func reloadBootstrap(
ctx context.Context,
logger *mlog.Logger,
daemonInst daemon.Daemon,
hostBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, bool, error,
) {
thisHost := hostBootstrap.ThisHost()
newHosts, err := daemonInst.GetGarageBootstrapHosts(ctx)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
}
// the daemon's view of this host's bootstrap info takes precedence over
// whatever is in garage
newHosts[thisHost.Name] = thisHost
newHostsHash, err := bootstrap.HostsHash(newHosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
}
currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
}
if bytes.Equal(newHostsHash, currHostsHash) {
return hostBootstrap, false, nil
}
hostBootstrap.Hosts = newHosts
if err := writeBootstrapToStateDir(hostBootstrap); err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("writing new bootstrap to data dir: %w", err)
}
return hostBootstrap, true, nil
}
// runs a single pmux process of daemon, returning only once the env.Context has
// been canceled or bootstrap info has been changed. This will always block
// until the spawned pmux has returned, and returns a copy of hostBootstrap with
// updated boostrap info.
func runDaemonPmuxOnce(
ctx context.Context,
logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) (
bootstrap.Bootstrap, error,
) {
daemonInst, err := daemon.New(
ctx,
logger.WithNamespace("daemon"),
daemonConfig,
hostBootstrap,
envRuntimeDirPath,
envBinDirPath,
envStateDirPath,
nil,
)
if err != nil {
return bootstrap.Bootstrap{}, fmt.Errorf("initializing daemon: %w", err)
}
defer func() {
// context.Background() is deliberate here. At this point the entire
// process is shutting down, so whatever owns the process should decide
// when it's been too long.
if err := daemonInst.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "failed to cleanly shutdown daemon", err)
}
}()
{
logger := logger.WithNamespace("http")
httpSrv, err := newHTTPServer(
ctx, logger, daemon.NewRPC(daemonInst),
)
if err != nil {
return bootstrap.Bootstrap{}, fmt.Errorf("starting HTTP server: %w", err)
}
defer func() {
// see comment in daemonInst shutdown logic regarding background
// context.
if err := httpSrv.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "Failed to cleanly shutdown http server", err)
}
}()
}
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return bootstrap.Bootstrap{}, ctx.Err()
case <-ticker.C:
logger.Info(ctx, "checking for changes to bootstrap")
var (
changed bool
err error
)
if hostBootstrap, changed, err = reloadBootstrap(
ctx, logger, daemonInst, hostBootstrap,
); err != nil {
return bootstrap.Bootstrap{}, fmt.Errorf("reloading bootstrap: %w", err)
} else if changed {
fmt.Fprintln(os.Stderr, "bootstrap info has changed, restarting all processes")
return hostBootstrap, nil
}
}
}
}
var subCmdDaemon = subCmd{ var subCmdDaemon = subCmd{
name: "daemon", name: "daemon",
descr: "Runs the isle daemon (Default if no sub-command given)", descr: "Runs the isle daemon (Default if no sub-command given)",
@ -214,7 +65,7 @@ var subCmdDaemon = subCmd{
defer runtimeDirCleanup() defer runtimeDirCleanup()
var ( var (
bootstrapStateDirPath = bootstrap.StateDirPath(envStateDirPath) bootstrapStateDirPath = bootstrap.StateDirPath(daemonEnvVars.StateDirPath)
bootstrapAppDirPath = bootstrap.AppDirPath(envAppDirPath) bootstrapAppDirPath = bootstrap.AppDirPath(envAppDirPath)
hostBootstrapPath string hostBootstrapPath string
@ -276,16 +127,36 @@ var subCmdDaemon = subCmd{
return fmt.Errorf("merging daemon config into bootstrap data: %w", err) return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
} }
for { daemonInst := daemon.NewDaemonRestarter(
logger, daemonConfig, envBinDirPath, hostBootstrap, nil,
hostBootstrap, err = runDaemonPmuxOnce(ctx, logger, hostBootstrap, daemonConfig) )
defer func() {
if errors.Is(err, context.Canceled) { logger.Info(ctx, "Stopping child processes")
return nil if err := daemonInst.Shutdown(); err != nil {
logger.Error(ctx, "Shutting down daemon cleanly failed, there may be orphaned child processes", err)
} else if err != nil {
return fmt.Errorf("running pmux for daemon: %w", err)
} }
logger.Info(ctx, "Child processes successfully stopped")
}()
{
logger := logger.WithNamespace("http")
httpSrv, err := newHTTPServer(
ctx, logger, daemon.NewRPC(daemonInst),
)
if err != nil {
return fmt.Errorf("starting HTTP server: %w", err)
}
defer func() {
// see comment in daemonInst shutdown logic regarding background
// context.
logger.Info(ctx, "Shutting down HTTP socket")
if err := httpSrv.Shutdown(context.Background()); err != nil {
logger.Error(ctx, "Failed to cleanly shutdown http server", err)
}
}()
} }
<-ctx.Done()
return nil
}, },
} }

View File

@ -66,10 +66,12 @@ func newHTTPServer(
) ( ) (
*http.Server, error, *http.Server, error,
) { ) {
l, err := net.Listen("unix", envSocketPath) l, err := net.Listen("unix", daemonEnvVars.HTTPSocketPath)
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"failed to listen on socket %q: %w", envSocketPath, err, "failed to listen on socket %q: %w",
daemonEnvVars.HTTPSocketPath,
err,
) )
} }

View File

@ -13,7 +13,7 @@ import (
// order to prevent it from doing so. // order to prevent it from doing so.
func initMCConfigDir() (string, error) { func initMCConfigDir() (string, error) {
var ( var (
path = filepath.Join(envStateDirPath, "mc") path = filepath.Join(daemonEnvVars.StateDirPath, "mc")
sharePath = filepath.Join(path, "share") sharePath = filepath.Join(path, "share")
configJSONPath = filepath.Join(path, "config.json") configJSONPath = filepath.Join(path, "config.json")
) )

View File

@ -1,47 +0,0 @@
package main
import (
"errors"
"fmt"
"io/fs"
"os"
"slices"
"strings"
)
func envOr(name string, fallback func() string) string {
if v := os.Getenv(name); v != "" {
return v
}
return fallback()
}
func firstExistingDir(paths ...string) (string, error) {
var errs []error
for _, path := range paths {
stat, err := os.Stat(path)
switch {
case errors.Is(err, fs.ErrExist):
continue
case err != nil:
errs = append(
errs, fmt.Errorf("checking if path %q exists: %w", path, err),
)
case !stat.IsDir():
errs = append(
errs, fmt.Errorf("path %q exists but is not a directory", path),
)
default:
return path, nil
}
}
err := fmt.Errorf(
"no directory found at any of the following paths: %s",
strings.Join(paths, ", "),
)
if len(errs) > 0 {
err = errors.Join(slices.Insert(errs, 0, err)...)
}
return "", err
}

View File

@ -2,7 +2,7 @@ package main
import ( import (
"context" "context"
"fmt" "isle/daemon"
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
@ -10,14 +10,8 @@ import (
"dev.mediocregopher.com/mediocre-go-lib.git/mctx" "dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog" "dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"github.com/adrg/xdg"
) )
// The purpose of this binary is to act as the entrypoint of the isle
// process. It processes the command-line arguments which are passed in, and
// then passes execution along to an appropriate binary housed in AppDir/bin
// (usually a bash script, which is more versatile than a go program).
func getAppDirPath() string { func getAppDirPath() string {
appDirPath := os.Getenv("APPDIR") appDirPath := os.Getenv("APPDIR")
if appDirPath == "" { if appDirPath == "" {
@ -26,39 +20,10 @@ func getAppDirPath() string {
return appDirPath return appDirPath
} }
func getRPCSocketDirPath() string {
path, err := firstExistingDir(
"/run",
"/var/run",
"/tmp",
"/dev/shm",
)
if err != nil {
panic(fmt.Sprintf("Failed to find directory for RPC socket: %v", err))
}
return path
}
// RUNTIME_DIRECTORY/STATE_DIRECTORY are used by the systemd service in
// conjunction with the RuntimeDirectory/StateDirectory directives.
var ( var (
envAppDirPath = getAppDirPath() daemonEnvVars = daemon.GetEnvVars()
envRuntimeDirPath = envOr( envAppDirPath = getAppDirPath()
"RUNTIME_DIRECTORY",
func() string { return filepath.Join(xdg.RuntimeDir, "isle") },
)
envStateDirPath = envOr(
"STATE_DIRECTORY",
func() string { return filepath.Join(xdg.StateHome, "isle") },
)
envBinDirPath = filepath.Join(envAppDirPath, "bin") envBinDirPath = filepath.Join(envAppDirPath, "bin")
envSocketPath = envOr(
"ISLE_SOCKET_PATH",
func() string {
return filepath.Join(getRPCSocketDirPath(), "isle-daemon.sock")
},
)
) )
func binPath(name string) string { func binPath(name string) string {

View File

@ -16,7 +16,7 @@ import (
var errDaemonNotRunning = errors.New("no isle daemon process running") var errDaemonNotRunning = errors.New("no isle daemon process running")
func lockFilePath() string { func lockFilePath() string {
return filepath.Join(envRuntimeDirPath, "lock") return filepath.Join(daemonEnvVars.RuntimeDirPath, "lock")
} }
func writeLock() error { func writeLock() error {
@ -49,11 +49,11 @@ func writeLock() error {
// returns a cleanup function which will clean up the created runtime directory. // returns a cleanup function which will clean up the created runtime directory.
func setupAndLockRuntimeDir(ctx context.Context, logger *mlog.Logger) (func(), error) { func setupAndLockRuntimeDir(ctx context.Context, logger *mlog.Logger) (func(), error) {
ctx = mctx.Annotate(ctx, "runtimeDirPath", envRuntimeDirPath) ctx = mctx.Annotate(ctx, "runtimeDirPath", daemonEnvVars.RuntimeDirPath)
logger.Info(ctx, "will use runtime directory for temporary state") logger.Info(ctx, "will use runtime directory for temporary state")
if err := os.MkdirAll(envRuntimeDirPath, 0700); err != nil { if err := os.MkdirAll(daemonEnvVars.RuntimeDirPath, 0700); err != nil {
return nil, fmt.Errorf("creating directory %q: %w", envRuntimeDirPath, err) return nil, fmt.Errorf("creating directory %q: %w", daemonEnvVars.RuntimeDirPath, err)
} else if err := writeLock(); err != nil { } else if err := writeLock(); err != nil {
return nil, err return nil, err
@ -61,7 +61,7 @@ func setupAndLockRuntimeDir(ctx context.Context, logger *mlog.Logger) (func(), e
return func() { return func() {
logger.Info(ctx, "cleaning up runtime directory") logger.Info(ctx, "cleaning up runtime directory")
if err := os.RemoveAll(envRuntimeDirPath); err != nil { if err := os.RemoveAll(daemonEnvVars.RuntimeDirPath); err != nil {
logger.Error(ctx, "removing temporary directory", err) logger.Error(ctx, "removing temporary directory", err)
} }
}, nil }, nil

View File

@ -110,7 +110,7 @@ func (ctx subCmdCtx) doSubCmd(subCmds ...subCmd) error {
} }
daemonRCPClient := jsonrpc2.NewUnixHTTPClient( daemonRCPClient := jsonrpc2.NewUnixHTTPClient(
envSocketPath, daemonHTTPRPCPath, daemonEnvVars.HTTPSocketPath, daemonHTTPRPCPath,
) )
err := subCmd.do(subCmdCtx{ err := subCmd.do(subCmdCtx{

50
go/daemon/bootstrap.go Normal file
View File

@ -0,0 +1,50 @@
package daemon
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"isle/bootstrap"
)
func loadHostBootstrap(stateDirPath string) (bootstrap.Bootstrap, error) {
path := bootstrap.StateDirPath(stateDirPath)
hostBootstrap, err := bootstrap.FromFile(path)
if errors.Is(err, fs.ErrNotExist) {
return bootstrap.Bootstrap{}, fmt.Errorf(
"%q not found, has the daemon ever been run?",
stateDirPath,
)
} else if err != nil {
return bootstrap.Bootstrap{}, fmt.Errorf("loading %q: %w", stateDirPath, err)
}
return hostBootstrap, nil
}
func writeBootstrapToStateDir(
stateDirPath string, hostBootstrap bootstrap.Bootstrap,
) error {
var (
path = bootstrap.StateDirPath(stateDirPath)
dirPath = filepath.Dir(path)
)
if err := os.MkdirAll(dirPath, 0700); err != nil {
return fmt.Errorf("creating directory %q: %w", dirPath, err)
}
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("creating file %q: %w", path, err)
}
defer f.Close()
return hostBootstrap.WriteTo(f)
}

95
go/daemon/child_pmux.go Normal file
View File

@ -0,0 +1,95 @@
package daemon
import (
"context"
"fmt"
"code.betamike.com/micropelago/pmux/pmuxlib"
)
func (d *daemon) newPmuxConfig() (pmuxlib.Config, error) {
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
d.opts.EnvVars.RuntimeDirPath,
d.binDirPath,
d.hostBootstrap,
d.config,
)
if err != nil {
return pmuxlib.Config{}, fmt.Errorf("generating nebula config: %w", err)
}
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
d.opts.EnvVars.RuntimeDirPath,
d.binDirPath,
d.hostBootstrap,
d.config,
)
if err != nil {
return pmuxlib.Config{}, fmt.Errorf(
"generating dnsmasq config: %w", err,
)
}
garagePmuxProcConfigs, err := garagePmuxProcConfigs(
d.opts.EnvVars.RuntimeDirPath, d.binDirPath, d.hostBootstrap, d.config,
)
if err != nil {
return pmuxlib.Config{}, fmt.Errorf(
"generating garage children configs: %w", err,
)
}
return pmuxlib.Config{
Processes: append(
[]pmuxlib.ProcessConfig{
nebulaPmuxProcConfig,
dnsmasqPmuxProcConfig,
},
garagePmuxProcConfigs...,
),
}, nil
}
func (d *daemon) postPmuxInit(ctx context.Context) error {
d.logger.Info(ctx, "waiting for nebula VPN to come online")
if err := waitForNebula(ctx, d.hostBootstrap); err != nil {
return fmt.Errorf("waiting for nebula to start: %w", err)
}
d.logger.Info(ctx, "waiting for garage instances to come online")
if err := d.waitForGarage(ctx); err != nil {
return fmt.Errorf("waiting for garage to start: %w", err)
}
if len(d.config.Storage.Allocations) > 0 {
err := until(ctx, func(ctx context.Context) error {
err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config)
if err != nil {
d.logger.Error(ctx, "applying garage layout", err)
return err
}
return nil
})
if err != nil {
return fmt.Errorf("applying garage layout: %w", err)
}
}
if !d.opts.SkipHostBootstrapPush {
if err := until(ctx, func(ctx context.Context) error {
if err := d.putGarageBoostrapHost(ctx); err != nil {
d.logger.Error(ctx, "updating host info in garage", err)
return err
}
return nil
}); err != nil {
return fmt.Errorf("updating host info in garage: %w", err)
}
}
return nil
}

View File

@ -8,23 +8,11 @@ import (
"io" "io"
"isle/bootstrap" "isle/bootstrap"
"os" "os"
"time"
"code.betamike.com/micropelago/pmux/pmuxlib" "code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog" "dev.mediocregopher.com/mediocre-go-lib.git/mlog"
) )
type daemon struct {
logger *mlog.Logger
config Config
hostBootstrap bootstrap.Bootstrap
stateDirPath string
opts Opts
pmuxCancelFn context.CancelFunc
pmuxStoppedCh chan struct{}
}
// Daemon presents all functionality required for client frontends to interact // Daemon presents all functionality required for client frontends to interact
// with isle, typically via the unix socket. // with isle, typically via the unix socket.
type Daemon interface { type Daemon interface {
@ -38,12 +26,11 @@ type Daemon interface {
) )
// Shutdown blocks until all resources held or created by the daemon, // Shutdown blocks until all resources held or created by the daemon,
// including child processes it has started, have been cleaned up, or until // including child processes it has started, have been cleaned up.
// the context is canceled.
// //
// If this returns an error then it's possible that child processes are // If this returns an error then it's possible that child processes are
// still running and are no longer managed. // still running and are no longer managed.
Shutdown(context.Context) error Shutdown() error
} }
// Opts are optional parameters which can be passed in when initializing a new // Opts are optional parameters which can be passed in when initializing a new
@ -56,6 +43,8 @@ type Opts struct {
// Stdout and Stderr are what the associated outputs from child processes // Stdout and Stderr are what the associated outputs from child processes
// will be directed to. // will be directed to.
Stdout, Stderr io.Writer Stdout, Stderr io.Writer
EnvVars EnvVars // Defaults to that returned by GetEnvVars.
} }
func (o *Opts) withDefaults() *Opts { func (o *Opts) withDefaults() *Opts {
@ -71,9 +60,24 @@ func (o *Opts) withDefaults() *Opts {
o.Stderr = os.Stderr o.Stderr = os.Stderr
} }
if o.EnvVars == (EnvVars{}) {
o.EnvVars = GetEnvVars()
}
return o return o
} }
type daemon struct {
logger *mlog.Logger
config Config
hostBootstrap bootstrap.Bootstrap
binDirPath string
opts Opts
pmuxCancelFn context.CancelFunc
pmuxStoppedCh chan struct{}
}
// New initialized and returns a Daemon. If initialization fails an error is // New initialized and returns a Daemon. If initialization fails an error is
// returned. // returned.
func New( func New(
@ -81,66 +85,39 @@ func New(
logger *mlog.Logger, logger *mlog.Logger,
config Config, config Config,
hostBootstrap bootstrap.Bootstrap, hostBootstrap bootstrap.Bootstrap,
runtimeDirPath, binDirPath, stateDirPath string, binDirPath string,
opts *Opts, opts *Opts,
) ( ) (
Daemon, error, Daemon, error,
) { ) {
opts = opts.withDefaults() opts = opts.withDefaults()
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
runtimeDirPath, binDirPath, hostBootstrap, config,
)
if err != nil {
return nil, fmt.Errorf("generating nebula config: %w", err)
}
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
runtimeDirPath, binDirPath, hostBootstrap, config,
)
if err != nil {
return nil, fmt.Errorf("generating dnsmasq config: %w", err)
}
garagePmuxProcConfigs, err := garagePmuxProcConfigs(
runtimeDirPath, binDirPath, hostBootstrap, config,
)
if err != nil {
return nil, fmt.Errorf("generating garage children configs: %w", err)
}
pmuxConfig := pmuxlib.Config{
Processes: append(
[]pmuxlib.ProcessConfig{
nebulaPmuxProcConfig,
dnsmasqPmuxProcConfig,
},
garagePmuxProcConfigs...,
),
}
pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background()) pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())
d := &daemon{ d := &daemon{
logger: logger, logger: logger,
config: config, config: config,
hostBootstrap: hostBootstrap, hostBootstrap: hostBootstrap,
stateDirPath: stateDirPath, binDirPath: binDirPath,
opts: *opts, opts: *opts,
pmuxCancelFn: pmuxCancelFn, pmuxCancelFn: pmuxCancelFn,
pmuxStoppedCh: make(chan struct{}), pmuxStoppedCh: make(chan struct{}),
} }
pmuxConfig, err := d.newPmuxConfig()
if err != nil {
return nil, fmt.Errorf("generating pmux config: %w", err)
}
go func() { go func() {
defer close(d.pmuxStoppedCh) defer close(d.pmuxStoppedCh)
pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig) pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig)
d.logger.Debug(pmuxCtx, "pmux stopped")
}() }()
if initErr := d.postPmuxInit(ctx); initErr != nil { if initErr := d.postPmuxInit(ctx); initErr != nil {
logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err) logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) if err := d.Shutdown(); err != nil {
defer cancel()
if err := d.Shutdown(shutdownCtx); err != nil {
panic(fmt.Sprintf( panic(fmt.Sprintf(
"failed to shut down child processes after initialization"+ "failed to shut down child processes after initialization"+
" error, there may be zombie children leftover."+ " error, there may be zombie children leftover."+
@ -155,56 +132,8 @@ func New(
return d, nil return d, nil
} }
func (d *daemon) postPmuxInit(ctx context.Context) error { func (d *daemon) Shutdown() error {
d.logger.Info(ctx, "waiting for nebula VPN to come online") d.pmuxCancelFn()
if err := waitForNebula(ctx, d.hostBootstrap); err != nil { <-d.pmuxStoppedCh
return fmt.Errorf("waiting for nebula to start: %w", err)
}
d.logger.Info(ctx, "waiting for garage instances to come online")
if err := d.waitForGarage(ctx); err != nil {
return fmt.Errorf("waiting for garage to start: %w", err)
}
if len(d.config.Storage.Allocations) > 0 {
err := until(ctx, func(ctx context.Context) error {
err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config)
if err != nil {
d.logger.Error(ctx, "applying garage layout", err)
return err
}
return nil
})
if err != nil {
return fmt.Errorf("applying garage layout: %w", err)
}
}
if !d.opts.SkipHostBootstrapPush {
if err := until(ctx, func(ctx context.Context) error {
if err := d.putGarageBoostrapHost(ctx); err != nil {
d.logger.Error(ctx, "updating host info in garage", err)
return err
}
return nil
}); err != nil {
return fmt.Errorf("updating host info in garage: %w", err)
}
}
return nil return nil
} }
func (d *daemon) Shutdown(ctx context.Context) error {
d.pmuxCancelFn()
select {
case <-ctx.Done():
return ctx.Err()
case <-d.pmuxStoppedCh:
return nil
}
}

View File

@ -0,0 +1,262 @@
package daemon
import (
"bytes"
"context"
"errors"
"fmt"
"isle/bootstrap"
"sync"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
const (
daemonRestarterStateInitializing = iota
daemonRestarterStateOk
daemonRestarterStateRestarting
daemonRestarterStateShutdown
)
type daemonRestarter struct {
logger *mlog.Logger
daemonConfig Config
envBinDirPath string
opts *Opts
l sync.Mutex
state int
inner Daemon
currBootstrap bootstrap.Bootstrap
cancelFn context.CancelFunc
stoppedCh chan struct{}
}
// NewDaemonRestarter will wrap a Daemon such that it will be automatically
// shutdown and re-created whenever there's changes in the cluster which require
// the configuration to be changed (e.g. a new nebula lighthouse).
//
// While still starting up the daemon for the first time all methods will return
// ErrInitializing, except Shutdown which will block until initialization is
// canceled.
//
// During a restart all methods will return ErrRestarting, except Shutdown which
// will block until the currently executing restart is finished and then
// shutdown cleanly from there.
//
// TODO make daemonRestarter smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemonRestarter(
logger *mlog.Logger,
daemonConfig Config,
envBinDirPath string,
currBootstrap bootstrap.Bootstrap,
opts *Opts,
) Daemon {
ctx, cancelFn := context.WithCancel(context.Background())
dr := &daemonRestarter{
logger: logger,
daemonConfig: daemonConfig,
envBinDirPath: envBinDirPath,
opts: opts.withDefaults(),
currBootstrap: currBootstrap,
cancelFn: cancelFn,
stoppedCh: make(chan struct{}),
}
go func() {
dr.restartLoop(ctx)
dr.logger.Debug(ctx, "DaemonRestarter stopped")
close(dr.stoppedCh)
}()
return dr
}
func withInnerDaemon[Res any](
dr *daemonRestarter, fn func(Daemon) (Res, error),
) (Res, error) {
var zero Res
dr.l.Lock()
inner, state := dr.inner, dr.state
dr.l.Unlock()
switch state {
case daemonRestarterStateInitializing:
return zero, ErrInitializing
case daemonRestarterStateOk:
return fn(inner)
case daemonRestarterStateRestarting:
return zero, ErrRestarting
case daemonRestarterStateShutdown:
return zero, errors.New("already shutdown")
default:
panic(fmt.Sprintf("unknown state %d", dr.state))
}
}
// creates a new bootstrap file using available information from the network. If
// the new bootstrap file is different than the existing one, the existing one
// is overwritten and true is returned.
func (dr *daemonRestarter) checkBootstrap(
ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, bool, error,
) {
thisHost := hostBootstrap.ThisHost()
newHosts, err := dr.inner.GetGarageBootstrapHosts(ctx)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("getting hosts from garage: %w", err)
}
// the daemon's view of this host's bootstrap info takes precedence over
// whatever is in garage
newHosts[thisHost.Name] = thisHost
newHostsHash, err := bootstrap.HostsHash(newHosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of new hosts: %w", err)
}
currHostsHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
if err != nil {
return bootstrap.Bootstrap{}, false, fmt.Errorf("calculating hash of current hosts: %w", err)
}
if bytes.Equal(newHostsHash, currHostsHash) {
return hostBootstrap, false, nil
}
hostBootstrap.Hosts = newHosts
return hostBootstrap, true, nil
}
// blocks until a new bootstrap is available or context is canceled
func (dr *daemonRestarter) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return bootstrap.Bootstrap{}
case <-ticker.C:
dr.logger.Info(ctx, "Checking for changes to bootstrap")
newBootstrap, changed, err := dr.checkBootstrap(
ctx, dr.currBootstrap,
)
if err != nil {
dr.logger.Error(ctx, "Checking bootstrap for changes failed", err)
continue
} else if !changed {
continue
}
err = writeBootstrapToStateDir(dr.opts.EnvVars.StateDirPath, newBootstrap)
if err != nil {
dr.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
continue
}
return newBootstrap
}
}
}
func (dr *daemonRestarter) restartLoop(ctx context.Context) {
defer func() {
dr.l.Lock()
dr.state = daemonRestarterStateShutdown
inner := dr.inner
dr.l.Unlock()
if inner != nil {
if err := inner.Shutdown(); err != nil {
dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
}
}
}()
wait := func(d time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(d):
return true
}
}
for {
if ctx.Err() != nil {
return
}
dr.logger.Info(ctx, "Creating new daemon")
daemonInst, err := New(
ctx,
dr.logger.WithNamespace("daemon"),
dr.daemonConfig,
dr.currBootstrap,
dr.envBinDirPath,
dr.opts,
)
if errors.Is(err, context.Canceled) {
return
} else if err != nil {
dr.logger.Error(ctx, "failed to initialize daemon", err)
if !wait(1 * time.Second) {
return
}
continue
}
dr.l.Lock()
dr.inner = daemonInst
dr.state = daemonRestarterStateOk
dr.l.Unlock()
newBootstrap := dr.watchForChanges(ctx)
if ctx.Err() != nil {
return
}
dr.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
dr.l.Lock()
dr.currBootstrap = newBootstrap
dr.state = daemonRestarterStateRestarting
dr.l.Unlock()
dr.logger.Info(ctx, "Shutting down previous daemon")
if err := dr.inner.Shutdown(); err != nil {
dr.logger.Fatal(ctx, "failed to cleanly shutdown daemon, there may be orphaned child processes", err)
}
}
}
func (dr *daemonRestarter) GetGarageBootstrapHosts(
ctx context.Context,
) (
map[string]bootstrap.Host, error,
) {
return withInnerDaemon(dr, func(inner Daemon) (map[string]bootstrap.Host, error) {
return inner.GetGarageBootstrapHosts(ctx)
})
}
func (dr *daemonRestarter) Shutdown() error {
dr.cancelFn()
<-dr.stoppedCh
return nil
}

102
go/daemon/env.go Normal file
View File

@ -0,0 +1,102 @@
package daemon
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"github.com/adrg/xdg"
)
// EnvVars are variables which are derived based on the environment which the
// process is running in.
type EnvVars struct {
RuntimeDirPath string // TODO should be private to this package
StateDirPath string // TODO should be private to this package
HTTPSocketPath string
}
func getRPCSocketDirPath() string {
path, err := firstExistingDir(
"/run",
"/var/run",
"/tmp",
"/dev/shm",
)
if err != nil {
panic(fmt.Sprintf("Failed to find directory for RPC socket: %v", err))
}
return path
}
// GetEnvVars will return the EnvVars of the current processes, as determined by
// the process's environment.
var GetEnvVars = sync.OnceValue(func() (v EnvVars) {
// RUNTIME_DIRECTORY/STATE_DIRECTORY are used by the systemd service in
// conjunction with the RuntimeDirectory/StateDirectory directives.
v.RuntimeDirPath = envOr(
"RUNTIME_DIRECTORY",
func() string { return filepath.Join(xdg.RuntimeDir, "isle") },
)
v.StateDirPath = envOr(
"STATE_DIRECTORY",
func() string { return filepath.Join(xdg.StateHome, "isle") },
)
v.HTTPSocketPath = envOr(
"ISLE_SOCKET_PATH",
func() string {
return filepath.Join(getRPCSocketDirPath(), "isle-daemon.sock")
},
)
return
})
////////////////////////////////////////////////////////////////////////////////
// Jigs
func envOr(name string, fallback func() string) string {
if v := os.Getenv(name); v != "" {
return v
}
return fallback()
}
func firstExistingDir(paths ...string) (string, error) {
var errs []error
for _, path := range paths {
stat, err := os.Stat(path)
switch {
case errors.Is(err, fs.ErrExist):
continue
case err != nil:
errs = append(
errs, fmt.Errorf("checking if path %q exists: %w", path, err),
)
case !stat.IsDir():
errs = append(
errs, fmt.Errorf("path %q exists but is not a directory", path),
)
default:
return path, nil
}
}
err := fmt.Errorf(
"no directory found at any of the following paths: %s",
strings.Join(paths, ", "),
)
if len(errs) > 0 {
err = errors.Join(slices.Insert(errs, 0, err)...)
}
return "", err
}

13
go/daemon/errors.go Normal file
View File

@ -0,0 +1,13 @@
package daemon
import "isle/daemon/jsonrpc2"
var (
// ErrInitializing is returned when a cluster is unavailable due to still
// being initialized.
ErrInitializing = jsonrpc2.NewError(1, "Cluster is being initialized")
// ErrRestarting is returned when a cluster is unavailable due to being
// restarted.
ErrRestarting = jsonrpc2.NewError(2, "Cluster is being restarted")
)

View File

@ -58,7 +58,7 @@ EOF
--name "testing" \ --name "testing" \
> admin.json > admin.json
isle daemon --config-path daemon.yml >daemon.log 2>&1 & isle daemon -l debug --config-path daemon.yml >daemon.log 2>&1 &
pid="$!" pid="$!"
echo "Waiting for primus daemon (process $pid) to initialize" echo "Waiting for primus daemon (process $pid) to initialize"
@ -82,7 +82,7 @@ EOF
device: isle-secondus device: isle-secondus
EOF EOF
isle daemon -c daemon.yml -b "$secondus_bootstrap" >daemon.log 2>&1 & isle daemon -l debug -c daemon.yml -b "$secondus_bootstrap" >daemon.log 2>&1 &
pid="$!" pid="$!"
echo "Waiting for secondus daemon (process $!) to initialize" echo "Waiting for secondus daemon (process $!) to initialize"