// File: isle/go/daemon/daemon.go (Go; listed by the repository viewer as 798
// lines, 19 KiB).

// Package daemon implements the isle daemon, which is a long-running service
// managing all isle background tasks and sub-processes for a single network.
package daemon
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"isle/bootstrap"
"isle/jsonutil"
"isle/nebula"
"isle/secrets"
"net/netip"
"os"
"path/filepath"
"sync"
"time"
// Third-party dependencies.
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// CreateHostOpts are optional parameters to the CreateHost method. The zero
// value is valid.
type CreateHostOpts struct {
	// CanCreateHosts indicates that the bootstrap produced by CreateHost should
	// give the new host the ability to create new hosts as well.
	//
	// When set, CreateHost additionally exports the nebula CA signing private
	// key secret into the returned JoiningBootstrap.
	CanCreateHosts bool
}
// CreateNebulaCertificateOpts are optional parameters to the
// CreateNebulaCertificate method. The zero value is valid.
type CreateNebulaCertificateOpts struct {
	// IP, if given will be used for the host's IP in the created cert. If this
	// is given then it is not required that the host have an entry in garage.
	//
	// When left as the zero netip.Addr the IP is instead looked up from the
	// host's entry in the daemon's current bootstrap, and ErrHostNotFound is
	// returned if there is no such entry.
	//
	// TODO once `hosts create` automatically adds the host to garage this can
	// be removed.
	IP netip.Addr
}
// Daemon presents all functionality required for client frontends to interact
// with isle, typically via the unix socket.
type Daemon interface {
// CreateNetwork will initialize a new network using the given parameters.
// - name: Human-readable name of the network.
// - domain: Primary domain name that network services are served under.
// - ipNet: An IP subnet, in CIDR form, which will be the overall range of
// possible IPs in the network. The first IP in this network range will
// become this first host's IP.
// - hostName: The name of this first host in the network.
//
// The daemon on which this is called will become the first host in the
// network, and will have full administrative privileges.
CreateNetwork(
ctx context.Context, name, domain string,
ipNet nebula.IPNet,
hostName nebula.HostName,
) error
// JoinNetwork joins the Daemon to an existing network using the given
// Bootstrap.
//
// Errors:
// - ErrAlreadyJoined
JoinNetwork(context.Context, JoiningBootstrap) error
// GetBootstraps returns the currently active Bootstrap.
GetBootstrap(context.Context) (bootstrap.Bootstrap, error)
2024-07-12 14:03:37 +00:00
// GetGarageClientParams returns a GarageClientParams for the current
// network state.
GetGarageClientParams(context.Context) (GarageClientParams, error)
2024-07-12 15:05:39 +00:00
// RemoveHost removes the host of the given name from the network.
RemoveHost(context.Context, nebula.HostName) error
// CreateHost creates a bootstrap for a new host with the given name and IP
// address.
CreateHost(
ctx context.Context,
hostName nebula.HostName,
ip netip.Addr, // TODO automatically choose IP address
opts CreateHostOpts,
) (
JoiningBootstrap, error,
)
// CreateNebulaCertificate creates and signs a new nebula certficate for an
// existing host, given the public key for that host. This is currently
// mostly useful for creating certs for mobile devices.
//
// Errors:
// - ErrHostNotFound
CreateNebulaCertificate(
ctx context.Context,
hostName nebula.HostName,
hostPubKey nebula.EncryptingPublicKey,
opts CreateNebulaCertificateOpts,
) (
nebula.Certificate, error,
)
// Shutdown blocks until all resources held or created by the daemon,
// including child processes it has started, have been cleaned up.
//
// If this returns an error then it's possible that child processes are
// still running and are no longer managed.
Shutdown() error
}
// Opts are optional parameters which can be passed in when initializing a new
// Daemon instance. A nil Opts is equivalent to a zero value.
type Opts struct {
	// Stdout and Stderr are what the associated outputs from child processes
	// will be directed to. When left nil they default to os.Stdout and
	// os.Stderr, respectively.
	Stdout, Stderr io.Writer

	// EnvVars is used by the daemon to locate, among other things, its state
	// directory. The zero value is replaced with the result of GetEnvVars.
	EnvVars EnvVars // Defaults to that returned by GetEnvVars.
}
// withDefaults populates every unset field of the Opts with its default value
// and returns the populated Opts. A nil receiver is treated as an empty Opts;
// a non-nil receiver is filled in place and returned as-is.
func (o *Opts) withDefaults() *Opts {
	result := o
	if result == nil {
		result = &Opts{}
	}

	if result.Stdout == nil {
		result.Stdout = os.Stdout
	}

	if result.Stderr == nil {
		result.Stderr = os.Stderr
	}

	// EnvVars is a comparable struct, so "unset" means equal to its zero value.
	var unset EnvVars
	if result.EnvVars == unset {
		result.EnvVars = GetEnvVars()
	}

	return result
}
// Possible values of daemon.state. The state determines how the Daemon's
// methods respond to calls, see withCurrBootstrap.
const (
	// daemonStateNoNetwork is the zero value, and therefore the state of a
	// freshly constructed daemon. CreateNetwork and JoinNetwork only proceed
	// from this state.
	daemonStateNoNetwork = iota

	// daemonStateInitializing is set by initialize, and lasts until the restart
	// loop has brought up the child processes for the first time.
	daemonStateInitializing

	// daemonStateOk is the only state in which bootstrap-dependent calls are
	// actually served.
	daemonStateOk

	// daemonStateRestarting is set by the restart loop when the bootstrap has
	// changed and the child processes are being re-created.
	daemonStateRestarting

	// daemonStateShutdown is set by Shutdown.
	daemonStateShutdown
)
// daemon is the implementation of the Daemon interface which is returned from
// NewDaemon.
type daemon struct {
	// Parameters given to NewDaemon. opts has had its defaults applied.
	logger        *mlog.Logger
	daemonConfig  Config
	envBinDirPath string
	opts          *Opts

	// secretsStore is created within the "secrets" sub-directory of the state
	// directory.
	secretsStore secrets.Store

	// garageAdminToken is randomly generated once per daemon instance.
	garageAdminToken string

	// l protects state, children and currBootstrap.
	//
	// NOTE(review): the restart loop goroutine accesses currBootstrap and
	// children without holding l in a few places; confirm this is intentional.
	l             sync.RWMutex
	state         int // one of the daemonState* constants
	children      *Children
	currBootstrap bootstrap.Bootstrap

	// shutdownCh is closed by Shutdown, which causes the context used by the
	// background goroutines to be canceled. wg tracks those goroutines.
	shutdownCh chan struct{}
	wg         sync.WaitGroup
}
// NewDaemon constructs a Daemon, which manages all child processes and state
// required by the isle service, as well as an HTTP socket over which RPC calls
// will be served.
//
// If a bootstrap file is already present in the state directory then the
// Daemon immediately begins initializing itself using it. Otherwise the Daemon
// remains idle until CreateNetwork or JoinNetwork is called.
//
// Inner Children instance(s) are wrapped such that they are automatically
// shutdown and re-created whenever there's changes in the network which
// require the configuration to be changed (e.g. a new nebula lighthouse).
// During such an inner restart all methods will return ErrRestarting, except
// Shutdown which will block until the currently executing restart is finished
// and then shutdown cleanly from there.
//
// While still starting up the Daemon for the first time all methods will return
// ErrInitializing, except Shutdown which will block until initialization is
// canceled.
//
// TODO make daemon smarter, it currently restarts on _any_ change, but
// it should restart itself only when there's something actually requiring a
// restart.
func NewDaemon(
	logger *mlog.Logger, daemonConfig Config, envBinDirPath string, opts *Opts,
) (
	Daemon, error,
) {
	d := &daemon{
		logger:           logger,
		daemonConfig:     daemonConfig,
		envBinDirPath:    envBinDirPath,
		opts:             opts.withDefaults(),
		garageAdminToken: randStr(32),
		shutdownCh:       make(chan struct{}),
	}

	// Computed prior to initializing the directories, same as it always has
	// been.
	bootstrapFilePath := bootstrap.StateDirPath(d.opts.EnvVars.StateDirPath)

	if err := d.opts.EnvVars.init(); err != nil {
		return nil, fmt.Errorf("initializing daemon directories: %w", err)
	}

	secretsPath := filepath.Join(d.opts.EnvVars.StateDirPath, "secrets")

	secretsStore, err := secrets.NewFSStore(secretsPath)
	if err != nil {
		return nil, fmt.Errorf(
			"initializing secrets store at %q: %w", secretsPath, err,
		)
	}
	d.secretsStore = secretsStore

	var existing bootstrap.Bootstrap

	switch err := jsonutil.LoadFile(&existing, bootstrapFilePath); {
	case errors.Is(err, fs.ErrNotExist):
		// daemon has never had a network created or joined, there is nothing
		// to initialize yet.

	case err != nil:
		return nil, fmt.Errorf(
			"loading bootstrap from %q: %w", bootstrapFilePath, err,
		)

	default:
		// d has not been shared with anyone yet, so the lock isn't needed.
		if err := d.initialize(existing, nil); err != nil {
			return nil, fmt.Errorf("initializing with bootstrap: %w", err)
		}
	}

	return d, nil
}
// initialize persists the given bootstrap, switches the daemon into
// daemonStateInitializing, and spawns the background goroutines which bring
// the daemon up and keep it up to date.
//
// It must be called with d.l write lock held, _but_ the lock should be released
// just after initialize returns, as the spawned goroutines will need it.
//
// readyCh will be written to every time the daemon changes state to
// daemonStateOk, unless it is nil or the write would block.
func (d *daemon) initialize(
	currBootstrap bootstrap.Bootstrap, readyCh chan<- struct{},
) error {
	// Fold whatever configuration has been provided by the daemon config into
	// this host's entry, so that the daemon has the most up-to-date possible
	// bootstrap. The result is later pushed into garage as a background task,
	// so other hosts will see it as well.
	merged, err := coalesceDaemonConfigAndBootstrap(d.daemonConfig, currBootstrap)
	if err != nil {
		return fmt.Errorf("combining daemon configuration into bootstrap: %w", err)
	}

	if err := writeBootstrapToStateDir(
		d.opts.EnvVars.StateDirPath, merged,
	); err != nil {
		return fmt.Errorf("writing bootstrap to state dir: %w", err)
	}

	d.currBootstrap, d.state = merged, daemonStateInitializing

	ctx, cancel := context.WithCancel(context.Background())

	d.wg.Add(2)

	// Translates the closing of shutdownCh into cancellation of ctx.
	go func() {
		defer d.wg.Done()
		<-d.shutdownCh
		cancel()
	}()

	go func() {
		defer d.wg.Done()
		d.restartLoop(ctx, readyCh)
		d.logger.Debug(ctx, "Daemon restart loop stopped")
	}()

	return nil
}
// withCurrBootstrap calls fn with the daemon's current bootstrap and returns
// its results, but only if the daemon is in daemonStateOk. In every other state
// fn is not called, and the zero Res is returned alongside an error describing
// the state:
//   - daemonStateNoNetwork: ErrNoNetwork
//   - daemonStateInitializing: ErrInitializing
//   - daemonStateRestarting: ErrRestarting
//   - daemonStateShutdown: an "already shutdown" error
//
// The read lock on d.l is held for the full duration of fn.
func withCurrBootstrap[Res any](
	d *daemon, fn func(bootstrap.Bootstrap) (Res, error),
) (Res, error) {
	d.l.RLock()
	defer d.l.RUnlock()

	var (
		zero Res
		err  error
	)

	switch d.state {
	case daemonStateOk:
		return fn(d.currBootstrap)

	case daemonStateNoNetwork:
		err = ErrNoNetwork

	case daemonStateInitializing:
		err = ErrInitializing

	case daemonStateRestarting:
		err = ErrRestarting

	case daemonStateShutdown:
		err = errors.New("already shutdown")

	default:
		// Not one of the daemonState* constants, which is a programming error.
		panic(fmt.Sprintf("unknown state %d", d.state))
	}

	return zero, err
}
// checkBootstrap fetches the set of hosts currently stored in garage and
// compares it, by hash, against the set of hosts in hostBootstrap.
//
// If the two differ then a copy of hostBootstrap containing the new set of
// hosts is returned, along with true. Otherwise hostBootstrap is returned
// as-is, along with false. Nothing is written to disk here; that is left to the
// caller.
func (d *daemon) checkBootstrap(
	ctx context.Context, hostBootstrap bootstrap.Bootstrap,
) (
	bootstrap.Bootstrap, bool, error,
) {
	var none bootstrap.Bootstrap

	self := hostBootstrap.ThisHost()

	garageHosts, err := d.getGarageBootstrapHosts(ctx, d.logger, hostBootstrap)
	if err != nil {
		return none, false, fmt.Errorf("getting hosts from garage: %w", err)
	}

	// the daemon's view of this host's bootstrap info takes precedence over
	// whatever is in garage
	garageHosts[self.Name] = self

	garageHash, err := bootstrap.HostsHash(garageHosts)
	if err != nil {
		return none, false, fmt.Errorf("calculating hash of new hosts: %w", err)
	}

	localHash, err := bootstrap.HostsHash(hostBootstrap.Hosts)
	if err != nil {
		return none, false, fmt.Errorf("calculating hash of current hosts: %w", err)
	}

	if !bytes.Equal(garageHash, localHash) {
		// hostBootstrap was passed by value, so only the returned copy gets
		// the new set of hosts.
		hostBootstrap.Hosts = garageHosts
		return hostBootstrap, true, nil
	}

	return hostBootstrap, false, nil
}
// watchForChanges blocks until a new bootstrap is available or the context is
// canceled.
//
// Every three minutes the daemon's current bootstrap is checked against the
// network. Once a changed bootstrap has been found and successfully written to
// the state directory it is returned. Failures along the way are logged and
// retried on the next tick. If the context is canceled then the zero Bootstrap
// is returned.
func (d *daemon) watchForChanges(ctx context.Context) bootstrap.Bootstrap {
	const checkInterval = 3 * time.Minute

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return bootstrap.Bootstrap{}
		case <-ticker.C:
		}

		d.logger.Info(ctx, "Checking for changes to bootstrap")

		updated, changed, err := d.checkBootstrap(ctx, d.currBootstrap)
		if err != nil {
			d.logger.Error(ctx, "Checking bootstrap for changes failed", err)
			continue
		}

		if !changed {
			continue
		}

		if err := writeBootstrapToStateDir(
			d.opts.EnvVars.StateDirPath, updated,
		); err != nil {
			d.logger.Error(ctx, "Writing new bootstrap to disk failed", err)
			continue
		}

		return updated
	}
}
// postInit performs the steps which need to happen after the child processes
// have been started, in order:
//  1. Applying the garage layout, if this host has any storage allocations.
//  2. Initializing the garage global bucket and storing its credentials, if
//     no such credentials are in the secrets store yet.
//  3. Publishing this host's info into garage.
//
// Each step is run via until; false is returned as soon as any call to until
// returns false (presumably meaning the context was canceled before the step
// could succeed; confirm against until), and true if all steps complete.
func (d *daemon) postInit(ctx context.Context) bool {
	if len(d.daemonConfig.Storage.Allocations) > 0 {
		applyLayout := func(ctx context.Context) error {
			return garageApplyLayout(
				ctx,
				d.logger,
				d.daemonConfig,
				d.garageAdminToken,
				d.currBootstrap,
			)
		}

		if !until(ctx, d.logger, "Applying garage layout", applyLayout) {
			return false
		}
	}

	// This is only necessary during network creation, otherwise the bootstrap
	// should already have these credentials built in.
	//
	// TODO this is pretty hacky, but there doesn't seem to be a better way to
	// manage it at the moment.
	//
	// NOTE(review): any error other than secrets.ErrNotFound is ignored here.
	_, err := getGarageS3APIGlobalBucketCredentials(ctx, d.secretsStore)
	if errors.Is(err, secrets.ErrNotFound) {
		initGlobalBucket := func(ctx context.Context) error {
			creds, err := garageInitializeGlobalBucket(
				ctx,
				d.logger,
				d.daemonConfig,
				d.garageAdminToken,
				d.currBootstrap,
			)
			if err != nil {
				return fmt.Errorf("initializing global bucket: %w", err)
			}

			if err := setGarageS3APIGlobalBucketCredentials(
				ctx, d.secretsStore, creds,
			); err != nil {
				return fmt.Errorf("storing global bucket creds: %w", err)
			}

			return nil
		}

		if !until(
			ctx,
			d.logger,
			"Initializing garage shared global bucket",
			initGlobalBucket,
		) {
			return false
		}
	}

	publishHost := func(ctx context.Context) error {
		return d.putGarageBoostrapHost(ctx, d.logger, d.currBootstrap)
	}

	return until(ctx, d.logger, "Updating host info in garage", publishHost)
}
// restartLoop is the main background loop of the daemon. Each iteration
// creates the child processes, runs postInit, marks the daemon as
// daemonStateOk, and then blocks until the bootstrap changes, at which point
// the child processes are shut down and the loop starts over with the new
// bootstrap.
//
// The loop returns once ctx is canceled. When that happens d.children is left
// set if child processes are still running, so that they can be shut down by
// whoever is waiting for the loop to stop.
//
// readyCh, if not nil, is written to each time the daemon enters
// daemonStateOk, unless the write would block.
//
// NOTE(review): d.l is acquired in here even after ctx may already have been
// canceled. Anything which waits for this goroutine to exit while holding d.l
// would deadlock against those acquisitions.
func (d *daemon) restartLoop(ctx context.Context, readyCh chan<- struct{}) {

	// wait sleeps for the given duration, returning false if ctx was canceled
	// before the duration elapsed. Note that its parameter shadows the method
	// receiver.
	wait := func(d time.Duration) bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(d):
			return true
		}
	}

	// ready performs a non-blocking write to readyCh. A send on a nil channel
	// never proceeds, so a nil readyCh always falls through to the default
	// case.
	ready := func() {
		select {
		case readyCh <- struct{}{}:
		default:
		}
	}

	for {
		if ctx.Err() != nil {
			return
		}

		d.logger.Info(ctx, "Creating child processes")
		children, err := NewChildren(
			ctx,
			d.logger.WithNamespace("children"),
			d.envBinDirPath,
			d.secretsStore,
			d.daemonConfig,
			d.garageAdminToken,
			d.currBootstrap,
			d.opts,
		)
		if errors.Is(err, context.Canceled) {
			return
		} else if err != nil {
			// Retry after a short pause, unless canceled in the meantime.
			d.logger.Error(ctx, "failed to initialize child processes", err)
			if !wait(1 * time.Second) {
				return
			}
			continue
		}

		d.logger.Info(ctx, "Child processes created")

		d.l.Lock()
		d.children = children
		d.l.Unlock()

		// postInit returning false ends the loop; d.children remains set in
		// that case.
		if !d.postInit(ctx) {
			return
		}

		d.l.Lock()
		d.state = daemonStateOk
		d.l.Unlock()

		ready()

		// watchForChanges returns either because a new bootstrap is available
		// or because ctx was canceled, the latter is checked for first.
		newBootstrap := d.watchForChanges(ctx)
		if ctx.Err() != nil {
			return
		}

		d.logger.Info(ctx, "Bootstrap has changed, will restart daemon")
		d.l.Lock()
		d.currBootstrap = newBootstrap
		d.state = daemonStateRestarting
		d.l.Unlock()

		d.logger.Info(ctx, "Shutting down previous child processes")
		if err := d.children.Shutdown(); err != nil {
			d.logger.Fatal(ctx, "Failed to cleanly shutdown children, there may be orphaned child processes", err)
		}

		// in case context was canceled while shutting the Children down, we
		// don't want the Shutdown method to re-attempt calling Shutdown on
		// it.
		d.children = nil
	}
}
// CreateNetwork implements the method for the Daemon interface.
//
// New CA credentials and a bootstrap for this host (which is given the first
// address of ipNet) are generated, the network's secrets are stored, and the
// daemon begins initializing. The call then blocks until the daemon has become
// ready or ctx is canceled. Canceling ctx only stops the waiting, it does not
// undo the initialization which was started.
//
// Errors:
//   - ErrAlreadyJoined, if a network has already been created or joined.
//   - ErrInvalidConfig, if fewer than three storage allocations are
//     configured.
func (d *daemon) CreateNetwork(
	ctx context.Context,
	name, domain string, ipNet nebula.IPNet, hostName nebula.HostName,
) error {
	// Everything which doesn't touch the daemon's state is prepared up front,
	// so that the lock is held as briefly as possible further down.
	nebulaCACreds, err := nebula.NewCACredentials(domain, ipNet)
	if err != nil {
		return fmt.Errorf("creating nebula CA cert: %w", err)
	}

	var (
		creationParams = bootstrap.CreationParams{
			ID:     randStr(32),
			Name:   name,
			Domain: domain,
		}

		garageRPCSecret = randStr(32)
	)

	hostBootstrap, err := bootstrap.New(
		nebulaCACreds,
		creationParams,
		hostName,
		ipNet.FirstAddr(),
	)
	if err != nil {
		return fmt.Errorf("initializing bootstrap data: %w", err)
	}

	readyCh := make(chan struct{}, 1)

	// The lock must be released before waiting on readyCh, since the
	// goroutines spawned by initialize need it in order to make progress.
	err = func() error {
		d.l.Lock()
		defer d.l.Unlock()

		// Validation must come before anything is written to the secrets
		// store. Otherwise calling CreateNetwork on a daemon which is already
		// part of a network would overwrite that network's garage RPC secret
		// and CA signing key, and only then return ErrAlreadyJoined.
		if d.state != daemonStateNoNetwork {
			return ErrAlreadyJoined
		}

		if len(d.daemonConfig.Storage.Allocations) < 3 {
			return ErrInvalidConfig.WithData(
				"At least three storage allocations are required.",
			)
		}

		if err := setGarageRPCSecret(
			ctx, d.secretsStore, garageRPCSecret,
		); err != nil {
			return fmt.Errorf("setting garage RPC secret: %w", err)
		}

		if err := setNebulaCASigningPrivateKey(
			ctx, d.secretsStore, nebulaCACreds.SigningPrivateKey,
		); err != nil {
			return fmt.Errorf("setting nebula CA signing key secret: %w", err)
		}

		if err := d.initialize(hostBootstrap, readyCh); err != nil {
			return fmt.Errorf("initializing daemon: %w", err)
		}

		return nil
	}()
	if err != nil {
		return err
	}

	select {
	case <-readyCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// JoinNetwork implements the method for the Daemon interface.
//
// The secrets carried by newBootstrap are imported into the daemon's secrets
// store, the daemon begins initializing using the carried bootstrap, and the
// call then blocks until the daemon has become ready or ctx is canceled.
//
// Errors:
//   - ErrAlreadyJoined, if a network has already been created or joined.
func (d *daemon) JoinNetwork(
	ctx context.Context, newBootstrap JoiningBootstrap,
) error {
	readyCh := make(chan struct{}, 1)

	// The lock only covers the state check and kicking off initialization. It
	// must be released prior to waiting on readyCh, since the goroutines
	// spawned by initialize need it.
	begin := func() error {
		d.l.Lock()
		defer d.l.Unlock()

		if d.state != daemonStateNoNetwork {
			return ErrAlreadyJoined
		}

		if err := secrets.Import(
			ctx, d.secretsStore, newBootstrap.Secrets,
		); err != nil {
			return fmt.Errorf("importing secrets: %w", err)
		}

		if err := d.initialize(newBootstrap.Bootstrap, readyCh); err != nil {
			return fmt.Errorf("initializing daemon: %w", err)
		}

		return nil
	}

	if err := begin(); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-readyCh:
		return nil
	}
}
func (d *daemon) GetBootstrap(ctx context.Context) (bootstrap.Bootstrap, error) {
2024-07-12 14:03:37 +00:00
return withCurrBootstrap(d, func(
currBootstrap bootstrap.Bootstrap,
) (
bootstrap.Bootstrap, error,
2024-07-12 14:03:37 +00:00
) {
return currBootstrap, nil
2024-07-12 14:03:37 +00:00
})
}
// GetGarageClientParams implements the method for the Daemon interface. The
// params are derived from the daemon's current bootstrap; if the daemon is not
// currently up and running then an error describing its state is returned
// instead (see withCurrBootstrap).
func (d *daemon) GetGarageClientParams(
	ctx context.Context,
) (
	GarageClientParams, error,
) {
	getParams := func(b bootstrap.Bootstrap) (GarageClientParams, error) {
		return d.getGarageClientParams(ctx, b)
	}

	return withCurrBootstrap(d, getParams)
}
2024-07-12 15:05:39 +00:00
func (d *daemon) RemoveHost(ctx context.Context, hostName nebula.HostName) error {
// TODO RemoveHost should publish a certificate revocation for the host
// being removed.
_, err := withCurrBootstrap(d, func(
currBootstrap bootstrap.Bootstrap,
) (
struct{}, error,
) {
garageClientParams, err := d.getGarageClientParams(ctx, currBootstrap)
if err != nil {
return struct{}{}, fmt.Errorf("get garage client params: %w", err)
}
client := garageClientParams.GlobalBucketS3APIClient()
2024-07-12 15:05:39 +00:00
return struct{}{}, removeGarageBootstrapHost(ctx, client, hostName)
})
return err
}
// makeCACreds assembles full CA credentials out of the public CA credentials
// carried by the given bootstrap and the given CA signing private key.
func makeCACreds(
	currBootstrap bootstrap.Bootstrap,
	caSigningPrivateKey nebula.SigningPrivateKey,
) nebula.CACredentials {
	var creds nebula.CACredentials
	creds.Public = currBootstrap.CAPublicCredentials
	creds.SigningPrivateKey = caSigningPrivateKey
	return creds
}
// CreateHost implements the method for the Daemon interface.
//
// A new bootstrap is generated for the host using this network's CA
// credentials, which requires that this daemon has the CA signing private key
// in its secrets store. The returned JoiningBootstrap carries that bootstrap
// along with the garage RPC secret and the global bucket credentials. If
// opts.CanCreateHosts is set then the CA signing private key is carried as
// well.
func (d *daemon) CreateHost(
	ctx context.Context,
	hostName nebula.HostName,
	ip netip.Addr,
	opts CreateHostOpts,
) (
	JoiningBootstrap, error,
) {
	create := func(currBootstrap bootstrap.Bootstrap) (JoiningBootstrap, error) {
		var none JoiningBootstrap

		signingKey, err := getNebulaCASigningPrivateKey(ctx, d.secretsStore)
		if err != nil {
			return none, fmt.Errorf("getting CA signing key: %w", err)
		}

		hostBootstrap, err := bootstrap.New(
			makeCACreds(currBootstrap, signingKey),
			currBootstrap.NetworkCreationParams,
			hostName,
			ip,
		)
		if err != nil {
			return none, fmt.Errorf("initializing bootstrap data: %w", err)
		}

		// The new host starts out with this daemon's view of the network.
		//
		// NOTE(review): this assigns the daemon's own Hosts value rather than
		// a copy of it; confirm sharing it is intended.
		hostBootstrap.Hosts = currBootstrap.Hosts

		exportIDs := []secrets.ID{
			garageRPCSecretSecretID,
			garageS3APIGlobalBucketCredentialsSecretID,
		}

		// Only hosts which may themselves create hosts get the CA signing key.
		if opts.CanCreateHosts {
			exportIDs = append(exportIDs, nebulaCASigningPrivateKeySecretID)
		}

		exported, err := secrets.Export(ctx, d.secretsStore, exportIDs)
		if err != nil {
			return none, fmt.Errorf("exporting secrets: %w", err)
		}

		var joining JoiningBootstrap
		joining.Bootstrap = hostBootstrap
		joining.Secrets = exported

		// TODO persist new bootstrap to garage. Requires making the daemon
		// config change watching logic smarter, so only dnsmasq gets restarted.

		return joining, nil
	}

	return withCurrBootstrap(d, create)
}
// CreateNebulaCertificate implements the method for the Daemon interface.
//
// The certificate is issued for opts.IP if that is set, otherwise for the IP
// of the host's entry in the daemon's current bootstrap. Signing requires that
// this daemon has the CA signing private key in its secrets store.
//
// Errors:
//   - ErrHostNotFound, if opts.IP is not set and the current bootstrap has no
//     entry for hostName.
func (d *daemon) CreateNebulaCertificate(
	ctx context.Context,
	hostName nebula.HostName,
	hostPubKey nebula.EncryptingPublicKey,
	opts CreateNebulaCertificateOpts,
) (
	nebula.Certificate, error,
) {
	sign := func(currBootstrap bootstrap.Bootstrap) (nebula.Certificate, error) {
		var none nebula.Certificate

		certIP := opts.IP

		// The zero Addr is the only Addr which is not valid, so this is the
		// "no IP was given" case.
		if !certIP.IsValid() {
			knownHost, found := currBootstrap.Hosts[hostName]
			if !found {
				return none, ErrHostNotFound
			}
			certIP = knownHost.IP()
		}

		signingKey, err := getNebulaCASigningPrivateKey(ctx, d.secretsStore)
		if err != nil {
			return none, fmt.Errorf("getting CA signing key: %w", err)
		}

		return nebula.NewHostCert(
			makeCACreds(currBootstrap, signingKey), hostPubKey, hostName, certIP,
		)
	}

	return withCurrBootstrap(d, sign)
}
// Shutdown implements the method for the Daemon interface. It signals all
// background goroutines to stop, waits for them to do so, and then shuts down
// whatever child processes are still running.
//
// Shutdown may be called more than once. Child processes are only shut down
// again by a later call if a previous attempt returned an error.
func (d *daemon) Shutdown() error {
	d.l.Lock()

	// Closing an already closed channel panics, so only close it on the first
	// call. The check is safe because it is only ever closed while holding
	// the lock.
	select {
	case <-d.shutdownCh:
	default:
		close(d.shutdownCh)
	}

	// The state is set immediately so that, while the lock is released below,
	// it is no longer possible for CreateNetwork or JoinNetwork to start a
	// fresh set of goroutines.
	d.state = daemonStateShutdown
	d.l.Unlock()

	// The lock must not be held while waiting. The restart loop acquires it as
	// part of its normal operation, and if it were to block on doing so it
	// would never notice the cancellation and exit, resulting in a deadlock.
	d.wg.Wait()

	d.l.Lock()
	defer d.l.Unlock()

	// The restart loop may have modified the state once more before exiting.
	d.state = daemonStateShutdown

	if d.children != nil {
		if err := d.children.Shutdown(); err != nil {
			return fmt.Errorf("shutting down children: %w", err)
		}

		// Don't re-attempt shutting down the same Children on later calls.
		d.children = nil
	}

	return nil
}