Move daemon sub-process logic into daemon package
This commit is contained in:
parent
aa1a8ea806
commit
93301b9f4c
@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
@ -15,7 +14,6 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||
)
|
||||
|
||||
@ -168,50 +166,26 @@ var subCmdAdminCreateNetwork = subCmd{
|
||||
return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
|
||||
}
|
||||
|
||||
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(hostBootstrap, daemonConfig)
|
||||
daemonInst, err := daemon.New(
|
||||
ctx,
|
||||
logger.WithNamespace("daemon"),
|
||||
daemonConfig,
|
||||
hostBootstrap,
|
||||
envRuntimeDirPath,
|
||||
envBinDirPath,
|
||||
&daemon.Opts{
|
||||
// SkipHostBootstrapPush is required, because the global bucket
|
||||
// hasn't actually been initialized yet, so there's nowhere to
|
||||
// push to.
|
||||
SkipHostBootstrapPush: true,
|
||||
|
||||
// NOTE both stdout and stderr are sent to stderr, so that the
|
||||
// user can pipe the resulting admin.json to stdout.
|
||||
Stdout: os.Stderr,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating nebula config: %w", err)
|
||||
}
|
||||
|
||||
garagePmuxProcConfigs, err := garagePmuxProcConfigs(hostBootstrap, daemonConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating garage configs: %w", err)
|
||||
}
|
||||
|
||||
pmuxConfig := pmuxlib.Config{
|
||||
Processes: append(
|
||||
[]pmuxlib.ProcessConfig{
|
||||
nebulaPmuxProcConfig,
|
||||
},
|
||||
garagePmuxProcConfigs...,
|
||||
),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
pmuxDoneCh := make(chan struct{})
|
||||
|
||||
logger.Info(ctx, "starting child processes")
|
||||
go func() {
|
||||
// NOTE both stdout and stderr are sent to stderr, so that the user
|
||||
// can pipe the resulting admin.json to stdout.
|
||||
pmuxlib.Run(ctx, os.Stderr, os.Stderr, pmuxConfig)
|
||||
close(pmuxDoneCh)
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
logger.Info(ctx, "waiting for child processes to exit")
|
||||
<-pmuxDoneCh
|
||||
}()
|
||||
|
||||
logger.Info(ctx, "waiting for garage instances to come online")
|
||||
if err := waitForGarageAndNebula(ctx, logger, hostBootstrap, daemonConfig); err != nil {
|
||||
return fmt.Errorf("waiting for garage to start up: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "applying initial garage layout")
|
||||
if err := garageApplyLayout(ctx, logger, hostBootstrap, daemonConfig); err != nil {
|
||||
return fmt.Errorf("applying initial garage layout: %w", err)
|
||||
return fmt.Errorf("initializing daemon: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "initializing garage shared global bucket")
|
||||
@ -219,6 +193,17 @@ var subCmdAdminCreateNetwork = subCmd{
|
||||
ctx, logger, hostBootstrap, daemonConfig,
|
||||
)
|
||||
|
||||
if cErr := (garage.AdminClientError{}); errors.As(err, &cErr) && cErr.StatusCode == 409 {
|
||||
return fmt.Errorf("shared global bucket has already been created, are the storage allocations from a previously initialized isle being used?")
|
||||
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("initializing garage shared global bucket: %w", err)
|
||||
}
|
||||
|
||||
if err := daemonInst.Shutdown(ctx); err != nil {
|
||||
return fmt.Errorf("shutting down daemon: %w (this can mean there are zombie children leftover)", err)
|
||||
}
|
||||
|
||||
hostBootstrap.Garage.GlobalBucketS3APICredentials = garageGlobalBucketCreds
|
||||
|
||||
// rewrite the bootstrap now that the global bucket creds have been
|
||||
@ -227,13 +212,6 @@ var subCmdAdminCreateNetwork = subCmd{
|
||||
return fmt.Errorf("writing bootstrap file: %w", err)
|
||||
}
|
||||
|
||||
if cErr := (garage.AdminClientError{}); errors.As(err, &cErr) && cErr.StatusCode == 409 {
|
||||
return fmt.Errorf("shared global bucket has already been created, are the storage allocations from a previously initialized isle being used?")
|
||||
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("initializing garage shared global bucket: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(ctx, "cluster initialized successfully, writing admin.json to stdout")
|
||||
|
||||
adm := admin.Admin{
|
||||
|
@ -7,13 +7,11 @@ import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"isle/bootstrap"
|
||||
"isle/daemon"
|
||||
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||
)
|
||||
@ -94,77 +92,24 @@ func runDaemonPmuxOnce(
|
||||
) (
|
||||
bootstrap.Bootstrap, error,
|
||||
) {
|
||||
|
||||
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(hostBootstrap, daemonConfig)
|
||||
daemonInst, err := daemon.New(
|
||||
ctx,
|
||||
logger.WithNamespace("daemon"),
|
||||
daemonConfig,
|
||||
hostBootstrap,
|
||||
envRuntimeDirPath,
|
||||
envBinDirPath,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("generating nebula config: %w", err)
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("initializing daemon: %w", err)
|
||||
}
|
||||
|
||||
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(hostBootstrap, daemonConfig)
|
||||
if err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("generating dnsmasq config: %w", err)
|
||||
}
|
||||
|
||||
garagePmuxProcConfigs, err := garagePmuxProcConfigs(hostBootstrap, daemonConfig)
|
||||
if err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("generating garage children configs: %w", err)
|
||||
}
|
||||
|
||||
pmuxConfig := pmuxlib.Config{
|
||||
Processes: append(
|
||||
[]pmuxlib.ProcessConfig{
|
||||
nebulaPmuxProcConfig,
|
||||
dnsmasqPmuxProcConfig,
|
||||
},
|
||||
garagePmuxProcConfigs...,
|
||||
),
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
pmuxlib.Run(ctx, os.Stdout, os.Stderr, pmuxConfig)
|
||||
defer func() {
|
||||
if err := daemonInst.Shutdown(ctx); err != nil {
|
||||
logger.Error(ctx, "failed to cleanly shutdown daemon", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := waitForGarageAndNebula(ctx, logger, hostBootstrap, daemonConfig); err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("waiting for nebula/garage to start up: %w", err)
|
||||
}
|
||||
|
||||
if len(daemonConfig.Storage.Allocations) > 0 {
|
||||
|
||||
err := doOnce(ctx, func(ctx context.Context) error {
|
||||
if err := garageApplyLayout(ctx, logger, hostBootstrap, daemonConfig); err != nil {
|
||||
logger.Error(ctx, "applying garage layout", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("applying garage layout: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = doOnce(ctx, func(ctx context.Context) error {
|
||||
if err := hostBootstrap.PutGarageBoostrapHost(ctx); err != nil {
|
||||
logger.Error(ctx, "updating host info in garage", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return bootstrap.Bootstrap{}, fmt.Errorf("updating host info in garage: %w", err)
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(3 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
@ -176,7 +121,7 @@ func runDaemonPmuxOnce(
|
||||
|
||||
case <-ticker.C:
|
||||
|
||||
fmt.Fprintln(os.Stderr, "checking for changes to bootstrap")
|
||||
logger.Info(ctx, "checking for changes to bootstrap")
|
||||
|
||||
var (
|
||||
changed bool
|
||||
@ -301,8 +246,8 @@ var subCmdDaemon = subCmd{
|
||||
// we update this Host's data using whatever configuration has been
|
||||
// provided by the daemon config. This way the daemon has the most
|
||||
// up-to-date possible bootstrap. This updated bootstrap will later get
|
||||
// updated in garage using bootstrap.PutGarageBoostrapHost, so other
|
||||
// hosts will see it as well.
|
||||
// updated in garage as a background daemon task, so other hosts will
|
||||
// see it as well.
|
||||
if hostBootstrap, daemonConfig, err = coalesceDaemonConfigAndBootstrap(hostBootstrap, daemonConfig); err != nil {
|
||||
return fmt.Errorf("merging daemon config into bootstrap data: %w", err)
|
||||
}
|
||||
|
@ -1,12 +1,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"isle/bootstrap"
|
||||
"isle/daemon"
|
||||
"isle/garage/garagesrv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func coalesceDaemonConfigAndBootstrap(
|
||||
@ -52,15 +50,3 @@ func coalesceDaemonConfigAndBootstrap(
|
||||
|
||||
return hostBootstrap, daemonConfig, nil
|
||||
}
|
||||
|
||||
func doOnce(ctx context.Context, fn func(context.Context) error) error {
|
||||
for {
|
||||
if err := fn(ctx); err == nil {
|
||||
return nil
|
||||
} else if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
return ctxErr
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
@ -6,177 +6,10 @@ import (
|
||||
"isle/bootstrap"
|
||||
"isle/daemon"
|
||||
"isle/garage"
|
||||
"isle/garage/garagesrv"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||
)
|
||||
|
||||
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
|
||||
return logger.WithNamespace("garageAdminClient")
|
||||
}
|
||||
|
||||
// newGarageAdminClient will return an AdminClient for a local garage instance,
|
||||
// or it will _panic_ if there is no local instance configured.
|
||||
func newGarageAdminClient(
|
||||
logger *mlog.Logger,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
) *garage.AdminClient {
|
||||
|
||||
thisHost := hostBootstrap.ThisHost()
|
||||
|
||||
return garage.NewAdminClient(
|
||||
garageAdminClientLogger(logger),
|
||||
net.JoinHostPort(
|
||||
thisHost.IP().String(),
|
||||
strconv.Itoa(daemonConfig.Storage.Allocations[0].AdminPort),
|
||||
),
|
||||
hostBootstrap.Garage.AdminToken,
|
||||
)
|
||||
}
|
||||
|
||||
func waitForGarageAndNebula(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
) error {
|
||||
|
||||
if err := waitForNebula(ctx, hostBootstrap); err != nil {
|
||||
return fmt.Errorf("waiting for nebula to start: %w", err)
|
||||
}
|
||||
|
||||
allocs := daemonConfig.Storage.Allocations
|
||||
|
||||
// if this host doesn't have any allocations specified then fall back to
|
||||
// waiting for nebula
|
||||
if len(allocs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
adminClientLogger := garageAdminClientLogger(logger)
|
||||
|
||||
for _, alloc := range allocs {
|
||||
|
||||
adminAddr := net.JoinHostPort(
|
||||
hostBootstrap.ThisHost().IP().String(),
|
||||
strconv.Itoa(alloc.AdminPort),
|
||||
)
|
||||
|
||||
adminClient := garage.NewAdminClient(
|
||||
adminClientLogger,
|
||||
adminAddr,
|
||||
hostBootstrap.Garage.AdminToken,
|
||||
)
|
||||
|
||||
ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr)
|
||||
logger.Debug(ctx, "wating for garage instance to start")
|
||||
|
||||
if err := adminClient.Wait(ctx); err != nil {
|
||||
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
|
||||
// corresponds with the given alloc from the daemon config. This will panic if
|
||||
// no associated instance can be found.
|
||||
//
|
||||
// This assumes that coalesceDaemonConfigAndBootstrap has already been called.
|
||||
func bootstrapGarageHostForAlloc(
|
||||
host bootstrap.Host,
|
||||
alloc daemon.ConfigStorageAllocation,
|
||||
) bootstrap.GarageHostInstance {
|
||||
|
||||
for _, inst := range host.Garage.Instances {
|
||||
if inst.RPCPort == alloc.RPCPort {
|
||||
return inst
|
||||
}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
|
||||
}
|
||||
|
||||
func garageWriteChildConfig(
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
alloc daemon.ConfigStorageAllocation,
|
||||
) (
|
||||
string, error,
|
||||
) {
|
||||
|
||||
thisHost := hostBootstrap.ThisHost()
|
||||
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
|
||||
|
||||
peer := garage.LocalPeer{
|
||||
RemotePeer: garage.RemotePeer{
|
||||
ID: id,
|
||||
IP: thisHost.IP().String(),
|
||||
RPCPort: alloc.RPCPort,
|
||||
S3APIPort: alloc.S3APIPort,
|
||||
},
|
||||
AdminPort: alloc.AdminPort,
|
||||
}
|
||||
|
||||
garageTomlPath := filepath.Join(
|
||||
envRuntimeDirPath, fmt.Sprintf("garage-%d.toml", alloc.RPCPort),
|
||||
)
|
||||
|
||||
err := garagesrv.WriteGarageTomlFile(garageTomlPath, garagesrv.GarageTomlData{
|
||||
MetaPath: alloc.MetaPath,
|
||||
DataPath: alloc.DataPath,
|
||||
|
||||
RPCSecret: hostBootstrap.Garage.RPCSecret,
|
||||
AdminToken: hostBootstrap.Garage.AdminToken,
|
||||
|
||||
LocalPeer: peer,
|
||||
BootstrapPeers: hostBootstrap.GaragePeers(),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("creating garage.toml file at %q: %w", garageTomlPath, err)
|
||||
}
|
||||
|
||||
return garageTomlPath, nil
|
||||
}
|
||||
|
||||
func garagePmuxProcConfigs(
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
) (
|
||||
[]pmuxlib.ProcessConfig, error,
|
||||
) {
|
||||
|
||||
var pmuxProcConfigs []pmuxlib.ProcessConfig
|
||||
|
||||
for _, alloc := range daemonConfig.Storage.Allocations {
|
||||
|
||||
childConfigPath, err := garageWriteChildConfig(hostBootstrap, alloc)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err)
|
||||
}
|
||||
|
||||
pmuxProcConfigs = append(pmuxProcConfigs, pmuxlib.ProcessConfig{
|
||||
Name: fmt.Sprintf("garage-%d", alloc.RPCPort),
|
||||
Cmd: binPath("garage"),
|
||||
Args: []string{"-c", childConfigPath, "server"},
|
||||
StartAfterFunc: func(ctx context.Context) error {
|
||||
return waitForNebula(ctx, hostBootstrap)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return pmuxProcConfigs, nil
|
||||
}
|
||||
|
||||
func garageInitializeGlobalBucket(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
@ -185,7 +18,9 @@ func garageInitializeGlobalBucket(
|
||||
) (
|
||||
garage.S3APICredentials, error,
|
||||
) {
|
||||
adminClient := newGarageAdminClient(logger, hostBootstrap, daemonConfig)
|
||||
adminClient := daemon.NewGarageAdminClient(
|
||||
logger, hostBootstrap, daemonConfig,
|
||||
)
|
||||
|
||||
creds, err := adminClient.CreateS3APICredentials(
|
||||
ctx, garage.GlobalBucketS3APICredentialsName,
|
||||
@ -213,38 +48,3 @@ func garageInitializeGlobalBucket(
|
||||
|
||||
return creds, nil
|
||||
}
|
||||
|
||||
func garageApplyLayout(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
) error {
|
||||
|
||||
var (
|
||||
adminClient = newGarageAdminClient(logger, hostBootstrap, daemonConfig)
|
||||
thisHost = hostBootstrap.ThisHost()
|
||||
hostName = thisHost.Name
|
||||
allocs = daemonConfig.Storage.Allocations
|
||||
peers = make([]garage.PeerLayout, len(allocs))
|
||||
)
|
||||
|
||||
for i, alloc := range allocs {
|
||||
|
||||
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
|
||||
|
||||
zone := hostName
|
||||
if alloc.Zone != "" {
|
||||
zone = alloc.Zone
|
||||
}
|
||||
|
||||
peers[i] = garage.PeerLayout{
|
||||
ID: id,
|
||||
Capacity: alloc.Capacity * 1_000_000_000,
|
||||
Zone: zone,
|
||||
Tags: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
return adminClient.ApplyLayout(ctx, peers)
|
||||
}
|
||||
|
@ -44,10 +44,11 @@ var (
|
||||
"STATE_DIRECTORY",
|
||||
filepath.Join(xdg.StateHome, "isle"),
|
||||
)
|
||||
envBinDirPath = filepath.Join(envAppDirPath, "bin")
|
||||
)
|
||||
|
||||
func binPath(name string) string {
|
||||
return filepath.Join(envAppDirPath, "bin", name)
|
||||
return filepath.Join(envBinDirPath, name)
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -1,10 +1,9 @@
|
||||
package main
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"isle/bootstrap"
|
||||
"isle/daemon"
|
||||
"isle/dnsmasq"
|
||||
"fmt"
|
||||
"isle/bootstrap"
|
||||
"isle/dnsmasq"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
@ -12,13 +11,13 @@ import (
|
||||
)
|
||||
|
||||
func dnsmasqPmuxProcConfig(
|
||||
runtimeDirPath, binDirPath string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
daemonConfig Config,
|
||||
) (
|
||||
pmuxlib.ProcessConfig, error,
|
||||
) {
|
||||
|
||||
confPath := filepath.Join(envRuntimeDirPath, "dnsmasq.conf")
|
||||
confPath := filepath.Join(runtimeDirPath, "dnsmasq.conf")
|
||||
|
||||
hostsSlice := make([]dnsmasq.ConfDataHost, 0, len(hostBootstrap.Hosts))
|
||||
for _, host := range hostBootstrap.Hosts {
|
||||
@ -45,7 +44,7 @@ func dnsmasqPmuxProcConfig(
|
||||
|
||||
return pmuxlib.ProcessConfig{
|
||||
Name: "dnsmasq",
|
||||
Cmd: binPath("dnsmasq"),
|
||||
Cmd: filepath.Join(binDirPath, "dnsmasq"),
|
||||
Args: []string{"-d", "-C", confPath},
|
||||
}, nil
|
||||
}
|
206
go/daemon/child_garage.go
Normal file
206
go/daemon/child_garage.go
Normal file
@ -0,0 +1,206 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"isle/bootstrap"
|
||||
"isle/garage"
|
||||
"isle/garage/garagesrv"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||
)
|
||||
|
||||
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
|
||||
return logger.WithNamespace("garageAdminClient")
|
||||
}
|
||||
|
||||
// NewGarageAdminClient will return an AdminClient for a local garage instance,
|
||||
// or it will _panic_ if there is no local instance configured.
|
||||
func NewGarageAdminClient(
|
||||
logger *mlog.Logger,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
config Config,
|
||||
) *garage.AdminClient {
|
||||
|
||||
thisHost := hostBootstrap.ThisHost()
|
||||
|
||||
return garage.NewAdminClient(
|
||||
garageAdminClientLogger(logger),
|
||||
net.JoinHostPort(
|
||||
thisHost.IP().String(),
|
||||
strconv.Itoa(config.Storage.Allocations[0].AdminPort),
|
||||
),
|
||||
hostBootstrap.Garage.AdminToken,
|
||||
)
|
||||
}
|
||||
|
||||
func (d *daemon) waitForGarage(ctx context.Context) error {
|
||||
|
||||
allocs := d.config.Storage.Allocations
|
||||
|
||||
// if this host doesn't have any allocations specified then fall back to
|
||||
// waiting for nebula
|
||||
if len(allocs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
adminClientLogger := garageAdminClientLogger(d.logger)
|
||||
|
||||
for _, alloc := range allocs {
|
||||
|
||||
adminAddr := net.JoinHostPort(
|
||||
d.hostBootstrap.ThisHost().IP().String(),
|
||||
strconv.Itoa(alloc.AdminPort),
|
||||
)
|
||||
|
||||
adminClient := garage.NewAdminClient(
|
||||
adminClientLogger,
|
||||
adminAddr,
|
||||
d.hostBootstrap.Garage.AdminToken,
|
||||
)
|
||||
|
||||
ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr)
|
||||
d.logger.Debug(ctx, "waiting for garage instance to start")
|
||||
|
||||
if err := adminClient.Wait(ctx); err != nil {
|
||||
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
|
||||
// corresponds with the given alloc from the daemon config. This will panic if
|
||||
// no associated instance can be found.
|
||||
//
|
||||
// This assumes that coalesceDaemonConfigAndBootstrap has already been called.
|
||||
func bootstrapGarageHostForAlloc(
|
||||
host bootstrap.Host,
|
||||
alloc ConfigStorageAllocation,
|
||||
) bootstrap.GarageHostInstance {
|
||||
|
||||
for _, inst := range host.Garage.Instances {
|
||||
if inst.RPCPort == alloc.RPCPort {
|
||||
return inst
|
||||
}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
|
||||
}
|
||||
|
||||
func garageWriteChildConfig(
|
||||
runtimeDirPath string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
alloc ConfigStorageAllocation,
|
||||
) (
|
||||
string, error,
|
||||
) {
|
||||
|
||||
thisHost := hostBootstrap.ThisHost()
|
||||
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
|
||||
|
||||
peer := garage.LocalPeer{
|
||||
RemotePeer: garage.RemotePeer{
|
||||
ID: id,
|
||||
IP: thisHost.IP().String(),
|
||||
RPCPort: alloc.RPCPort,
|
||||
S3APIPort: alloc.S3APIPort,
|
||||
},
|
||||
AdminPort: alloc.AdminPort,
|
||||
}
|
||||
|
||||
garageTomlPath := filepath.Join(
|
||||
runtimeDirPath, fmt.Sprintf("garage-%d.toml", alloc.RPCPort),
|
||||
)
|
||||
|
||||
err := garagesrv.WriteGarageTomlFile(garageTomlPath, garagesrv.GarageTomlData{
|
||||
MetaPath: alloc.MetaPath,
|
||||
DataPath: alloc.DataPath,
|
||||
|
||||
RPCSecret: hostBootstrap.Garage.RPCSecret,
|
||||
AdminToken: hostBootstrap.Garage.AdminToken,
|
||||
|
||||
LocalPeer: peer,
|
||||
BootstrapPeers: hostBootstrap.GaragePeers(),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("creating garage.toml file at %q: %w", garageTomlPath, err)
|
||||
}
|
||||
|
||||
return garageTomlPath, nil
|
||||
}
|
||||
|
||||
func garagePmuxProcConfigs(
|
||||
runtimeDirPath, binDirPath string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig Config,
|
||||
) (
|
||||
[]pmuxlib.ProcessConfig, error,
|
||||
) {
|
||||
|
||||
var pmuxProcConfigs []pmuxlib.ProcessConfig
|
||||
|
||||
for _, alloc := range daemonConfig.Storage.Allocations {
|
||||
|
||||
childConfigPath, err := garageWriteChildConfig(
|
||||
runtimeDirPath, hostBootstrap, alloc,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err)
|
||||
}
|
||||
|
||||
pmuxProcConfigs = append(pmuxProcConfigs, pmuxlib.ProcessConfig{
|
||||
Name: fmt.Sprintf("garage-%d", alloc.RPCPort),
|
||||
Cmd: filepath.Join(binDirPath, "garage"),
|
||||
Args: []string{"-c", childConfigPath, "server"},
|
||||
StartAfterFunc: func(ctx context.Context) error {
|
||||
return waitForNebula(ctx, hostBootstrap)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return pmuxProcConfigs, nil
|
||||
}
|
||||
|
||||
func garageApplyLayout(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
config Config,
|
||||
) error {
|
||||
|
||||
var (
|
||||
adminClient = NewGarageAdminClient(logger, hostBootstrap, config)
|
||||
thisHost = hostBootstrap.ThisHost()
|
||||
hostName = thisHost.Name
|
||||
allocs = config.Storage.Allocations
|
||||
peers = make([]garage.PeerLayout, len(allocs))
|
||||
)
|
||||
|
||||
for i, alloc := range allocs {
|
||||
|
||||
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
|
||||
|
||||
zone := hostName
|
||||
if alloc.Zone != "" {
|
||||
zone = alloc.Zone
|
||||
}
|
||||
|
||||
peers[i] = garage.PeerLayout{
|
||||
ID: id,
|
||||
Capacity: alloc.Capacity * 1_000_000_000,
|
||||
Zone: zone,
|
||||
Tags: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
return adminClient.ApplyLayout(ctx, peers)
|
||||
}
|
@ -1,10 +1,9 @@
|
||||
package main
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"isle/bootstrap"
|
||||
"isle/daemon"
|
||||
"isle/yamlutil"
|
||||
"net"
|
||||
"path/filepath"
|
||||
@ -17,14 +16,16 @@ import (
|
||||
// this by attempting to create a UDP connection which has the nebula IP set as
|
||||
// its source. If this succeeds we can assume that at the very least the nebula
|
||||
// interface has been initialized.
|
||||
func waitForNebula(ctx context.Context, hostBootstrap bootstrap.Bootstrap) error {
|
||||
func waitForNebula(
|
||||
ctx context.Context, hostBootstrap bootstrap.Bootstrap,
|
||||
) error {
|
||||
|
||||
ip := hostBootstrap.ThisHost().IP()
|
||||
|
||||
lUdpAddr := &net.UDPAddr{IP: ip, Port: 0}
|
||||
rUdpAddr := &net.UDPAddr{IP: ip, Port: 45535}
|
||||
|
||||
return doOnce(ctx, func(context.Context) error {
|
||||
return until(ctx, func(context.Context) error {
|
||||
conn, err := net.DialUDP("udp", lUdpAddr, rUdpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -35,8 +36,9 @@ func waitForNebula(ctx context.Context, hostBootstrap bootstrap.Bootstrap) error
|
||||
}
|
||||
|
||||
func nebulaPmuxProcConfig(
|
||||
runtimeDirPath, binDirPath string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
daemonConfig daemon.Config,
|
||||
daemonConfig Config,
|
||||
) (
|
||||
pmuxlib.ProcessConfig, error,
|
||||
) {
|
||||
@ -122,7 +124,7 @@ func nebulaPmuxProcConfig(
|
||||
}
|
||||
}
|
||||
|
||||
nebulaYmlPath := filepath.Join(envRuntimeDirPath, "nebula.yml")
|
||||
nebulaYmlPath := filepath.Join(runtimeDirPath, "nebula.yml")
|
||||
|
||||
if err := yamlutil.WriteYamlFile(config, nebulaYmlPath, 0440); err != nil {
|
||||
return pmuxlib.ProcessConfig{}, fmt.Errorf("writing nebula.yml to %q: %w", nebulaYmlPath, err)
|
||||
@ -130,7 +132,7 @@ func nebulaPmuxProcConfig(
|
||||
|
||||
return pmuxlib.ProcessConfig{
|
||||
Name: "nebula",
|
||||
Cmd: binPath("nebula"),
|
||||
Cmd: filepath.Join(binDirPath, "nebula"),
|
||||
Args: []string{"-config", nebulaYmlPath},
|
||||
}, nil
|
||||
}
|
@ -1,6 +1,20 @@
|
||||
package daemon
|
||||
|
||||
import "strconv"
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"isle/yamlutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/imdario/mergo"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func defaultConfigPath(appDirPath string) string {
|
||||
return filepath.Join(appDirPath, "etc", "daemon.yml")
|
||||
}
|
||||
|
||||
type ConfigTun struct {
|
||||
Device string `yaml:"device"`
|
||||
@ -100,3 +114,70 @@ func (c *Config) fillDefaults() {
|
||||
firewallGarageInbound...,
|
||||
)
|
||||
}
|
||||
|
||||
// CopyDefaultConfig copies the daemon config file embedded in the AppDir into
|
||||
// the given io.Writer.
|
||||
func CopyDefaultConfig(into io.Writer, appDirPath string) error {
|
||||
|
||||
defaultConfigPath := defaultConfigPath(appDirPath)
|
||||
|
||||
f, err := os.Open(defaultConfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening daemon config at %q: %w", defaultConfigPath, err)
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
if _, err := io.Copy(into, f); err != nil {
|
||||
return fmt.Errorf("copying daemon config from %q: %w", defaultConfigPath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadConfig loads the daemon config from userConfigPath, merges it with
|
||||
// the default found in the appDirPath, and returns the result.
|
||||
//
|
||||
// If userConfigPath is not given then the default is loaded and returned.
|
||||
func LoadConfig(
|
||||
appDirPath, userConfigPath string,
|
||||
) (
|
||||
Config, error,
|
||||
) {
|
||||
|
||||
defaultConfigPath := defaultConfigPath(appDirPath)
|
||||
|
||||
var fullDaemon map[string]interface{}
|
||||
|
||||
if err := yamlutil.LoadYamlFile(&fullDaemon, defaultConfigPath); err != nil {
|
||||
return Config{}, fmt.Errorf("parsing default daemon config file: %w", err)
|
||||
}
|
||||
|
||||
if userConfigPath != "" {
|
||||
|
||||
var daemonConfig map[string]interface{}
|
||||
if err := yamlutil.LoadYamlFile(&daemonConfig, userConfigPath); err != nil {
|
||||
return Config{}, fmt.Errorf("parsing %q: %w", userConfigPath, err)
|
||||
}
|
||||
|
||||
err := mergo.Merge(&fullDaemon, daemonConfig, mergo.WithOverride)
|
||||
if err != nil {
|
||||
return Config{}, fmt.Errorf("merging contents of file %q: %w", userConfigPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
fullDaemonB, err := yaml.Marshal(fullDaemon)
|
||||
|
||||
if err != nil {
|
||||
return Config{}, fmt.Errorf("yaml marshaling: %w", err)
|
||||
}
|
||||
|
||||
var config Config
|
||||
if err := yaml.Unmarshal(fullDaemonB, &config); err != nil {
|
||||
return Config{}, fmt.Errorf("yaml unmarshaling back into Config struct: %w", err)
|
||||
}
|
||||
|
||||
config.fillDefaults()
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
@ -1,85 +1,200 @@
|
||||
// Package daemon contains types and functions related specifically to the
|
||||
// isle daemon.
|
||||
// Package daemon implements the isle daemon, which is a long-running service
|
||||
// managing all isle background tasks and sub-processes for a single cluster.
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"isle/yamlutil"
|
||||
"isle/bootstrap"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/imdario/mergo"
|
||||
"gopkg.in/yaml.v3"
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
|
||||
)
|
||||
|
||||
func defaultConfigPath(appDirPath string) string {
|
||||
return filepath.Join(appDirPath, "etc", "daemon.yml")
|
||||
type daemon struct {
|
||||
logger *mlog.Logger
|
||||
config Config
|
||||
hostBootstrap bootstrap.Bootstrap
|
||||
opts Opts
|
||||
|
||||
pmuxCancelFn context.CancelFunc
|
||||
pmuxStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
// CopyDefaultConfig copies the daemon config file embedded in the AppDir into
|
||||
// the given io.Writer.
|
||||
func CopyDefaultConfig(into io.Writer, appDirPath string) error {
|
||||
// Daemon presents all functionality required for client frontends to interact
|
||||
// with isle, typically via the unix socket.
|
||||
type Daemon interface {
|
||||
|
||||
defaultConfigPath := defaultConfigPath(appDirPath)
|
||||
// Shutdown blocks until all resources held or created by the daemon,
|
||||
// including child processes it has started, have been cleaned up, or until
|
||||
// the context is canceled.
|
||||
//
|
||||
// If this returns an error then it's possible that child processes are
|
||||
// still running and are no longer managed.
|
||||
Shutdown(context.Context) error
|
||||
}
|
||||
|
||||
f, err := os.Open(defaultConfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening daemon config at %q: %w", defaultConfigPath, err)
|
||||
// Opts are optional parameters which can be passed in when initializing a new
|
||||
// Daemon instance. A nil Opts is equivalent to a zero value.
|
||||
type Opts struct {
|
||||
// SkipHostBootstrapPush, if set, will cause the Daemon to not push the
|
||||
// bootstrap to garage upon a successful initialization.
|
||||
SkipHostBootstrapPush bool
|
||||
|
||||
// Stdout and Stderr are what the associated outputs from child processes
|
||||
// will be directed to.
|
||||
Stdout, Stderr io.Writer
|
||||
}
|
||||
|
||||
func (o *Opts) withDefaults() *Opts {
|
||||
if o == nil {
|
||||
o = new(Opts)
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
if o.Stdout == nil {
|
||||
o.Stdout = os.Stdout
|
||||
}
|
||||
|
||||
if _, err := io.Copy(into, f); err != nil {
|
||||
return fmt.Errorf("copying daemon config from %q: %w", defaultConfigPath, err)
|
||||
if o.Stderr == nil {
|
||||
o.Stderr = os.Stderr
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
||||
|
||||
// New initialized and returns a Daemon. If initialization fails an error is
|
||||
// returned.
|
||||
func New(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
config Config,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
runtimeDirPath, binDirPath string,
|
||||
opts *Opts,
|
||||
) (
|
||||
Daemon, error,
|
||||
) {
|
||||
opts = opts.withDefaults()
|
||||
|
||||
nebulaPmuxProcConfig, err := nebulaPmuxProcConfig(
|
||||
runtimeDirPath, binDirPath, hostBootstrap, config,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generating nebula config: %w", err)
|
||||
}
|
||||
|
||||
dnsmasqPmuxProcConfig, err := dnsmasqPmuxProcConfig(
|
||||
runtimeDirPath, binDirPath, hostBootstrap, config,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generating dnsmasq config: %w", err)
|
||||
}
|
||||
|
||||
garagePmuxProcConfigs, err := garagePmuxProcConfigs(
|
||||
runtimeDirPath, binDirPath, hostBootstrap, config,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generating garage children configs: %w", err)
|
||||
}
|
||||
|
||||
pmuxConfig := pmuxlib.Config{
|
||||
Processes: append(
|
||||
[]pmuxlib.ProcessConfig{
|
||||
nebulaPmuxProcConfig,
|
||||
dnsmasqPmuxProcConfig,
|
||||
},
|
||||
garagePmuxProcConfigs...,
|
||||
),
|
||||
}
|
||||
|
||||
pmuxCtx, pmuxCancelFn := context.WithCancel(context.Background())
|
||||
|
||||
d := &daemon{
|
||||
logger: logger,
|
||||
config: config,
|
||||
hostBootstrap: hostBootstrap,
|
||||
opts: *opts,
|
||||
pmuxCancelFn: pmuxCancelFn,
|
||||
pmuxStoppedCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(d.pmuxStoppedCh)
|
||||
pmuxlib.Run(pmuxCtx, d.opts.Stdout, d.opts.Stderr, pmuxConfig)
|
||||
}()
|
||||
|
||||
if initErr := d.postPmuxInit(ctx); initErr != nil {
|
||||
logger.Warn(ctx, "failed to initialize daemon, shutting down child processes", err)
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||
defer cancel()
|
||||
if err := d.Shutdown(shutdownCtx); err != nil {
|
||||
panic(fmt.Sprintf(
|
||||
"failed to shut down child processes after initialization"+
|
||||
" error, there may be zombie children leftover."+
|
||||
" Original error: %v",
|
||||
initErr,
|
||||
))
|
||||
}
|
||||
|
||||
return nil, initErr
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (d *daemon) postPmuxInit(ctx context.Context) error {
|
||||
d.logger.Info(ctx, "waiting for nebula VPN to come online")
|
||||
if err := waitForNebula(ctx, d.hostBootstrap); err != nil {
|
||||
return fmt.Errorf("waiting for nebula to start: %w", err)
|
||||
}
|
||||
|
||||
d.logger.Info(ctx, "waiting for garage instances to come online")
|
||||
if err := d.waitForGarage(ctx); err != nil {
|
||||
return fmt.Errorf("waiting for garage to start: %w", err)
|
||||
}
|
||||
|
||||
if len(d.config.Storage.Allocations) > 0 {
|
||||
|
||||
err := until(ctx, func(ctx context.Context) error {
|
||||
err := garageApplyLayout(ctx, d.logger, d.hostBootstrap, d.config)
|
||||
if err != nil {
|
||||
d.logger.Error(ctx, "applying garage layout", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("applying garage layout: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !d.opts.SkipHostBootstrapPush {
|
||||
if err := until(ctx, func(ctx context.Context) error {
|
||||
if err := d.hostBootstrap.PutGarageBoostrapHost(ctx); err != nil {
|
||||
d.logger.Error(ctx, "updating host info in garage", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("updating host info in garage: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadConfig loads the daemon config from userConfigPath, merges it with
|
||||
// the default found in the appDirPath, and returns the result.
|
||||
//
|
||||
// If userConfigPath is not given then the default is loaded and returned.
|
||||
func LoadConfig(
|
||||
appDirPath, userConfigPath string,
|
||||
) (
|
||||
Config, error,
|
||||
) {
|
||||
|
||||
defaultConfigPath := defaultConfigPath(appDirPath)
|
||||
|
||||
var fullDaemon map[string]interface{}
|
||||
|
||||
if err := yamlutil.LoadYamlFile(&fullDaemon, defaultConfigPath); err != nil {
|
||||
return Config{}, fmt.Errorf("parsing default daemon config file: %w", err)
|
||||
func (d *daemon) Shutdown(ctx context.Context) error {
|
||||
d.pmuxCancelFn()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-d.pmuxStoppedCh:
|
||||
return nil
|
||||
}
|
||||
|
||||
if userConfigPath != "" {
|
||||
|
||||
var daemonConfig map[string]interface{}
|
||||
if err := yamlutil.LoadYamlFile(&daemonConfig, userConfigPath); err != nil {
|
||||
return Config{}, fmt.Errorf("parsing %q: %w", userConfigPath, err)
|
||||
}
|
||||
|
||||
err := mergo.Merge(&fullDaemon, daemonConfig, mergo.WithOverride)
|
||||
if err != nil {
|
||||
return Config{}, fmt.Errorf("merging contents of file %q: %w", userConfigPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
fullDaemonB, err := yaml.Marshal(fullDaemon)
|
||||
|
||||
if err != nil {
|
||||
return Config{}, fmt.Errorf("yaml marshaling: %w", err)
|
||||
}
|
||||
|
||||
var config Config
|
||||
if err := yaml.Unmarshal(fullDaemonB, &config); err != nil {
|
||||
return Config{}, fmt.Errorf("yaml unmarshaling back into Config struct: %w", err)
|
||||
}
|
||||
|
||||
config.fillDefaults()
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
18
go/daemon/jigs.go
Normal file
18
go/daemon/jigs.go
Normal file
@ -0,0 +1,18 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func until(ctx context.Context, fn func(context.Context) error) error {
|
||||
for {
|
||||
if err := fn(ctx); err == nil {
|
||||
return nil
|
||||
} else if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
return ctxErr
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user