Centralize creation of garage admin client logic into Children
This commit is contained in:
parent
9e508ef4e2
commit
4bce0e3fa0
@ -3,20 +3,30 @@
|
||||
package children
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"code.betamike.com/micropelago/pmux/pmuxlib"
|
||||
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
||||
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"isle/bootstrap"
|
||||
"isle/daemon/daecommon"
|
||||
"isle/garage"
|
||||
"isle/secrets"
|
||||
"isle/toolkit"
|
||||
)
|
||||
|
||||
type garageProc struct {
|
||||
*pmuxlib.Process
|
||||
alloc daecommon.ConfigStorageAllocation
|
||||
adminAddr string
|
||||
}
|
||||
|
||||
// Opts are optional fields which can be passed into New. A nil value is
|
||||
// equivalent to a zero value.
|
||||
type Opts struct {
|
||||
@ -48,7 +58,7 @@ type Children struct {
|
||||
|
||||
nebulaProc *pmuxlib.Process
|
||||
dnsmasqProc *pmuxlib.Process
|
||||
garageProcs map[string]*pmuxlib.Process
|
||||
garageProcs map[string]garageProc
|
||||
}
|
||||
|
||||
// New initializes and returns a Children instance. If initialization fails an
|
||||
@ -133,9 +143,8 @@ func New(
|
||||
if err := waitForGarage(
|
||||
ctx,
|
||||
c.logger,
|
||||
networkConfig,
|
||||
garageAdminToken,
|
||||
hostBootstrap,
|
||||
c.garageProcs,
|
||||
opts.GarageNewCluster,
|
||||
); err != nil {
|
||||
logger.Warn(ctx, "Failed waiting for garage processes to initialize, shutting down child processes", err)
|
||||
@ -205,6 +214,8 @@ func (c *Children) reloadGarage(
|
||||
return nil
|
||||
}
|
||||
|
||||
thisHost := hostBootstrap.ThisHost()
|
||||
|
||||
var anyCreated bool
|
||||
for _, alloc := range allocs {
|
||||
var (
|
||||
@ -237,18 +248,21 @@ func (c *Children) reloadGarage(
|
||||
anyCreated = true
|
||||
|
||||
c.logger.Info(ctx, "Garage config has been added, creating process")
|
||||
c.garageProcs[procName] = garagePmuxProc(
|
||||
c.garageProcs[procName] = garageProc{
|
||||
Process: garagePmuxProc(
|
||||
ctx, c.logger, c.binDirPath, procName, childConfigPath,
|
||||
)
|
||||
),
|
||||
alloc: alloc,
|
||||
adminAddr: garageAllocAdminAddr(thisHost, alloc),
|
||||
}
|
||||
}
|
||||
|
||||
if anyCreated {
|
||||
if err := waitForGarage(
|
||||
ctx,
|
||||
c.logger,
|
||||
networkConfig,
|
||||
c.garageAdminToken,
|
||||
hostBootstrap,
|
||||
c.garageProcs,
|
||||
false,
|
||||
); err != nil {
|
||||
return fmt.Errorf("waiting for garage to start: %w", err)
|
||||
@ -282,6 +296,46 @@ func (c *Children) Reload(
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// GarageAdminClient returns an admin client for an active local garage process,
|
||||
// or false if there are no garage processes.
|
||||
func (c *Children) GarageAdminClient() (*garage.AdminClient, bool) {
|
||||
if len(c.garageProcs) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
procsSlice := maps.Values(c.garageProcs)
|
||||
slices.SortFunc(procsSlice, func(a, b garageProc) int {
|
||||
return cmp.Compare(a.alloc.RPCPort, b.alloc.RPCPort)
|
||||
})
|
||||
|
||||
return garage.NewAdminClient(
|
||||
garageAdminClientLogger(c.logger),
|
||||
procsSlice[0].adminAddr,
|
||||
c.garageAdminToken,
|
||||
), true
|
||||
}
|
||||
|
||||
// GarageAdminClientForAlloc returns an admin client for a particular allocation
|
||||
// which has a currently running garage instance, or false if there the
|
||||
// allocation has no currently running instance.
|
||||
func (c *Children) GarageAdminClientForAlloc(
|
||||
alloc daecommon.ConfigStorageAllocation,
|
||||
) (
|
||||
*garage.AdminClient, bool,
|
||||
) {
|
||||
procName := garagePmuxProcName(alloc)
|
||||
proc, ok := c.garageProcs[procName]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return garage.NewAdminClient(
|
||||
garageAdminClientLogger(c.logger),
|
||||
proc.adminAddr,
|
||||
c.garageAdminToken,
|
||||
), true
|
||||
}
|
||||
|
||||
// Shutdown blocks until all child processes have gracefully shut themselves
|
||||
// down.
|
||||
func (c *Children) Shutdown() {
|
||||
|
@ -21,43 +21,44 @@ func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
|
||||
return logger.WithNamespace("garageAdminClient")
|
||||
}
|
||||
|
||||
func garageAllocAdminAddr(
|
||||
host bootstrap.Host, alloc daecommon.ConfigStorageAllocation,
|
||||
) string {
|
||||
return net.JoinHostPort(host.IP().String(), strconv.Itoa(alloc.AdminPort))
|
||||
}
|
||||
|
||||
func waitForGarage(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
networkConfig daecommon.NetworkConfig,
|
||||
adminToken string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
procs map[string]garageProc,
|
||||
newCluster bool,
|
||||
) error {
|
||||
|
||||
allocs := networkConfig.Storage.Allocations
|
||||
if len(allocs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
adminClientLogger := garageAdminClientLogger(logger)
|
||||
|
||||
for _, alloc := range allocs {
|
||||
|
||||
adminAddr := net.JoinHostPort(
|
||||
hostBootstrap.ThisHost().IP().String(),
|
||||
strconv.Itoa(alloc.AdminPort),
|
||||
)
|
||||
|
||||
for _, proc := range procs {
|
||||
adminClient := garage.NewAdminClient(
|
||||
adminClientLogger, adminAddr, adminToken,
|
||||
adminClientLogger, proc.adminAddr, adminToken,
|
||||
)
|
||||
|
||||
ctx := mctx.Annotate(
|
||||
ctx, "garageAdminAddr", adminAddr, "garageDataPath", alloc.DataPath,
|
||||
ctx,
|
||||
"garageAdminAddr", proc.adminAddr,
|
||||
"garageDataPath", proc.alloc.DataPath,
|
||||
)
|
||||
|
||||
logger.Info(ctx, "Waiting for garage instance to be healthy")
|
||||
|
||||
if err := adminClient.Wait(ctx, !newCluster); err != nil {
|
||||
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
|
||||
}
|
||||
|
||||
err := adminClient.Wait(ctx, !newCluster)
|
||||
adminClient.Close()
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"waiting for garage instance %q to start up: %w",
|
||||
proc.adminAddr,
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -152,11 +153,12 @@ func garagePmuxProcs(
|
||||
adminToken string,
|
||||
hostBootstrap bootstrap.Bootstrap,
|
||||
) (
|
||||
map[string]*pmuxlib.Process, error,
|
||||
map[string]garageProc, error,
|
||||
) {
|
||||
var (
|
||||
pmuxProcs = map[string]*pmuxlib.Process{}
|
||||
procs = map[string]garageProc{}
|
||||
allocs = networkConfig.Storage.Allocations
|
||||
thisHost = hostBootstrap.ThisHost()
|
||||
)
|
||||
|
||||
if len(allocs) > 0 && rpcSecret == "" {
|
||||
@ -176,10 +178,14 @@ func garagePmuxProcs(
|
||||
}
|
||||
|
||||
procName := garagePmuxProcName(alloc)
|
||||
pmuxProcs[procName] = garagePmuxProc(
|
||||
procs[procName] = garageProc{
|
||||
Process: garagePmuxProc(
|
||||
ctx, logger, binDirPath, procName, childConfigPath,
|
||||
)
|
||||
),
|
||||
alloc: alloc,
|
||||
adminAddr: garageAllocAdminAddr(thisHost, alloc),
|
||||
}
|
||||
}
|
||||
|
||||
return pmuxProcs, nil
|
||||
return procs, nil
|
||||
}
|
||||
|
@ -12,9 +12,7 @@ import (
|
||||
"isle/nebula"
|
||||
"isle/secrets"
|
||||
"isle/toolkit"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
||||
@ -53,39 +51,14 @@ func getGarageClientParams(
|
||||
}, nil
|
||||
}
|
||||
|
||||
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
|
||||
return logger.WithNamespace("garageAdminClient")
|
||||
}
|
||||
|
||||
// newGarageAdminClient will return an AdminClient for a local garage instance,
|
||||
// or it will _panic_ if there is no local instance configured.
|
||||
func newGarageAdminClient(
|
||||
logger *mlog.Logger,
|
||||
networkConfig daecommon.NetworkConfig,
|
||||
adminToken string,
|
||||
host bootstrap.Host,
|
||||
) *garage.AdminClient {
|
||||
return garage.NewAdminClient(
|
||||
garageAdminClientLogger(logger),
|
||||
net.JoinHostPort(
|
||||
host.IP().String(),
|
||||
strconv.Itoa(networkConfig.Storage.Allocations[0].AdminPort),
|
||||
),
|
||||
adminToken,
|
||||
)
|
||||
}
|
||||
|
||||
func garageApplyLayout(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
networkConfig daecommon.NetworkConfig,
|
||||
adminToken string,
|
||||
adminClient *garage.AdminClient,
|
||||
prevHost, currHost bootstrap.Host,
|
||||
) error {
|
||||
var (
|
||||
adminClient = newGarageAdminClient(
|
||||
logger, networkConfig, adminToken, currHost,
|
||||
)
|
||||
hostName = currHost.Name
|
||||
allocs = networkConfig.Storage.Allocations
|
||||
roles = make([]garage.Role, len(allocs))
|
||||
@ -121,14 +94,11 @@ func garageInitializeGlobalBucket(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
networkConfig daecommon.NetworkConfig,
|
||||
adminToken string,
|
||||
adminClient *garage.AdminClient,
|
||||
host bootstrap.Host,
|
||||
) (
|
||||
garage.S3APICredentials, error,
|
||||
) {
|
||||
adminClient := newGarageAdminClient(logger, networkConfig, adminToken, host)
|
||||
defer adminClient.Close()
|
||||
|
||||
creds, err := adminClient.CreateS3APICredentials(
|
||||
ctx, garage.GlobalBucketS3APICredentialsName,
|
||||
)
|
||||
@ -303,22 +273,8 @@ func removeGarageBootstrapHost(
|
||||
func garageWaitForAlloc(
|
||||
ctx context.Context,
|
||||
logger *mlog.Logger,
|
||||
alloc daecommon.ConfigStorageAllocation,
|
||||
adminToken string,
|
||||
host bootstrap.Host,
|
||||
adminClient *garage.AdminClient,
|
||||
) error {
|
||||
var (
|
||||
hostIP = host.IP().String()
|
||||
adminClient = garage.NewAdminClient(
|
||||
garageAdminClientLogger(logger),
|
||||
net.JoinHostPort(hostIP, strconv.Itoa(alloc.AdminPort)),
|
||||
adminToken,
|
||||
)
|
||||
)
|
||||
|
||||
defer adminClient.Close()
|
||||
ctx = mctx.WithAnnotator(ctx, alloc)
|
||||
|
||||
logger.Info(ctx, "Checking if garage instance has synced bucket list")
|
||||
if err := toolkit.UntilTrue(
|
||||
ctx,
|
||||
|
@ -524,15 +524,22 @@ func (n *network) postChildrenInit(
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
|
||||
thisHost := n.currBootstrap.ThisHost()
|
||||
var (
|
||||
thisHost = n.currBootstrap.ThisHost()
|
||||
garageAdminClient, hasGarage = n.children.GarageAdminClient()
|
||||
)
|
||||
|
||||
if len(thisHost.Garage.Instances) > 0 {
|
||||
if hasGarage {
|
||||
defer garageAdminClient.Close()
|
||||
}
|
||||
|
||||
if hasGarage {
|
||||
n.logger.Info(ctx, "Applying garage layout")
|
||||
if err := garageApplyLayout(
|
||||
ctx,
|
||||
n.logger,
|
||||
n.networkConfig,
|
||||
n.opts.GarageAdminToken,
|
||||
garageAdminClient,
|
||||
prevThisHost, thisHost,
|
||||
); err != nil {
|
||||
return fmt.Errorf("applying garage layout: %w", err)
|
||||
@ -545,7 +552,7 @@ func (n *network) postChildrenInit(
|
||||
ctx,
|
||||
n.logger,
|
||||
n.networkConfig,
|
||||
n.opts.GarageAdminToken,
|
||||
garageAdminClient,
|
||||
thisHost,
|
||||
)
|
||||
if err != nil {
|
||||
@ -561,8 +568,16 @@ func (n *network) postChildrenInit(
|
||||
}
|
||||
|
||||
for _, alloc := range n.networkConfig.Storage.Allocations {
|
||||
garageAdminClient, ok := n.children.GarageAdminClientForAlloc(alloc)
|
||||
if !ok {
|
||||
return fmt.Errorf("no garage instance created for %+v", alloc)
|
||||
}
|
||||
defer garageAdminClient.Close()
|
||||
|
||||
ctx := mctx.WithAnnotator(ctx, alloc)
|
||||
|
||||
if err := garageWaitForAlloc(
|
||||
ctx, n.logger, alloc, n.opts.GarageAdminToken, thisHost,
|
||||
ctx, n.logger, garageAdminClient,
|
||||
); err != nil {
|
||||
return fmt.Errorf(
|
||||
"waiting for alloc %+v to initialize: %w", alloc, err,
|
||||
|
@ -369,12 +369,8 @@ func (nh *integrationHarnessNetwork) getBootstrap(
|
||||
func (nh *integrationHarnessNetwork) garageAdminClient(
|
||||
t *testing.T,
|
||||
) *garage.AdminClient {
|
||||
c := newGarageAdminClient(
|
||||
nh.logger,
|
||||
nh.getConfig(t),
|
||||
nh.opts.GarageAdminToken,
|
||||
nh.getBootstrap(t).ThisHost(),
|
||||
)
|
||||
c, ok := nh.Network.(*network).children.GarageAdminClient()
|
||||
require.True(t, ok)
|
||||
t.Cleanup(func() { assert.NoError(t, c.Close()) })
|
||||
return c
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user