Factor daemon.Children into its own package

This commit is contained in:
Brian Picciano 2024-09-07 15:46:59 +02:00
parent a840d0e701
commit 86b2ba7bfa
12 changed files with 269 additions and 216 deletions

View File

@ -1,13 +1,10 @@
package daemon
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"isle/bootstrap"
"isle/daemon/daecommon"
@ -81,49 +78,3 @@ func coalesceDaemonConfigAndBootstrap(
return hostBootstrap, nil
}
type bootstrapDiff struct {
hostsChanged bool
nebulaChanged bool
dnsChanged bool
}
func calcBootstrapDiff(
daemonConfig daecommon.Config,
prevBootstrap, nextBootstrap bootstrap.Bootstrap,
) (
diff bootstrapDiff, err error,
) {
{
prevHash, prevErr := bootstrap.HostsHash(prevBootstrap.Hosts)
nextHash, nextErr := bootstrap.HostsHash(nextBootstrap.Hosts)
if err = errors.Join(prevErr, nextErr); err != nil {
err = fmt.Errorf("calculating host hashes: %w", err)
return
}
diff.hostsChanged = !bytes.Equal(prevHash, nextHash)
}
{
prevNebulaConfig, prevErr := nebulaConfig(daemonConfig, prevBootstrap)
nextNebulaConfig, nextErr := nebulaConfig(daemonConfig, nextBootstrap)
if err = errors.Join(prevErr, nextErr); err != nil {
err = fmt.Errorf("calculating nebula config: %w", err)
return
}
diff.nebulaChanged = !reflect.DeepEqual(
prevNebulaConfig, nextNebulaConfig,
)
}
{
diff.dnsChanged = !reflect.DeepEqual(
dnsmasqConfig(daemonConfig, prevBootstrap),
dnsmasqConfig(daemonConfig, nextBootstrap),
)
}
return
}

View File

@ -1,38 +1,69 @@
package daemon
// Package children manages the creation, lifetime, and shutdown of child
// processes created by the daemon.
package children
import (
"context"
"errors"
"fmt"
"isle/bootstrap"
"isle/daemon/daecommon"
"isle/secrets"
"io"
"os"
"code.betamike.com/micropelago/pmux/pmuxlib"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"isle/bootstrap"
"isle/daemon/daecommon"
"isle/secrets"
)
// Opts are optional parameters which can be passed in when initializing a new
// Children instance. A nil Opts is equivalent to a zero value.
type Opts struct {
// Stdout and Stderr are what the associated outputs from child processes
// will be directed to.
Stdout, Stderr io.Writer
}
func (o *Opts) withDefaults() *Opts {
if o == nil {
o = new(Opts)
}
if o.Stdout == nil {
o.Stdout = os.Stdout
}
if o.Stderr == nil {
o.Stderr = os.Stderr
}
return o
}
// Children manages all child processes of a network. Child processes are
// comprised of:
// - nebula
// - dnsmasq
// - garage (0 or more, depending on configured storage allocations)
type Children struct {
logger *mlog.Logger
daemonConfig daecommon.Config
opts Opts
logger *mlog.Logger
daemonConfig daecommon.Config
runtimeDirPath string
opts Opts
pmux *pmuxlib.Pmux
}
// NewChildren initialized and returns a Children instance. If initialization
// fails an error is returned.
func NewChildren(
// New initializes and returns a Children instance. If initialization fails an
// error is returned.
func New(
ctx context.Context,
logger *mlog.Logger,
binDirPath string,
secretsStore secrets.Store,
daemonConfig daecommon.Config,
runtimeDirPath string,
garageAdminToken string,
hostBootstrap bootstrap.Bootstrap,
opts *Opts,
@ -48,9 +79,10 @@ func NewChildren(
}
c := &Children{
logger: logger,
daemonConfig: daemonConfig,
opts: *opts,
logger: logger,
daemonConfig: daemonConfig,
runtimeDirPath: runtimeDirPath,
opts: *opts,
}
pmuxConfig, err := c.newPmuxConfig(
@ -80,9 +112,12 @@ func NewChildren(
}
// RestartDNSMasq rewrites the dnsmasq config and restarts the process.
//
// TODO block until process has been confirmed to have come back up
// successfully.
func (c *Children) RestartDNSMasq(hostBootstrap bootstrap.Bootstrap) error {
_, err := dnsmasqWriteConfig(
c.opts.EnvVars.RuntimeDirPath, c.daemonConfig, hostBootstrap,
c.runtimeDirPath, c.daemonConfig, hostBootstrap,
)
if err != nil {
return fmt.Errorf("writing new dnsmasq config: %w", err)
@ -93,9 +128,12 @@ func (c *Children) RestartDNSMasq(hostBootstrap bootstrap.Bootstrap) error {
}
// RestartNebula rewrites the nebula config and restarts the process.
//
// TODO block until process has been confirmed to have come back up
// successfully.
func (c *Children) RestartNebula(hostBootstrap bootstrap.Bootstrap) error {
_, err := nebulaWriteConfig(
c.opts.EnvVars.RuntimeDirPath, c.daemonConfig, hostBootstrap,
c.runtimeDirPath, c.daemonConfig, hostBootstrap,
)
if err != nil {
return fmt.Errorf("writing a new nebula config: %w", err)
@ -105,6 +143,30 @@ func (c *Children) RestartNebula(hostBootstrap bootstrap.Bootstrap) error {
return nil
}
// Reload applies a ReloadDiff to the Children, using the given bootstrap which
// must be the same one which was passed to CalculateReloadDiff.
func (c *Children) Reload(
ctx context.Context, newBootstrap bootstrap.Bootstrap, diff ReloadDiff,
) error {
var errs []error
if diff.DNSChanged {
c.logger.Info(ctx, "Restarting dnsmasq to account for bootstrap changes")
if err := c.RestartDNSMasq(newBootstrap); err != nil {
errs = append(errs, fmt.Errorf("restarting dnsmasq: %w", err))
}
}
if diff.NebulaChanged {
c.logger.Info(ctx, "Restarting nebula to account for bootstrap changes")
if err := c.RestartNebula(newBootstrap); err != nil {
errs = append(errs, fmt.Errorf("restarting nebula: %w", err))
}
}
return errors.Join(errs...)
}
// Shutdown blocks until all child processes have gracefully shut themselves
// down.
func (c *Children) Shutdown() {

View File

@ -0,0 +1,47 @@
package children
import (
"errors"
"fmt"
"isle/bootstrap"
"isle/daemon/daecommon"
"reflect"
)
// ReloadDiff describes which children had their configurations changed as part
// of a change in the bootstrap.
type ReloadDiff struct {
NebulaChanged bool
DNSChanged bool
}
// CalculateReloadDiff calculates a ReloadDiff based on an old and new
// bootstrap.
func CalculateReloadDiff(
daemonConfig daecommon.Config,
prevBootstrap, nextBootstrap bootstrap.Bootstrap,
) (
diff ReloadDiff, err error,
) {
{
prevNebulaConfig, prevErr := nebulaConfig(daemonConfig, prevBootstrap)
nextNebulaConfig, nextErr := nebulaConfig(daemonConfig, nextBootstrap)
if err = errors.Join(prevErr, nextErr); err != nil {
err = fmt.Errorf("calculating nebula config: %w", err)
return
}
diff.NebulaChanged = !reflect.DeepEqual(
prevNebulaConfig, nextNebulaConfig,
)
}
{
diff.DNSChanged = !reflect.DeepEqual(
dnsmasqConfig(daemonConfig, prevBootstrap),
dnsmasqConfig(daemonConfig, nextBootstrap),
)
}
return
}

View File

@ -1,4 +1,4 @@
package daemon
package children
import (
"context"

View File

@ -1,4 +1,4 @@
package daemon
package children
import (
"context"
@ -20,27 +20,6 @@ func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
return logger.WithNamespace("garageAdminClient")
}
// newGarageAdminClient will return an AdminClient for a local garage instance,
// or it will _panic_ if there is no local instance configured.
func newGarageAdminClient(
logger *mlog.Logger,
daemonConfig daecommon.Config,
adminToken string,
hostBootstrap bootstrap.Bootstrap,
) *garage.AdminClient {
thisHost := hostBootstrap.ThisHost()
return garage.NewAdminClient(
garageAdminClientLogger(logger),
net.JoinHostPort(
thisHost.IP().String(),
strconv.Itoa(daemonConfig.Storage.Allocations[0].AdminPort),
),
adminToken,
)
}
func waitForGarage(
ctx context.Context,
logger *mlog.Logger,
@ -82,25 +61,6 @@ func waitForGarage(
}
// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
// corresponds with the given alloc from the daemon config. This will panic if
// no associated instance can be found.
//
// This assumes that coalesceDaemonConfigAndBootstrap has already been called.
func bootstrapGarageHostForAlloc(
host bootstrap.Host,
alloc daecommon.ConfigStorageAllocation,
) bootstrap.GarageHostInstance {
for _, inst := range host.Garage.Instances {
if inst.RPCPort == alloc.RPCPort {
return inst
}
}
panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
}
func garageWriteChildConfig(
rpcSecret, runtimeDirPath, adminToken string,
hostBootstrap bootstrap.Bootstrap,
@ -110,7 +70,7 @@ func garageWriteChildConfig(
) {
thisHost := hostBootstrap.ThisHost()
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
id := daecommon.BootstrapGarageHostForAlloc(thisHost, alloc).ID
peer := garage.LocalPeer{
RemotePeer: garage.RemotePeer{
@ -185,41 +145,3 @@ func garagePmuxProcConfigs(
return pmuxProcConfigs, nil
}
func garageApplyLayout(
ctx context.Context,
logger *mlog.Logger,
daemonConfig daecommon.Config,
adminToken string,
hostBootstrap bootstrap.Bootstrap,
) error {
var (
adminClient = newGarageAdminClient(
logger, daemonConfig, adminToken, hostBootstrap,
)
thisHost = hostBootstrap.ThisHost()
hostName = thisHost.Name
allocs = daemonConfig.Storage.Allocations
peers = make([]garage.PeerLayout, len(allocs))
)
for i, alloc := range allocs {
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
zone := string(hostName)
if alloc.Zone != "" {
zone = alloc.Zone
}
peers[i] = garage.PeerLayout{
ID: id,
Capacity: alloc.Capacity * 1_000_000_000,
Zone: zone,
Tags: []string{},
}
}
return adminClient.ApplyLayout(ctx, peers)
}

View File

@ -0,0 +1,30 @@
package children
import (
"context"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// until keeps trying fn until it returns nil, returning true. If the context is
// canceled then until returns false.
func until(
ctx context.Context,
logger *mlog.Logger,
descr string,
fn func(context.Context) error,
) bool {
for {
logger.Info(ctx, descr)
err := fn(ctx)
if err == nil {
return true
} else if ctxErr := ctx.Err(); ctxErr != nil {
return false
}
logger.Warn(ctx, descr+" failed, retrying in one second", err)
time.Sleep(1 * time.Second)
}
}

View File

@ -1,4 +1,4 @@
package daemon
package children
import (
"context"

View File

@ -1,4 +1,4 @@
package daemon
package children
import (
"context"
@ -19,7 +19,7 @@ func (c *Children) newPmuxConfig(
pmuxlib.Config, error,
) {
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
c.opts.EnvVars.RuntimeDirPath,
c.runtimeDirPath,
binDirPath,
daemonConfig,
hostBootstrap,
@ -30,7 +30,7 @@ func (c *Children) newPmuxConfig(
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
c.logger,
c.opts.EnvVars.RuntimeDirPath,
c.runtimeDirPath,
binDirPath,
daemonConfig,
hostBootstrap,
@ -45,7 +45,7 @@ func (c *Children) newPmuxConfig(
ctx,
c.logger,
garageRPCSecret,
c.opts.EnvVars.RuntimeDirPath,
c.runtimeDirPath,
binDirPath,
daemonConfig,
garageAdminToken,

View File

@ -3,6 +3,7 @@ package daecommon
import (
"fmt"
"io"
"isle/bootstrap"
"isle/yamlutil"
"os"
"path/filepath"
@ -181,3 +182,20 @@ func LoadConfig(
return config, nil
}
// BootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
// corresponds with the given alloc from the daemon config. This will panic if
// no associated instance can be found.
func BootstrapGarageHostForAlloc(
host bootstrap.Host,
alloc ConfigStorageAllocation,
) bootstrap.GarageHostInstance {
for _, inst := range host.Garage.Instances {
if inst.RPCPort == alloc.RPCPort {
return inst
}
}
panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
}

View File

@ -9,15 +9,14 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"io/fs"
"isle/bootstrap"
"isle/daemon/children"
"isle/daemon/daecommon"
"isle/jsonutil"
"isle/nebula"
"isle/secrets"
"net/netip"
"os"
"path/filepath"
"slices"
"sync"
@ -30,10 +29,7 @@ import (
// Opts are optional parameters which can be passed in when initializing a new
// Daemon instance. A nil Opts is equivalent to a zero value.
type Opts struct {
// Stdout and Stderr are what the associated outputs from child processes
// will be directed to.
Stdout, Stderr io.Writer
ChildrenOpts *children.Opts
// Defaults to that returned by daecommon.GetEnvVars.
EnvVars daecommon.EnvVars
}
@ -43,14 +39,6 @@ func (o *Opts) withDefaults() *Opts {
o = new(Opts)
}
if o.Stdout == nil {
o.Stdout = os.Stdout
}
if o.Stderr == nil {
o.Stderr = os.Stderr
}
if o.EnvVars == (daecommon.EnvVars{}) {
o.EnvVars = daecommon.GetEnvVars()
}
@ -94,7 +82,7 @@ type Daemon struct {
l sync.RWMutex
state int
children *Children
children *children.Children
currBootstrap bootstrap.Bootstrap
shutdownCh chan struct{}
@ -198,15 +186,16 @@ func (d *Daemon) initialize(
d.state = daemonStateInitializing
d.logger.Info(ctx, "Creating child processes")
d.children, err = NewChildren(
d.children, err = children.New(
ctx,
d.logger.WithNamespace("children"),
d.envBinDirPath,
d.secretsStore,
d.daemonConfig,
d.opts.EnvVars.RuntimeDirPath,
d.garageAdminToken,
currBootstrap,
d.opts,
d.opts.ChildrenOpts,
)
if err != nil {
return fmt.Errorf("creating child processes: %w", err)
@ -282,10 +271,12 @@ func (d *Daemon) reload(
// whatever is in garage
newBootstrap.Hosts[thisHost.Name] = thisHost
diff, err := calcBootstrapDiff(d.daemonConfig, currBootstrap, newBootstrap)
diff, err := children.CalculateReloadDiff(
d.daemonConfig, currBootstrap, newBootstrap,
)
if err != nil {
return fmt.Errorf("calculating diff between bootstraps: %w", err)
} else if diff == (bootstrapDiff{}) {
} else if diff == (children.ReloadDiff{}) {
d.logger.Info(ctx, "No changes to bootstrap detected")
return nil
}
@ -295,29 +286,11 @@ func (d *Daemon) reload(
d.currBootstrap = newBootstrap
d.l.Unlock()
var errs []error
// TODO each of these changed cases should block until its respective
// service is confirmed to have come back online.
// TODO it's possible that reload could be called concurrently, and one call
// would override the reloading done by the other.
if diff.dnsChanged {
d.logger.Info(ctx, "Restarting dnsmasq to account for bootstrap changes")
if err := d.children.RestartDNSMasq(newBootstrap); err != nil {
errs = append(errs, fmt.Errorf("restarting dnsmasq: %w", err))
}
if err := d.children.Reload(ctx, newBootstrap, diff); err != nil {
return fmt.Errorf("reloading child processes (diff:%+v): %w", diff, err)
}
if diff.nebulaChanged {
d.logger.Info(ctx, "Restarting nebula to account for bootstrap changes")
if err := d.children.RestartNebula(newBootstrap); err != nil {
errs = append(errs, fmt.Errorf("restarting nebula: %w", err))
}
}
return errors.Join(errs...)
return nil
}
func (d *Daemon) postInit(ctx context.Context) error {

76
go/daemon/garage.go Normal file
View File

@ -0,0 +1,76 @@
package daemon
import (
"context"
"isle/bootstrap"
"isle/daemon/daecommon"
"net"
"strconv"
"isle/garage"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
return logger.WithNamespace("garageAdminClient")
}
// newGarageAdminClient will return an AdminClient for a local garage instance,
// or it will _panic_ if there is no local instance configured.
func newGarageAdminClient(
logger *mlog.Logger,
daemonConfig daecommon.Config,
adminToken string,
hostBootstrap bootstrap.Bootstrap,
) *garage.AdminClient {
thisHost := hostBootstrap.ThisHost()
return garage.NewAdminClient(
garageAdminClientLogger(logger),
net.JoinHostPort(
thisHost.IP().String(),
strconv.Itoa(daemonConfig.Storage.Allocations[0].AdminPort),
),
adminToken,
)
}
func garageApplyLayout(
ctx context.Context,
logger *mlog.Logger,
daemonConfig daecommon.Config,
adminToken string,
hostBootstrap bootstrap.Bootstrap,
) error {
var (
adminClient = newGarageAdminClient(
logger, daemonConfig, adminToken, hostBootstrap,
)
thisHost = hostBootstrap.ThisHost()
hostName = thisHost.Name
allocs = daemonConfig.Storage.Allocations
peers = make([]garage.PeerLayout, len(allocs))
)
for i, alloc := range allocs {
id := daecommon.BootstrapGarageHostForAlloc(thisHost, alloc).ID
zone := string(hostName)
if alloc.Zone != "" {
zone = alloc.Zone
}
peers[i] = garage.PeerLayout{
ID: id,
Capacity: alloc.Capacity * 1_000_000_000,
Zone: zone,
Tags: []string{},
}
}
return adminClient.ApplyLayout(ctx, peers)
}

View File

@ -1,7 +1,6 @@
package daemon
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
@ -9,33 +8,8 @@ import (
"io/fs"
"os"
"path/filepath"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// until keeps trying fn until it returns nil, returning true. If the context is
// canceled then until returns false.
func until(
ctx context.Context,
logger *mlog.Logger,
descr string,
fn func(context.Context) error,
) bool {
for {
logger.Info(ctx, descr)
err := fn(ctx)
if err == nil {
return true
} else if ctxErr := ctx.Err(); ctxErr != nil {
return false
}
logger.Warn(ctx, descr+" failed, retrying in one second", err)
time.Sleep(1 * time.Second)
}
}
func randStr(l int) string {
b := make([]byte, l)
if _, err := rand.Read(b); err != nil {