Use GLM for garage layout management

Brian Picciano 2025-01-07 15:40:50 +01:00
parent 24ac619798
commit 92802da2dd
14 changed files with 670 additions and 470 deletions

View File

@@ -124,10 +124,12 @@ func (h Host) GarageNodes() []garage.RemoteNode {
 	var nodes []garage.RemoteNode
 	for _, instance := range h.Garage.Instances {
 		nodes = append(nodes, garage.RemoteNode{
-			ID:        instance.ID,
+			Node: garage.Node{
 				IP:        h.IP().String(),
 				RPCPort:   instance.RPCPort,
 				S3APIPort: instance.S3APIPort,
+			},
+			ID: instance.ID,
 		})
 	}
 	return nodes

View File

@@ -17,6 +17,7 @@ import (
 	"isle/bootstrap"
 	"isle/daemon/daecommon"
 	"isle/garage"
+	"isle/garage/garagesrv"
 	"isle/secrets"
 	"isle/toolkit"
 )
@@ -33,12 +34,28 @@ type Opts struct {
 	// GarageNewCluster should be true if the garage instances being started
 	// are the first instances in a cluster which is being created.
 	GarageNewCluster bool

+	// GarageBootstrapPeers will be used as the set of peers each garage
+	// instance should use to find the rest of the garage cluster.
+	//
+	// Defaults to peer information contained in the bootstrap hosts.
+	GarageBootstrapPeers []garage.RemoteNode
+
+	// DEPRECATED can be used to manually set the db engine used by garage for
+	// new allocations. If not given then garagesrv.DBEngineSqlite will be used
+	// for new allocations.
+	GarageDefaultDBEngine garagesrv.DBEngine
 }

 func (o *Opts) withDefaults() *Opts {
 	if o == nil {
 		o = new(Opts)
 	}

+	if o.GarageDefaultDBEngine == "" {
+		o.GarageDefaultDBEngine = garageDefaultDBEngine
+	}
+
 	return o
 }
@@ -53,6 +70,7 @@ type Children struct {
 	runtimeDir        toolkit.Dir
 	garageAdminToken  string
 	nebulaDeviceNamer *NebulaDeviceNamer
+	opts              *Opts

 	garageRPCSecret string
@@ -91,6 +109,7 @@ func New(
 		runtimeDir:        runtimeDir,
 		garageAdminToken:  garageAdminToken,
 		nebulaDeviceNamer: nebulaDeviceNamer,
+		opts:              opts,

 		garageRPCSecret: garageRPCSecret,
 	}
@@ -125,6 +144,11 @@ func New(
 		return nil, fmt.Errorf("starting dnsmasq: %w", err)
 	}

+	garageBootstrapPeers := opts.GarageBootstrapPeers
+	if garageBootstrapPeers == nil {
+		garageBootstrapPeers = hostBootstrap.GarageNodes()
+	}
+
 	if c.garageProcs, err = garagePmuxProcs(
 		ctx,
 		c.logger,
@@ -134,6 +158,8 @@ func New(
 		networkConfig,
 		garageAdminToken,
 		hostBootstrap,
+		garageBootstrapPeers,
+		c.opts.GarageDefaultDBEngine,
 	); err != nil {
 		logger.Warn(ctx, "Failed to start garage processes, shutting down child processes", err)
 		c.Shutdown()
@@ -209,24 +235,28 @@ func (c *Children) reloadGarage(
 	networkConfig daecommon.NetworkConfig,
 	hostBootstrap bootstrap.Bootstrap,
 ) error {
-	allocs := networkConfig.Storage.Allocations
-	if len(allocs) == 0 {
-		return nil
-	}
-
-	thisHost := hostBootstrap.ThisHost()
+	var (
+		allocs     = networkConfig.Storage.Allocations
+		thisHost   = hostBootstrap.ThisHost()
+		anyStarted bool
+		allocsM    = map[daecommon.ConfigStorageAllocation]struct{}{}
+	)

-	var anyCreated bool
 	for _, alloc := range allocs {
+		allocsM[alloc] = struct{}{}
+
 		var (
 			procName = garagePmuxProcName(alloc)
 			ctx      = mctx.Annotate(
-				ctx,
+				mctx.WithAnnotator(ctx, alloc),
 				"garageProcName", procName,
-				"garageDataPath", alloc.DataPath,
 			)
 		)

+		// Rewrite the child config always, even if we don't always restart it.
+		// If nothing else this will capture any changes to the bootstrap nodes,
+		// which will be useful if garage gets restarted for any reason.
 		childConfigPath, err := garageWriteChildConfig(
 			ctx,
 			c.logger,
@@ -234,20 +264,31 @@ func (c *Children) reloadGarage(
 			c.runtimeDir.Path,
 			c.garageAdminToken,
 			hostBootstrap,
+			hostBootstrap.GarageNodes(),
 			alloc,
+			garageDefaultDBEngine,
 		)
 		if err != nil {
 			return fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err)
 		}

-		if _, ok := c.garageProcs[procName]; ok {
-			c.logger.Info(ctx, "Garage instance already exists")
-			continue
-		}
+		if proc, ok := c.garageProcs[procName]; ok {
+			if proc.alloc == alloc {
+				c.logger.Info(ctx, "No changes to storage allocation, leaving garage process as-is")
+				continue
+			}

-		anyCreated = true
+			c.logger.Info(ctx, "Storage allocation modified, restarting garage process")
+			proc.Restart()
+			anyStarted = true

-		c.logger.Info(ctx, "Garage config has been added, creating process")
+			proc.alloc = alloc
+			c.garageProcs[procName] = proc
+			continue
+		}

+		c.logger.Info(ctx, "New storage allocation, creating garage process")
 		c.garageProcs[procName] = garageProc{
 			Process: garagePmuxProc(
 				ctx, c.logger, c.binDirPath, procName, childConfigPath,
@@ -255,9 +296,26 @@ func (c *Children) reloadGarage(
 			alloc:     alloc,
 			adminAddr: garageAllocAdminAddr(thisHost, alloc),
 		}
+
+		anyStarted = true
 	}

-	if anyCreated {
+	for procName, proc := range c.garageProcs {
+		if _, ok := allocsM[proc.alloc]; ok {
+			continue
+		}
+
+		ctx := mctx.Annotate(
+			mctx.WithAnnotator(ctx, proc.alloc),
+			"garageProcName", procName,
+		)
+
+		c.logger.Info(ctx, "Storage allocation removed, stopping garage process")
+		proc.Stop()
+		delete(c.garageProcs, procName)
+	}
+
+	if anyStarted {
 		if err := waitForGarage(
 			ctx,
 			c.logger,
@@ -296,6 +354,21 @@ func (c *Children) Reload(
 	return errors.Join(errs...)
 }

+// ActiveStorageAllocations returns the storage allocations which currently have
+// active garage instances.
+func (c *Children) ActiveStorageAllocations() []daecommon.ConfigStorageAllocation {
+	allocs := make([]daecommon.ConfigStorageAllocation, 0, len(c.garageProcs))
+
+	for _, proc := range c.garageProcs {
+		allocs = append(allocs, proc.alloc)
+	}
+
+	slices.SortFunc(allocs, func(a, b daecommon.ConfigStorageAllocation) int {
+		return cmp.Compare(a.RPCPort, b.RPCPort)
+	})
+
+	return allocs
+}
+
 // GarageAdminClient returns an admin client for an active local garage process,
 // or false if there are no garage processes.
 func (c *Children) GarageAdminClient() (*garage.AdminClient, bool) {

View File

@@ -17,6 +17,10 @@ import (
 	"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
 )

+const (
+	garageDefaultDBEngine = garagesrv.DBEngineSqlite
+)
+
 func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
 	return logger.WithNamespace("garageAdminClient")
 }
@@ -42,9 +46,8 @@ func waitForGarage(
 		)

 		ctx := mctx.Annotate(
-			ctx,
+			mctx.WithAnnotator(ctx, proc.alloc),
 			"garageAdminAddr", proc.adminAddr,
-			"garageDataPath", proc.alloc.DataPath,
 		)

 		logger.Info(ctx, "Waiting for garage instance to be healthy")
@@ -70,17 +73,16 @@ func garageWriteChildConfig(
 	logger *mlog.Logger,
 	rpcSecret, runtimeDirPath, adminToken string,
 	hostBootstrap bootstrap.Bootstrap,
+	bootstrapPeers []garage.RemoteNode,
 	alloc daecommon.ConfigStorageAllocation,
+	defaultDBEngine garagesrv.DBEngine,
 ) (
 	string, error,
 ) {
 	var (
 		thisHost = hostBootstrap.ThisHost()
-		id       = daecommon.BootstrapGarageHostForAlloc(thisHost, alloc).ID
 		node     = garage.LocalNode{
-			RemoteNode: garage.RemoteNode{
-				ID:        id,
+			Node: garage.Node{
 				IP:        thisHost.IP().String(),
 				RPCPort:   alloc.RPCPort,
 				S3APIPort: alloc.S3APIPort,
@@ -93,7 +95,7 @@ func garageWriteChildConfig(
 		)
 	)

-	dbEngine, err := garagesrv.GetDBEngine(alloc.MetaPath)
+	dbEngine, err := garagesrv.GetDBEngine(alloc.MetaPath, defaultDBEngine)
 	if err != nil {
 		return "", fmt.Errorf("getting alloc db engine: %w", err)
 	}
@@ -111,7 +113,7 @@ func garageWriteChildConfig(
 			DBEngine:  dbEngine,
 			LocalNode: node,

-			BootstrapPeers: hostBootstrap.GarageNodes(),
+			BootstrapPeers: bootstrapPeers,
 		},
 	)
@@ -152,6 +154,8 @@ func garagePmuxProcs(
 	networkConfig daecommon.NetworkConfig,
 	adminToken string,
 	hostBootstrap bootstrap.Bootstrap,
+	bootstrapPeers []garage.RemoteNode,
+	defaultDBEngine garagesrv.DBEngine,
 ) (
 	map[string]garageProc, error,
 ) {
@@ -171,7 +175,9 @@ func garagePmuxProcs(
 			logger,
 			rpcSecret, runtimeDirPath, adminToken,
 			hostBootstrap,
+			bootstrapPeers,
 			alloc,
+			defaultDBEngine,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err)

View File

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"isle/bootstrap"
 	"isle/toolkit"
 	"isle/yamlutil"
 	"net"
@@ -325,18 +324,3 @@ func LoadConfig(userConfigPath string) (Config, error) {
 	err := yamlutil.LoadYamlFile(&config, userConfigPath)
 	return config, err
 }
-
-// BootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
-// corresponds with the given alloc from the daemon config. This will panic if
-// no associated instance can be found.
-func BootstrapGarageHostForAlloc(
-	host bootstrap.Host, alloc ConfigStorageAllocation,
-) bootstrap.GarageHostInstance {
-	for _, inst := range host.Garage.Instances {
-		if inst.RPCPort == alloc.RPCPort {
-			return inst
-		}
-	}
-
-	panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
-}

View File

@@ -63,14 +63,11 @@ func applyNetworkConfigToBootstrap(
 		},
 	}

-	if allocs := networkConfig.Storage.Allocations; len(allocs) > 0 {
-		for i, alloc := range allocs {
-			id, rpcPort, err := garagesrv.InitAlloc(alloc.MetaPath, alloc.RPCPort)
+	for _, alloc := range networkConfig.Storage.Allocations {
+		id, err := garagesrv.LoadAllocID(alloc.MetaPath)
 		if err != nil {
 			return bootstrap.Bootstrap{}, fmt.Errorf(
-				"initializing alloc at %q: %w", alloc.MetaPath, err,
+				"getting ID of alloc at %q: %w", alloc.MetaPath, err,
 			)
 		}
@@ -78,13 +75,10 @@ func applyNetworkConfigToBootstrap(
 			host.Garage.Instances,
 			bootstrap.GarageHostInstance{
 				ID:        id,
-				RPCPort:   rpcPort,
+				RPCPort:   alloc.RPCPort,
 				S3APIPort: alloc.S3APIPort,
 			},
 		)
-
-			allocs[i].RPCPort = rpcPort
-		}
 	}

 	hostBootstrap.Hosts = maps.Clone(hostBootstrap.Hosts)

View File

@@ -9,9 +9,11 @@ import (
 	"isle/bootstrap"
 	"isle/daemon/daecommon"
 	"isle/garage"
+	"isle/garage/garagesrv"
 	"isle/nebula"
 	"isle/secrets"
 	"isle/toolkit"
+	"net/netip"
 	"path/filepath"
 	"time"
@@ -51,27 +53,47 @@ func getGarageClientParams(
 	}, nil
 }

-func garageApplyLayout(
-	ctx context.Context,
-	logger *mlog.Logger,
-	networkConfig daecommon.NetworkConfig,
-	adminClient *garage.AdminClient,
-	prevHost, currHost bootstrap.Host,
-) error {
-	var (
-		hostName    = currHost.Name
-		allocs      = networkConfig.Storage.Allocations
-		roles       = make([]garage.Role, len(allocs))
-		roleIDs     = map[string]struct{}{}
-		idsToRemove = make([]string, 0, len(prevHost.Garage.Instances))
+func garageInitAllocs(
+	hostIP netip.Addr,
+	allocs []daecommon.ConfigStorageAllocation,
+) (
+	[]garage.RemoteNode, error,
+) {
+	peers := make([]garage.RemoteNode, len(allocs))
+
+	for i, alloc := range allocs {
+		id, err := garagesrv.InitAlloc(alloc.MetaPath)
+		if err != nil {
+			return nil, fmt.Errorf("initializing alloc %+v: %w", alloc, err)
+		}
+
+		peers[i] = garage.RemoteNode{
+			Node: garage.Node{
+				IP:        hostIP.String(),
+				RPCPort:   alloc.RPCPort,
+				S3APIPort: alloc.S3APIPort,
+			},
+			ID: id,
+		}
+	}
+
+	return peers, nil
+}
+
+func garageAllocsToRoles(
+	host bootstrap.Host, allocs []daecommon.ConfigStorageAllocation,
+) (
+	[]garage.Role, error,
+) {
+	var (
+		hostName = host.Name
+		roles    = make([]garage.Role, len(allocs))
 	)

-	defer adminClient.Close()
-
 	for i, alloc := range allocs {
-		id := daecommon.BootstrapGarageHostForAlloc(currHost, alloc).ID
-		roleIDs[id] = struct{}{}
+		id, err := garagesrv.LoadAllocID(alloc.MetaPath)
+		if err != nil {
+			return nil, fmt.Errorf("getting ID of alloc %+v: %w", alloc, err)
+		}

 		roles[i] = garage.Role{
 			ID:       id,
@@ -81,21 +103,12 @@ func garageApplyLayout(
 		}
 	}

-	for _, prevInst := range prevHost.Garage.Instances {
-		if _, ok := roleIDs[prevInst.ID]; !ok {
-			idsToRemove = append(idsToRemove, prevInst.ID)
-		}
-	}
-
-	return adminClient.ApplyLayout(ctx, roles, idsToRemove)
+	return roles, nil
 }

 func garageInitializeGlobalBucket(
 	ctx context.Context,
-	logger *mlog.Logger,
-	networkConfig daecommon.NetworkConfig,
 	adminClient *garage.AdminClient,
-	host bootstrap.Host,
 ) (
 	garage.S3APICredentials, error,
 ) {

View File

@@ -14,7 +14,9 @@ import (
 	"isle/bootstrap"
 	"isle/daemon/children"
 	"isle/daemon/daecommon"
+	"isle/daemon/network/glm"
 	"isle/garage"
+	"isle/garage/garagesrv"
 	"isle/jsonutil"
 	"isle/nebula"
 	"isle/secrets"
@@ -163,6 +165,9 @@ type Opts struct {
 	// testBlocker is used by tests to set blockpoints.
 	testBlocker *toolkit.TestBlocker
+
+	// DEPRECATED See corresponding field in [children.Opts]
+	garageDefaultDBEngine garagesrv.DBEngine
 }

 func (o *Opts) withDefaults() *Opts {
@@ -188,6 +193,7 @@ type network struct {
 	opts *Opts

 	secretsStore    secrets.Store
+	garageLayoutMgr glm.GarageLayoutManager

 	l        sync.RWMutex
 	children *children.Children
@@ -218,6 +224,7 @@ func newNetwork(
 	ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))

 	var (
+		ip = currBootstrap.ThisHost().IP()
 		n  = &network{
 			logger:        logger,
 			envBinDirPath: envBinDirPath,
@@ -225,6 +232,7 @@ func newNetwork(
 			stateDir:        stateDir,
 			runtimeDir:      runtimeDir,
 			opts:            opts.withDefaults(),
+			garageLayoutMgr: glm.NewGarageLayoutManager(stateDir, ip),
 			currBootstrap:   currBootstrap,
 			workerCtx:       ctx,
 			workerCancel:    cancel,
@@ -237,6 +245,12 @@ func newNetwork(
 		return nil, fmt.Errorf("resolving network config: %w", err)
 	}

+	if err := n.garageLayoutMgr.Validate(
+		ctx, n.networkConfig.Storage.Allocations,
+	); err != nil {
+		return nil, ErrInvalidConfig.WithData(err.Error())
+	}
+
 	secretsDir, err := n.stateDir.MkChildDir("secrets", dirsMayExist)
 	if err != nil {
 		return nil, fmt.Errorf("creating secrets dir: %w", err)
@@ -399,6 +413,11 @@ func (constructorsImpl) create(
 		)
 	}

+	err = n.garageLayoutMgr.Validate(ctx, n.networkConfig.Storage.Allocations)
+	if err != nil {
+		return nil, ErrInvalidConfig.WithData(err.Error())
+	}
+
 	err = daecommon.SetGarageRPCSecret(ctx, n.secretsStore, garageRPCSecret)
 	if err != nil {
 		return nil, fmt.Errorf("setting garage RPC secret: %w", err)
@@ -411,6 +430,12 @@ func (constructorsImpl) create(
 		return nil, fmt.Errorf("setting nebula CA signing key secret: %w", err)
 	}

+	if err = n.garageLayoutMgr.SetActiveAllocations(
+		ctx, n.networkConfig.Storage.Allocations,
+	); err != nil {
+		return nil, fmt.Errorf("initializing GLM active allocations: %w", err)
+	}
+
 	if err := n.initialize(ctx, true); err != nil {
 		return nil, fmt.Errorf("initializing with bootstrap: %w", err)
 	}
@@ -418,10 +443,12 @@ func (constructorsImpl) create(
 	return n, nil
 }

-// preChildrenInit performs steps which are required prior to children being
-// initializes/reloaded. The lock must be held when this is called (if not being
-// called as part of initialize).
-func (n *network) preChildrenInit(ctx context.Context) error {
+// updateBootstrapUnsafe updates both the locally saved bootstrap as well as
+// this host's bootstrap host info in garage, first applying the network config
+// to the bootstrap host info.
+//
+// Must be called with the lock held.
+func (n *network) updateBootstrapUnsafe(ctx context.Context) error {
 	var err error
 	if n.currBootstrap, err = applyNetworkConfigToBootstrap(
 		n.networkConfig, n.currBootstrap,
@@ -436,61 +463,22 @@ func (n *network) preChildrenInit(ctx context.Context) error {
 		return fmt.Errorf("writing bootstrap to state dir: %w", err)
 	}

+	n.logger.Info(ctx, "Updating host info in garage")
+	if err := putGarageBoostrapHost(
+		ctx, n.secretsStore, n.currBootstrap,
+	); err != nil {
+		return fmt.Errorf("updating host info in garage: %w", err)
+	}
+
 	return nil
 }

-// postChildrenInit performs steps which are required after children have been
-// initialized/reloaded. The lock must be held when this is called (if not being
-// called as part of initialize).
-func (n *network) postChildrenInit(
-	ctx context.Context,
-	prevThisHost bootstrap.Host,
-	createGarageGlobalBucket bool,
+// see comment on garageWaitForAlloc
+func (n *network) garageWaitForAllocs(
+	ctx context.Context, allocs []daecommon.ConfigStorageAllocation,
 ) error {
-	var (
-		thisHost                     = n.currBootstrap.ThisHost()
-		garageAdminClient, hasGarage = n.children.GarageAdminClient()
-	)
-
-	if hasGarage {
-		defer garageAdminClient.Close()
-	}
-
-	if hasGarage {
-		n.logger.Info(ctx, "Applying garage layout")
-		if err := garageApplyLayout(
-			ctx,
-			n.logger,
-			n.networkConfig,
-			garageAdminClient,
-			prevThisHost, thisHost,
-		); err != nil {
-			return fmt.Errorf("applying garage layout: %w", err)
-		}
-	}
-
-	if createGarageGlobalBucket {
-		n.logger.Info(ctx, "Initializing garage shared global bucket")
-		garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
-			ctx,
-			n.logger,
-			n.networkConfig,
-			garageAdminClient,
-			thisHost,
-		)
-		if err != nil {
-			return fmt.Errorf("initializing global bucket: %w", err)
-		}
-
-		err = daecommon.SetGarageS3APIGlobalBucketCredentials(
-			ctx, n.secretsStore, garageGlobalBucketCreds,
-		)
-		if err != nil {
-			return fmt.Errorf("storing global bucket creds: %w", err)
-		}
-	}
-
-	for _, alloc := range n.networkConfig.Storage.Allocations {
+	var errs []error
+	for _, alloc := range allocs {
 		garageAdminClient, ok := n.children.GarageAdminClientForAlloc(alloc)
 		if !ok {
 			return fmt.Errorf("no garage instance created for %+v", alloc)
@@ -502,21 +490,99 @@ func (n *network) postChildrenInit(
 		if err := garageWaitForAlloc(
 			ctx, n.logger, garageAdminClient,
 		); err != nil {
-			return fmt.Errorf(
+			errs = append(errs, fmt.Errorf(
 				"waiting for alloc %+v to initialize: %w", alloc, err,
-			)
+			))
 		}
 	}

-	n.logger.Info(ctx, "Updating host info in garage")
-	err := putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
-	if err != nil {
-		return fmt.Errorf("updating host info in garage: %w", err)
-	}
+	return errors.Join(errs...)
+}
+
+// must hold lock to call this
+func (n *network) glmStateTransitionUnsafe(ctx context.Context) error {
+	var knownNodes []garage.KnownNode
+	if adminClient, ok := n.children.GarageAdminClient(); ok {
+		defer adminClient.Close()
+
+		n.logger.Info(ctx, "Getting garage cluster status")
+		status, err := adminClient.Status(ctx)
+		if err != nil {
+			return fmt.Errorf("getting garage cluster state: %w", err)
+		}
+
+		knownNodes = status.Nodes
+	}
+
+	n.logger.Info(ctx, "Calculating garage layout state transition")
+	stateTx, err := n.garageLayoutMgr.CalculateStateTransition(
+		ctx, knownNodes, n.networkConfig.Storage.Allocations,
+	)
+	if err != nil {
+		return fmt.Errorf("getting next state tx: %w", err)
+	}
+
+	childrenNetworkConfig := n.networkConfig
+	childrenNetworkConfig.Storage.Allocations = stateTx.ActiveAllocations()
+
+	n.logger.Info(ctx, "Reloading children with updated storage allocations")
+	err = n.children.Reload(ctx, childrenNetworkConfig, n.currBootstrap)
+	if err != nil {
+		return fmt.Errorf("reloading children: %w", err)
+	}
+
+	if adminClient, ok := n.children.GarageAdminClient(); ok {
+		defer adminClient.Close()
+
+		var (
+			host = n.currBootstrap.ThisHost()
+
+			// From garage's perspective a node is "removed" from the cluster by
+			// removing its role, which puts it into the draining state.
+			removeIDs = stateTx.DrainAllocationIDs()
+		)
+
+		addModifyRoles, err := garageAllocsToRoles(
+			host, stateTx.AddModifyAllocations,
+		)
+		if err != nil {
+			return fmt.Errorf("converting allocs to roles: %w", err)
+		}
+
+		n.logger.Info(ctx, "Applying state transition to garage layout")
+		if err := adminClient.ApplyLayout(
+			ctx, addModifyRoles, removeIDs,
+		); err != nil {
+			return fmt.Errorf("applying state tx to layout: %w", err)
+		}
+	} else {
+		n.logger.Info(ctx, "No garage instances running, no layout changes to make")
+	}
+
+	if err := n.garageWaitForAllocs(
+		ctx, n.networkConfig.Storage.Allocations,
+	); err != nil {
+		return fmt.Errorf(
+			"waiting for garage allocations to fully initialize: %w", err,
+		)
+	}
+
+	n.logger.Info(ctx, "Committing state transition")
+	if err = n.garageLayoutMgr.CommitStateTransition(
+		ctx, stateTx,
+	); err != nil {
+		return fmt.Errorf("commiting state tx: %w", err)
+	}

 	return nil
 }

+func (n *network) glmStateTransition(ctx context.Context) error {
+	n.l.Lock()
+	defer n.l.Unlock()
+	return n.glmStateTransitionUnsafe(ctx)
+}
+
 func (n *network) reloadHosts(ctx context.Context) error {
 	n.l.RLock()
 	currBootstrap := n.currBootstrap
@@ -541,8 +607,13 @@ func (n *network) reloadHosts(ctx context.Context) error {
 		return fmt.Errorf("writing bootstrap to state dir: %w", err)
 	}

+	childrenNetworkConfig, err := n.getChildrenNetworkConfig(ctx)
+	if err != nil {
+		return fmt.Errorf("getting network config for children: %w", err)
+	}
+
 	n.logger.Info(ctx, "Reloading child processes")
-	err = n.children.Reload(ctx, n.networkConfig, n.currBootstrap)
+	err = n.children.Reload(ctx, childrenNetworkConfig, n.currBootstrap)
 	if err != nil {
 		return fmt.Errorf(
 			"reloading child processes: %w", err,
@@ -566,8 +637,8 @@ func (n *network) periodically(
 	ticker := time.NewTicker(period)
 	defer ticker.Stop()

-	n.logger.Info(ctx, "Starting background job runner")
-	defer n.logger.Info(ctx, "Stopping background job runner")
+	n.logger.Info(ctx, "Starting background job worker")
+	defer n.logger.Info(ctx, "Stopping background job worker")

 	for {
 		select {
@@ -575,7 +646,7 @@ func (n *network) periodically(
 			return

 			case <-ticker.C:
-				n.logger.Info(ctx, "Background job running")
+				n.logger.Info(ctx, "Background job worker")
 				if err := fn(ctx); err != nil {
 					n.logger.Error(ctx, "Background job failed", err)
 				}
@@ -584,14 +655,102 @@
 	}()
 }

-func (n *network) initialize(ctx context.Context, isCreate bool) error {
-	var (
-		prevThisHost = n.currBootstrap.ThisHost()
-		err          error
-	)
-
-	if err := n.preChildrenInit(ctx); err != nil {
-		return fmt.Errorf("performing pre-initialization: %w", err)
+func (n *network) getChildrenNetworkConfig(
+	ctx context.Context,
+) (
+	daecommon.NetworkConfig, error,
+) {
+	childrenNetworkConfig := n.networkConfig
+
+	activeStorageAllocs, err := n.garageLayoutMgr.GetActiveAllocations(ctx)
+	if err != nil {
+		return daecommon.NetworkConfig{}, fmt.Errorf(
+			"getting active storage allocations: %w", err,
+		)
+	}
+
+	childrenNetworkConfig.Storage.Allocations = activeStorageAllocs
+	return childrenNetworkConfig, nil
+}
+
+func (n *network) initializePostChildren(
+	ctx context.Context, isCreate bool,
+) error {
+	if !isCreate {
+		n.logger.Info(ctx, "Making any necessary changes to garage layout")
+		if err := n.glmStateTransitionUnsafe(ctx); err != nil {
+			return fmt.Errorf("performing garage layout transition: %w", err)
+		}
+	} else {
+		roles, err := garageAllocsToRoles(
+			n.currBootstrap.ThisHost(), n.networkConfig.Storage.Allocations,
+		)
+		if err != nil {
+			return fmt.Errorf("converting allocs to roles: %w", err)
+		}
+
+		garageAdminClient, _ := n.children.GarageAdminClient()
+		defer garageAdminClient.Close()
+
+		n.logger.Info(ctx, "Applying initial garage layout")
+		if err := garageAdminClient.ApplyLayout(ctx, roles, nil); err != nil {
+			return fmt.Errorf("applying initial garage layout: %w", err)
+		}
+
+		n.logger.Info(ctx, "Initializing garage shared global bucket")
+		garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
+			ctx, garageAdminClient,
+		)
+		if err != nil {
+			return fmt.Errorf("initializing global bucket: %w", err)
+		}
+
+		if err = daecommon.SetGarageS3APIGlobalBucketCredentials(
+			ctx, n.secretsStore, garageGlobalBucketCreds,
+		); err != nil {
+			return fmt.Errorf("storing global bucket creds: %w", err)
+		}
+
+		n.logger.Info(ctx, "Waiting for garage instances to finish initializing")
+		if err := n.garageWaitForAllocs(
+			ctx, n.networkConfig.Storage.Allocations,
+		); err != nil {
+			return fmt.Errorf(
+				"waiting for garage allocations to fully initialize: %w", err,
+			)
+		}
+	}
+
+	if err := n.updateBootstrapUnsafe(ctx); err != nil {
+		return fmt.Errorf("updating bootstrap: %w", err)
+	}
+
+	// Do this now so that everything is stable before returning. This also
+	// serves a dual-purpose, as it makes sure that the above bootstrap update
+	// has propagated from the local garage instance, if any.
+	n.logger.Info(ctx, "Reloading hosts from network storage")
+	if err := n.reloadHosts(ctx); err != nil {
+		return fmt.Errorf("Reloading network bootstrap: %w", err)
+	}
+
+	return nil
+}
+
+func (n *network) initialize(ctx context.Context, isCreate bool) error {
+	childrenNetworkConfig, err := n.getChildrenNetworkConfig(ctx)
+	if err != nil {
+		return fmt.Errorf("getting network config for children: %w", err)
+	}
+
+	var garageBootstrapPeers []garage.RemoteNode
+	if isCreate {
+		garageBootstrapPeers, err = garageInitAllocs(
+			n.currBootstrap.ThisHost().IP(),
+			n.networkConfig.Storage.Allocations,
+		)
+		if err != nil {
+			return fmt.Errorf("initializing storage allocations: %w", err)
+		}
 	}

 	n.logger.Info(ctx, "Creating child processes")
@@ -600,37 +759,29 @@ func (n *network) initialize(ctx context.Context, isCreate bool) error {
 		n.logger.WithNamespace("children"),
 		n.envBinDirPath,
 		n.secretsStore,
-		n.networkConfig,
+		childrenNetworkConfig,
 		n.runtimeDir,
 		n.opts.GarageAdminToken,
 		n.nebulaDeviceNamer,
 		n.currBootstrap,
 		&children.Opts{
 			GarageNewCluster: isCreate,
+			GarageBootstrapPeers: garageBootstrapPeers,
+			GarageDefaultDBEngine: n.opts.garageDefaultDBEngine,
 		},
 	)
 	if err != nil {
 		return fmt.Errorf("creating child processes: %w", err)
 	}

-	createGarageGlobalBucket := isCreate
-	err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
-	if err != nil {
-		n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
+	if err := n.initializePostChildren(ctx, isCreate); err != nil {
+		n.logger.Error(ctx, "Failed to initialize Network, shutting down children", err)
 		n.children.Shutdown()
-		return fmt.Errorf("performing post-initialization: %w", err)
-	}
-
-	// Do this now so that everything is stable before returning. This also
-	// serves a dual-purpose, as it makes sure that the PUT from the
-	// postChildrenInit above has propagated from the local garage instance, if
-	// there is one.
-	n.logger.Info(ctx, "Reloading hosts from network storage")
-	if err = n.reloadHosts(ctx); err != nil {
-		return fmt.Errorf("Reloading network bootstrap: %w", err)
+		return err
 	}

 	n.periodically("reloadHosts", n.reloadHosts, 3*time.Minute)
+	n.periodically("glmStateTransition", n.glmStateTransition, 10*time.Minute)

 	return nil
 }
@@ -924,10 +1075,11 @@ func (n *network) SetConfig(
 	n.l.Lock()
 	defer n.l.Unlock()

-	var (
-		prevThisHost = n.currBootstrap.ThisHost()
-		err          error
-	)
+	if err := n.garageLayoutMgr.Validate(
+		ctx, config.Storage.Allocations,
+	); err != nil {
+		return ErrInvalidConfig.WithData(err.Error())
+	}

 	if _, err := loadStoreConfig(n.stateDir, &config); err != nil {
 		return fmt.Errorf("storing new config: %w", err)
@@ -935,18 +1087,13 @@ func (n *network) SetConfig(
 	n.networkConfig = config

-	if err := n.preChildrenInit(ctx); err != nil {
-		return fmt.Errorf("performing pre-initialization: %w", err)
+	n.logger.Info(ctx, "Making any necessary changes to garage layout")
+	if err := n.glmStateTransitionUnsafe(ctx); err != nil {
+		return fmt.Errorf("performing garage layout transition: %w", err)
 	}

-	n.logger.Info(ctx, "Reloading child processes")
-	err = n.children.Reload(ctx, n.networkConfig, n.currBootstrap)
-	if err != nil {
-		return fmt.Errorf("reloading child processes: %w", err)
-	}
-
-	if err := n.postChildrenInit(ctx, prevThisHost, false); err != nil {
-		return fmt.Errorf("performing post-initialization: %w", err)
+	if err := n.updateBootstrapUnsafe(ctx); err != nil {
+		return fmt.Errorf("updating bootstrap: %w", err)
 	}

 	return nil
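
Note: the glm package itself (isle/daemon/network/glm) does not appear in the hunks above. Inferring from its call sites in network.go, its interface looks roughly like the sketch below; only the constructor, the methods, and the StateTransition members actually used above are grounded in this commit, the rest (including the unexported fields) is assumption.

    // Sketch of glm.GarageLayoutManager as implied by its call sites.
    package glm

    import (
        "context"
        "net/netip"

        "isle/daemon/daecommon"
        "isle/garage"
        "isle/toolkit"
    )

    type StateTransition struct {
        // AddModifyAllocations holds allocations whose garage roles should be
        // added to or updated in the cluster layout.
        AddModifyAllocations []daecommon.ConfigStorageAllocation

        // The unexported fields backing the methods below are assumed.
        activeAllocations  []daecommon.ConfigStorageAllocation
        drainAllocationIDs []string
    }

    // ActiveAllocations returns the allocations which should have running
    // garage processes during this transition, including draining ones.
    func (tx StateTransition) ActiveAllocations() []daecommon.ConfigStorageAllocation {
        return tx.activeAllocations
    }

    // DrainAllocationIDs returns the garage node IDs whose roles should be
    // removed from the layout, putting those nodes into the draining state.
    func (tx StateTransition) DrainAllocationIDs() []string {
        return tx.drainAllocationIDs
    }

    type GarageLayoutManager interface {
        Validate(ctx context.Context, allocs []daecommon.ConfigStorageAllocation) error
        GetActiveAllocations(ctx context.Context) ([]daecommon.ConfigStorageAllocation, error)
        SetActiveAllocations(ctx context.Context, allocs []daecommon.ConfigStorageAllocation) error
        CalculateStateTransition(
            ctx context.Context,
            knownNodes []garage.KnownNode,
            allocs []daecommon.ConfigStorageAllocation,
        ) (StateTransition, error)
        CommitStateTransition(ctx context.Context, stateTx StateTransition) error
    }

    // NewGarageLayoutManager is referenced in newNetwork above; the body here
    // is a placeholder.
    func NewGarageLayoutManager(stateDir toolkit.Dir, hostIP netip.Addr) GarageLayoutManager {
        panic("sketch only")
    }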

View File

@@ -15,7 +15,6 @@ import (
 	"time"

 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )

 func TestCreate(t *testing.T) {
@@ -59,6 +58,42 @@ func TestLoad(t *testing.T) {
 		assert.Equal(t, networkConfig, network.getConfig(t))
 	})

+	t.Run("garage lmdb db engine", func(t *testing.T) {
+		var (
+			h       = newIntegrationHarness(t)
+			network = h.createNetwork(t, "primus", &createNetworkOpts{
+				garageDefaultDBEngine: garagesrv.DBEngineLMDB,
+			})
+			metaPath = h.mkDir(t, "meta").Path
+		)
+
+		h.logger.Info(h.ctx, "Checking that garage is using the expected db engine")
+		garageConfig, err := os.ReadFile(
+			filepath.Join(network.runtimeDir.Path, "garage-3900.toml"),
+		)
+		assert.NoError(t, err)
+		assert.Contains(t,
+			string(garageConfig),
+			`db_engine = "`+garagesrv.DBEngineLMDB+`"`,
+		)
+		assert.NoFileExists(t, filepath.Join(metaPath, "db.sqlite"))
+
+		network.opts.garageDefaultDBEngine = ""
+		network.restart(t)
+
+		h.logger.Info(h.ctx, "Checking that garage is still using the expected db engine")
+		garageConfig, err = os.ReadFile(
+			filepath.Join(network.runtimeDir.Path, "garage-3900.toml"),
+		)
+		assert.NoError(t, err)
+		assert.Contains(t,
+			string(garageConfig),
+			`db_engine = "`+garagesrv.DBEngineLMDB+`"`,
+		)
+		assert.NoFileExists(t, filepath.Join(metaPath, "db.sqlite"))
+	})
 }

 func TestJoin(t *testing.T) {
@@ -85,7 +120,7 @@ func TestJoin(t *testing.T) {
 			})
 		)

-		t.Log("reloading primus' hosts")
+		h.logger.Info(h.ctx, "reloading primus' hosts")
 		assert.NoError(t, primus.Network.(*network).reloadHosts(h.ctx))
 		assert.Equal(t, primus.getHostsByName(t), secondus.getHostsByName(t))
@@ -94,57 +129,6 @@ func TestJoin(t *testing.T) {
 			secondus: 1,
 		})
 	})
-
-	// Assert that if primus runs the orphan remover at the same moment that
-	// secondus is joining that the layout applied by secondus doesn't get
-	// overwritten.
-	t.Run("with alloc/remove orphans after garage layout applied", func(t *testing.T) {
-		t.Skip("This is currently expected to fail. Orphan removal is going to be reworked accordingly")
-
-		var (
-			h                 = newIntegrationHarness(t)
-			primus            = h.createNetwork(t, "primus", nil)
-			primusAdminClient = primus.garageAdminClient(t)
-			secondusBlocker   = toolkit.NewTestBlocker(t)
-		)
-
-		secondusBlocker.ExpectBlockpoint("garageLayoutApplied").On(
-			t, h.ctx, func() {
-				h.logger.Info(h.ctx, "Waiting for new layout to propagate to primus")
-				err := toolkit.UntilTrue(
-					h.ctx, h.logger, 1*time.Second, func() (bool, error) {
-						layout, err := primusAdminClient.GetLayout(h.ctx)
-						if err != nil {
-							return false, fmt.Errorf("getting layout: %w", err)
-						}
-
-						return len(layout.Roles) == 4, nil
-					},
-				)
-				if !assert.NoError(t, err) {
-					return
-				}
-
-				//h.logger.Info(h.ctx, "Calling removeOrphanGarageNodes")
-				//assert.NoError(
-				//	t, primus.Network.(*network).removeOrphanGarageNodes(h.ctx),
-				//)
-			},
-		)
-
-		secondus := h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
-			networkConfigOpts: &networkConfigOpts{
-				numStorageAllocs: 1,
-			},
-			blocker: secondusBlocker,
-		})
-
-		assertGarageLayout(t, map[*integrationHarnessNetwork]int{
-			primus:   3,
-			secondus: 1,
-		})
-	})
 }

 func TestNetwork_GetBootstrap(t *testing.T) {
@@ -220,7 +204,7 @@ func TestNetwork_SetConfig(t *testing.T) {
 		assert.NoError(t, network.SetConfig(h.ctx, networkConfig))

-		t.Log("Checking that the Host information was updated")
+		h.logger.Info(h.ctx, "Checking that the Host information was updated")
 		newHostsByName := network.getHostsByName(t)
 		newHost, ok := newHostsByName[network.hostName]
 		assert.True(t, ok)
@@ -236,20 +220,20 @@ func TestNetwork_SetConfig(t *testing.T) {
 			RPCPort:   4901,
 		}, newAlloc)

-		t.Log("Checking that the bootstrap file was written with the new host config")
+		h.logger.Info(h.ctx, "Checking that the bootstrap file was written with the new host config")
 		var storedBootstrap bootstrap.Bootstrap
 		assert.NoError(t, jsonutil.LoadFile(
 			&storedBootstrap, bootstrap.StateDirPath(network.stateDir.Path),
 		))
 		assert.Equal(t, newHostsByName, storedBootstrap.Hosts)

-		t.Log("Checking that garage layout contains the new allocation")
+		h.logger.Info(h.ctx, "Checking that garage layout contains the new allocation")
 		expRoles := allocsToRoles(network.hostName, allocs)
 		layout, err := network.garageAdminClient(t).GetLayout(h.ctx)
 		assert.NoError(t, err)
 		assert.ElementsMatch(t, expRoles, layout.Roles)

-		t.Log("Checking that garage is using the expected db engine")
+		h.logger.Info(h.ctx, "Checking that garage is using the expected db engine")
 		garageConfig, err := os.ReadFile(
 			filepath.Join(network.runtimeDir.Path, "garage-4901.toml"),
 		)
@@ -261,46 +245,6 @@ func TestNetwork_SetConfig(t *testing.T) {
 		assert.FileExists(t, filepath.Join(metaPath, "db.sqlite"))
 	})

-	t.Run("add storage alloc/lmdb", func(t *testing.T) {
-		var (
-			h             = newIntegrationHarness(t)
-			network       = h.createNetwork(t, "primus", nil)
-			networkConfig = network.getConfig(t)
-			dataPath      = h.mkDir(t, "data").Path
-			metaPath      = h.mkDir(t, "meta").Path
-		)
-
-		networkConfig.Storage.Allocations = append(
-			networkConfig.Storage.Allocations,
-			daecommon.ConfigStorageAllocation{
-				DataPath:  dataPath,
-				MetaPath:  metaPath,
-				Capacity:  1,
-				S3APIPort: 4900,
-				RPCPort:   4901,
-				AdminPort: 4902,
-			},
-		)
-
-		// Creating the directory is enough to ensure that Isle chooses LMDB as
-		// the db engine.
-		lmdbPath := filepath.Join(metaPath, "db.lmdb")
-		require.NoError(t, os.Mkdir(lmdbPath, 0755))
-
-		assert.NoError(t, network.SetConfig(h.ctx, networkConfig))
-
-		t.Log("Checking that garage is using the expected db engine")
-		garageConfig, err := os.ReadFile(
-			filepath.Join(network.runtimeDir.Path, "garage-4901.toml"),
-		)
-		assert.NoError(t, err)
-		assert.Contains(t,
-			string(garageConfig),
-			`db_engine = "`+garagesrv.DBEngineLMDB+`"`,
-		)
-		assert.NoFileExists(t, filepath.Join(metaPath, "db.sqlite"))
-	})
-
 	t.Run("remove storage alloc", func(t *testing.T) {
 		var (
 			h             = newIntegrationHarness(t)
@@ -311,15 +255,19 @@ func TestNetwork_SetConfig(t *testing.T) {
 			prevHost = network.getHostsByName(t)[network.hostName]

 			removedAlloc = networkConfig.Storage.Allocations[3]
-
-			removedGarageInst = daecommon.BootstrapGarageHostForAlloc(
-				prevHost, removedAlloc,
-			)
 		)

+		var removedGarageInst bootstrap.GarageHostInstance
+		for _, removedGarageInst = range prevHost.Garage.Instances {
+			if removedGarageInst.RPCPort == removedAlloc.RPCPort {
+				break
+			}
+		}
+
 		networkConfig.Storage.Allocations = networkConfig.Storage.Allocations[:3]
 		assert.NoError(t, network.SetConfig(h.ctx, networkConfig))

-		t.Log("Checking that the Host information was updated")
+		h.logger.Info(h.ctx, "Checking that the Host information was updated")
 		newHostsByName := network.getHostsByName(t)
 		newHost, ok := newHostsByName[network.hostName]
 		assert.True(t, ok)
@@ -328,68 +276,20 @@ func TestNetwork_SetConfig(t *testing.T) {
 		assert.Len(t, allocs, 3)
 		assert.NotContains(t, allocs, removedGarageInst)

-		t.Log("Checking that the bootstrap file was written with the new host config")
+		h.logger.Info(h.ctx, "Checking that the bootstrap file was written with the new host config")
 		var storedBootstrap bootstrap.Bootstrap
 		assert.NoError(t, jsonutil.LoadFile(
 			&storedBootstrap, bootstrap.StateDirPath(network.stateDir.Path),
 		))
 		assert.Equal(t, newHostsByName, storedBootstrap.Hosts)

-		t.Log("Checking that garage layout contains the new allocation")
+		h.logger.Info(h.ctx, "Checking that garage layout contains the new allocation")
 		expRoles := allocsToRoles(network.hostName, allocs)
 		layout, err := network.garageAdminClient(t).GetLayout(h.ctx)
 		assert.NoError(t, err)
 		assert.ElementsMatch(t, expRoles, layout.Roles)
 	})

-	t.Run("remove all storage allocs", func(t *testing.T) {
-		t.Skip("This is currently expected to fail. Orphan removal is going to be reworked accordingly")
-
-		var (
-			h        = newIntegrationHarness(t)
-			primus   = h.createNetwork(t, "primus", nil)
-			secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
-				networkConfigOpts: &networkConfigOpts{
-					numStorageAllocs: 1,
-				},
-			})
-			networkConfig = secondus.getConfig(t)
-			prevHost      = secondus.getHostsByName(t)[secondus.hostName]
-			removedRole   = allocsToRoles(
-				secondus.hostName, prevHost.Garage.Instances,
-			)[0]
-			primusGarageAdminClient = primus.garageAdminClient(t)
-		)
-
-		networkConfig.Storage.Allocations = nil
-		assert.NoError(t, secondus.SetConfig(h.ctx, networkConfig))
-
-		t.Log("Checking that the Host information was updated")
-		newHostsByName := primus.getHostsByName(t)
-		newHost, ok := newHostsByName[secondus.hostName]
-		assert.True(t, ok)
-
-		allocs := newHost.HostConfigured.Garage.Instances
-		assert.Empty(t, allocs)
-
-		t.Log("Checking that garage layout still contains the old allocation")
-		layout, err := primusGarageAdminClient.GetLayout(h.ctx)
-		assert.NoError(t, err)
-		assert.Contains(t, layout.Roles, removedRole)
-
-		//t.Log("Removing orphan garage nodes with primus")
-		//assert.NoError(
-		//	t, primus.Network.(*network).removeOrphanGarageNodes(h.ctx),
-		//)
-
-		t.Log("Checking that garage layout no longer contains the old allocation")
-		layout, err = primusGarageAdminClient.GetLayout(h.ctx)
-		assert.NoError(t, err)
-		assert.NotContains(t, layout.Roles, removedRole)
-	})
-
 	t.Run("changes reflected after restart", func(t *testing.T) {
 		var (
 			h             = newIntegrationHarness(t)
@@ -408,3 +308,56 @@ func TestNetwork_SetConfig(t *testing.T) {
 		assert.Equal(t, networkConfig, network.getConfig(t))
 	})
 }
+
+func TestNetwork_glmStateTransition(t *testing.T) {
+	var (
+		h      = newIntegrationHarness(t)
+		primus = h.createNetwork(t, "primus", nil)
+		secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
+			networkConfigOpts: &networkConfigOpts{
+				numStorageAllocs: 1,
+			},
+		})
+		secondusNetworkConfig = secondus.getConfig(t)
+		secondusAdminClient   = secondus.garageAdminClient(t)
+		secondusNetworkDirect = secondus.Network.(*network)
+		secondsBootstrapHost  = primus.getHostsByName(t)["secondus"]
+	)
+
+	assertGarageLayout(t, map[*integrationHarnessNetwork]int{
+		primus:   3,
+		secondus: 1,
+	})
+
+	secondusNetworkConfig.Storage.Allocations = nil
+	assert.NoError(t, secondus.SetConfig(h.ctx, secondusNetworkConfig))
+	assert.Len(t, secondusNetworkDirect.children.ActiveStorageAllocations(), 1)
+
+	h.logger.Info(h.ctx, "Waiting for secondus to finish draining")
+	err := toolkit.UntilTrue(
+		h.ctx, h.logger, 1*time.Second, func() (bool, error) {
+			status, err := secondusAdminClient.Status(h.ctx)
+			if err != nil {
+				return false, fmt.Errorf("getting status: %w", err)
+			}
+
+			for _, node := range status.Nodes {
+				if node.Addr.Addr() == secondsBootstrapHost.IP() {
+					return !node.Draining, nil
+				}
+			}
+
+			return false, fmt.Errorf("secondus not found in cluster status: %+v", status)
+		},
+	)
+	assert.NoError(t, err)
+
+	h.logger.Info(h.ctx, "Running GLM state transition")
+	assert.NoError(t, secondusNetworkDirect.glmStateTransition(h.ctx))
+
+	assertGarageLayout(t, map[*integrationHarnessNetwork]int{
+		primus: 3,
+	})
+	assert.Empty(t, secondusNetworkDirect.children.ActiveStorageAllocations())
+}

View File

@@ -8,6 +8,7 @@ import (
 	"isle/daemon/children"
 	"isle/daemon/daecommon"
 	"isle/garage"
+	"isle/garage/garagesrv"
 	"isle/nebula"
 	"isle/toolkit"
 	"os"
@@ -17,6 +18,7 @@ import (
 	"sync/atomic"
 	"testing"

+	"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
 	"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -75,24 +77,32 @@ func newIntegrationHarness(t *testing.T) *integrationHarness {
 	t.Parallel()
 	toolkit.MarkIntegrationTest(t)

+	var (
+		ctx    = context.Background()
+		logger = toolkit.NewTestLogger(t)
+	)
+
 	rootDir, err := os.MkdirTemp("", "isle-network-it-test.*")
 	require.NoError(t, err)

-	t.Logf("Temporary test directory: %q", rootDir)
+	{
+		ctx := mctx.Annotate(ctx, "rootDir", rootDir)
+		logger.Info(ctx, "Created temporary test root directory")

 		t.Cleanup(func() {
 			if t.Failed() {
-				t.Logf("Temp directory for failed test not deleted: %q", rootDir)
+				logger.Info(ctx, "Test failed, temporarty test root directory NOT deleted")
 				return
 			}

-			t.Logf("Deleting temp directory %q", rootDir)
+			logger.Info(ctx, "Deleting temporary test root directory")
 			assert.NoError(t, os.RemoveAll(rootDir))
 		})
+	}

 	return &integrationHarness{
-		ctx:               context.Background(),
-		logger:            toolkit.NewTestLogger(t),
+		ctx:               ctx,
+		logger:            logger,
 		constructors:      newConstructors(),
 		rootDir:           toolkit.Dir{Path: rootDir},
 		nebulaDeviceNamer: children.NewNebulaDeviceNamer(),
@@ -102,7 +112,10 @@ func newIntegrationHarness(t *testing.T) *integrationHarness {
 func (h *integrationHarness) mkDir(t *testing.T, name string) toolkit.Dir {
 	fullName := fmt.Sprintf("%s-%d", name, h.dirCounter.Add(1)-1)

-	t.Logf("Creating directory %q", fullName)
+	h.logger.Info(
+		mctx.Annotate(h.ctx, "dirName", fullName),
+		"Creating directory",
+	)
 	d, err := h.rootDir.MkChildDir(fullName, false)
 	require.NoError(t, err)
@@ -152,6 +165,7 @@ type createNetworkOpts struct {
 	creationParams   bootstrap.CreationParams
 	manualShutdown   bool
 	numStorageAllocs int
+	garageDefaultDBEngine garagesrv.DBEngine
 }

 func (o *createNetworkOpts) withDefaults() *createNetworkOpts {
@@ -187,11 +201,10 @@ func (h *integrationHarness) createNetwork(
 	hostNameStr string,
 	opts *createNetworkOpts,
 ) *integrationHarnessNetwork {
-	t.Logf("Creating as %q", hostNameStr)
 	opts = opts.withDefaults()

 	var (
-		logger        = h.logger.WithNamespace("network").WithNamespace(hostNameStr)
+		logger        = h.logger.WithNamespace("networks").WithNamespace(hostNameStr)
 		networkConfig = h.mkNetworkConfig(t, &networkConfigOpts{
 			hasPublicAddr:    true,
 			numStorageAllocs: opts.numStorageAllocs,
@@ -206,9 +219,11 @@ func (h *integrationHarness) createNetwork(
 		networkOpts = &Opts{
 			GarageAdminToken: "admin_token",
 			Config:           &networkConfig,
+			garageDefaultDBEngine: opts.garageDefaultDBEngine,
 		}
 	)

+	logger.Info(h.ctx, "Creating network")
 	network, err := h.constructors.create(
 		h.ctx,
 		logger,
@@ -239,9 +254,9 @@ func (h *integrationHarness) createNetwork(
 	if !opts.manualShutdown {
 		t.Cleanup(func() {
-			t.Logf("Shutting down Network %q", hostNameStr)
+			logger.Info(nh.ctx, "Shutting down Network")
 			if err := nh.Shutdown(); err != nil {
-				t.Logf("Shutting down Network %q failed: %v", hostNameStr, err)
+				logger.Error(nh.ctx, "Shutting down Network failed", err)
 			}
 		})
 	}
@@ -273,18 +288,24 @@ func (h *integrationHarness) joinNetwork(
 	opts *joinNetworkOpts,
 ) *integrationHarnessNetwork {
 	opts = opts.withDefaults()
-	hostName := nebula.HostName(hostNameStr)

-	t.Logf("Creating bootstrap for %q", hostNameStr)
-	joiningBootstrap, err := network.CreateHost(h.ctx, hostName, CreateHostOpts{
-		CanCreateHosts: opts.canCreateHosts,
-	})
+	var (
+		hostName      = nebula.HostName(hostNameStr)
+		createHostCtx = mctx.Annotate(h.ctx, "hostName", hostName)
+	)
+
+	h.logger.Info(createHostCtx, "Creating bootstrap")
+	joiningBootstrap, err := network.CreateHost(
+		createHostCtx, hostName, CreateHostOpts{
+			CanCreateHosts: opts.canCreateHosts,
+		},
+	)
 	if err != nil {
 		t.Fatalf("creating host joining bootstrap: %v", err)
 	}

 	var (
-		logger        = h.logger.WithNamespace("network").WithNamespace(hostNameStr)
+		logger        = h.logger.WithNamespace("networks").WithNamespace(hostNameStr)
 		networkConfig = h.mkNetworkConfig(t, opts.networkConfigOpts)
 		stateDir      = h.mkDir(t, "state")
 		runtimeDir    = h.mkDir(t, "runtime")
@@ -295,7 +316,7 @@ func (h *integrationHarness) joinNetwork(
 		}
 	)

-	t.Logf("Joining as %q", hostNameStr)
+	logger.Info(h.ctx, "Joining")
 	joinedNetwork, err := h.constructors.join(
 		h.ctx,
 		logger,
@@ -324,9 +345,9 @@ func (h *integrationHarness) joinNetwork(
 	if !opts.manualShutdown {
 		t.Cleanup(func() {
-			t.Logf("Shutting down Network %q", hostNameStr)
+			nh.logger.Info(nh.ctx, "Shutting down Network")
 			if err := nh.Shutdown(); err != nil {
-				t.Logf("Shutting down Network %q failed: %v", hostNameStr, err)
+				nh.logger.Error(nh.ctx, "Shutting down Network failed", err)
 			}
 		})
 	}
@@ -335,10 +356,10 @@ func (h *integrationHarness) joinNetwork(
 }

 func (nh *integrationHarnessNetwork) restart(t *testing.T) {
-	t.Log("Shutting down network (restart)")
+	nh.logger.Info(nh.ctx, "Shutting down network (restart)")
 	require.NoError(t, nh.Network.Shutdown())

-	t.Log("Loading network (restart)")
+	nh.logger.Info(nh.ctx, "Loading network (restart)")
 	var err error
 	nh.Network, err = nh.constructors.load(
 		nh.ctx,

View File

@@ -141,6 +141,7 @@ type KnownNode struct {
 	Addr            netip.AddrPort `json:"addr"`
 	IsUp            bool           `json:"isUp"`
 	LastSeenSecsAgo int            `json:"lastSeenSecsAgo"`
+	Draining        bool           `json:"draining"`
 	HostName        string         `json:"hostname"`
 }

View File

@@ -11,7 +11,6 @@ import (
 	"io/fs"
 	"os"
 	"path/filepath"
-	"strconv"
 )

 // DBEngine enumerates the garage db engines which are supported by Isle.
@@ -31,13 +30,9 @@ func nodeKeyPubPath(metaDirPath string) string {
 	return filepath.Join(metaDirPath, "node_key.pub")
 }

-func nodeRPCPortPath(metaDirPath string) string {
-	return filepath.Join(metaDirPath, "isle", "rpc_port")
-}
-
-// loadAllocID returns the peer ID (ie the public key) of the node at the given
+// LoadAllocID returns the peer ID (ie the public key) of the node at the given
 // meta directory.
-func loadAllocID(metaDirPath string) (string, error) {
+func LoadAllocID(metaDirPath string) (string, error) {
 	nodeKeyPubPath := nodeKeyPubPath(metaDirPath)

 	pubKey, err := os.ReadFile(nodeKeyPubPath)
@@ -61,8 +56,8 @@ func generatePeerKey() (pubKey, privKey []byte) {

 // InitAlloc initializes the meta directory and keys for a particular
 // allocation, if it hasn't been done so already. It returns the peer ID (ie the
-// public key) and the rpc port in any case.
-func InitAlloc(metaDirPath string, initRPCPort int) (string, int, error) {
+// public key) in any case.
+func InitAlloc(metaDirPath string) (string, error) {
 	initDirFor := func(path string) error {
 		dir := filepath.Dir(path)
@@ -70,110 +65,97 @@ func InitAlloc(metaDirPath string, initRPCPort int) (string, int, error) {
 	}

 	var err error
 	exists := func(path string) bool {
 		if err != nil {
 			return false
 		} else if _, err = os.Stat(path); errors.Is(err, fs.ErrNotExist) {
 			err = nil
 			return false
 		} else if err != nil {
 			err = fmt.Errorf("checking if %q exists: %w", path, err)
 			return false
 		}
 		return true
 	}

-	nodeKeyPath := nodeKeyPath(metaDirPath)
-	nodeKeyPubPath := nodeKeyPubPath(metaDirPath)
-	nodeRPCPortPath := nodeRPCPortPath(metaDirPath)
-
-	nodeKeyPathExists := exists(nodeKeyPath)
-	nodeKeyPubPathExists := exists(nodeKeyPubPath)
-	nodeRPCPortPathExists := exists(nodeRPCPortPath)
-
-	if err != nil {
-		return "", 0, err
-	} else if nodeKeyPubPathExists != nodeKeyPathExists {
-		return "", 0, fmt.Errorf("%q or %q exist without the other existing", nodeKeyPath, nodeKeyPubPath)
-	}
-
 	var (
-		pubKeyStr string
-		rpcPort   int
+		nodeKeyPath          = nodeKeyPath(metaDirPath)
+		nodeKeyPubPath       = nodeKeyPubPath(metaDirPath)
+		nodeKeyPathExists    = exists(nodeKeyPath)
+		nodeKeyPubPathExists = exists(nodeKeyPubPath)
 	)

-	if nodeKeyPathExists {
+	if err != nil {
+		return "", err
+	} else if nodeKeyPubPathExists != nodeKeyPathExists {
+		return "", fmt.Errorf(
+			"%q or %q exist without the other existing",
+			nodeKeyPath,
+			nodeKeyPubPath,
+		)
+	}

-		if pubKeyStr, err = loadAllocID(metaDirPath); err != nil {
-			return "", 0, fmt.Errorf("reading node public key file: %w", err)
+	var pubKeyStr string
+	if nodeKeyPathExists {
+		if pubKeyStr, err = LoadAllocID(metaDirPath); err != nil {
+			return "", fmt.Errorf("reading node public key file: %w", err)
 		}
-
 	} else {
 		if err := initDirFor(nodeKeyPath); err != nil {
-			return "", 0, fmt.Errorf("creating directory for %q: %w", nodeKeyPath, err)
+			return "", fmt.Errorf(
+				"creating directory for %q: %w", nodeKeyPath, err,
+			)
 		}
 		pubKey, privKey := generatePeerKey()
 		if err := os.WriteFile(nodeKeyPath, privKey, 0400); err != nil {
-			return "", 0, fmt.Errorf("writing private key to %q: %w", nodeKeyPath, err)
+			return "", fmt.Errorf(
+				"writing private key to %q: %w", nodeKeyPath, err,
+			)
 		} else if err := os.WriteFile(nodeKeyPubPath, pubKey, 0440); err != nil {
-			return "", 0, fmt.Errorf("writing public key to %q: %w", nodeKeyPubPath, err)
+			return "", fmt.Errorf(
+				"writing public key to %q: %w", nodeKeyPubPath, err,
+			)
 		}
 		pubKeyStr = hex.EncodeToString(pubKey)
 	}

-	if nodeRPCPortPathExists {
-		if rpcPortStr, err := os.ReadFile(nodeRPCPortPath); err != nil {
-			return "", 0, fmt.Errorf("reading rpc port from %q: %w", nodeRPCPortPath, err)
-		} else if rpcPort, err = strconv.Atoi(string(rpcPortStr)); err != nil {
-			return "", 0, fmt.Errorf("parsing rpc port %q from %q: %w", rpcPortStr, nodeRPCPortPath, err)
-		}
-	} else {
-		if err := initDirFor(nodeRPCPortPath); err != nil {
-			return "", 0, fmt.Errorf("creating directory for %q: %w", nodeRPCPortPath, err)
-		}
-		rpcPortStr := strconv.Itoa(initRPCPort)
-		if err := os.WriteFile(nodeRPCPortPath, []byte(rpcPortStr), 0440); err != nil {
-			return "", 0, fmt.Errorf("writing rpc port %q to %q: %w", rpcPortStr, nodeRPCPortPath, err)
-		}
-		rpcPort = initRPCPort
-	}
-
-	return pubKeyStr, rpcPort, nil
+	return pubKeyStr, nil
 }

 // GetDBEngine returns the DBEngine currently being used at a particular meta
-// data directory. Defaults to DBEngineSqlite if the directory doesn't exist or
+// data directory, or returns the default if the directory doesn't exist or
 // hasn't been fully initialized yet.
-func GetDBEngine(metaDirPath string) (DBEngine, error) {
-	dbLMDBPath := filepath.Join(metaDirPath, "db.lmdb")
+func GetDBEngine(
+	metaDirPath string, defaultDBEngine DBEngine,
+) (
+	DBEngine, error,
+) {
+	search := []struct {
+		dbEngine  DBEngine
+		path      string
+		pathIsDir bool
+	}{
+		{DBEngineLMDB, filepath.Join(metaDirPath, "db.lmdb"), true},
+		{DBEngineSqlite, filepath.Join(metaDirPath, "db.sqlite"), false},
+	}

-	stat, err := os.Stat(dbLMDBPath)
-	if errors.Is(err, fs.ErrNotExist) {
-		return DBEngineSqlite, nil
-	} else if err != nil {
-		return "", fmt.Errorf("checking if %q exists: %w", dbLMDBPath, err)
-	} else if stat.IsDir() {
-		return DBEngineLMDB, nil
+	for _, s := range search {
+		stat, err := os.Stat(s.path)
+		if errors.Is(err, fs.ErrNotExist) {
+			continue
+		} else if err != nil {
+			return "", fmt.Errorf("checking if %q exists: %w", s.path, err)
+		} else if stat.IsDir() != s.pathIsDir {
+			continue
+		}
+
+		return s.dbEngine, nil
 	}

-	return DBEngineSqlite, nil
+	return defaultDBEngine, nil
 }
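
Taken together, these garagesrv changes split what InitAlloc used to do: InitAlloc now only manages the node key pair, LoadAllocID re-derives the ID from the key on disk, and the RPC port lives solely in the network config. A minimal usage sketch (the meta directory path is illustrative, not taken from the commit):

    package main

    import (
        "fmt"

        "isle/garage/garagesrv"
    )

    func example() error {
        const metaDir = "/var/lib/isle/garage/meta" // hypothetical path

        // InitAlloc now only creates the node key pair (if needed) and
        // returns the peer ID; no RPC port is persisted in the meta dir.
        id, err := garagesrv.InitAlloc(metaDir)
        if err != nil {
            return fmt.Errorf("initializing alloc: %w", err)
        }

        // Subsequent loads re-derive the same ID from the public key on disk.
        if sameID, err := garagesrv.LoadAllocID(metaDir); err != nil {
            return err
        } else if sameID != id {
            return fmt.Errorf("unexpected ID change")
        }

        // The db engine is detected from what exists on disk, falling back
        // to the caller-provided default for brand-new allocations.
        engine, err := garagesrv.GetDBEngine(metaDir, garagesrv.DBEngineSqlite)
        if err != nil {
            return err
        }

        fmt.Println("alloc", id, "uses db engine", engine)
        return nil
    }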

View File

@@ -6,25 +6,28 @@ import (
 	"strconv"
 )

-// RemoteNode describes all information necessary to connect to a given garage
-// node.
-type RemoteNode struct {
-	ID        string
+// Node describes the common fields of both a [RemoteNode] and [LocalNode]
+type Node struct {
 	IP        string
 	RPCPort   int
 	S3APIPort int
 }

-// LocalNode describes the configuration of a local garage instance.
-type LocalNode struct {
-	RemoteNode
-
-	AdminPort int
+// RPCAddr returns the address of the node's RPC port.
+func (p Node) RPCAddr() string {
+	return net.JoinHostPort(p.IP, strconv.Itoa(p.RPCPort))
 }

-// RPCAddr returns the address of the node's RPC port.
-func (p RemoteNode) RPCAddr() string {
-	return net.JoinHostPort(p.IP, strconv.Itoa(p.RPCPort))
+// S3APIAddr returns the address of the node's S3 API port.
+func (p Node) S3APIAddr() string {
+	return net.JoinHostPort(p.IP, strconv.Itoa(p.S3APIPort))
+}
+
+// RemoteNode describes all information necessary to connect to a given garage
+// node.
+type RemoteNode struct {
+	Node
+	ID string
 }

 // RPCNodeAddr returns the full node address (e.g. "id@ip:port") of the garage
@@ -33,9 +36,10 @@ func (p RemoteNode) RPCNodeAddr() string {
 	return fmt.Sprintf("%s@%s", p.ID, p.RPCAddr())
 }

-// S3APIAddr returns the address of the node's S3 API port.
-func (p RemoteNode) S3APIAddr() string {
-	return net.JoinHostPort(p.IP, strconv.Itoa(p.S3APIPort))
+// LocalNode describes the configuration of a local garage instance.
+type LocalNode struct {
+	Node
+
+	AdminPort int
 }

 // AdminAddr returns the address of the node's S3 API port.
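
With this refactor the address helpers hang off the shared Node struct and are promoted into RemoteNode and LocalNode via embedding. A small sketch of constructing one (field values invented for illustration):

    package main

    import (
        "fmt"

        "isle/garage"
    )

    func main() {
        rn := garage.RemoteNode{
            Node: garage.Node{
                IP:        "10.10.0.1",
                RPCPort:   3901,
                S3APIPort: 3900,
            },
            ID: "deadbeef",
        }

        fmt.Println(rn.RPCAddr())     // 10.10.0.1:3901, promoted from Node
        fmt.Println(rn.RPCNodeAddr()) // deadbeef@10.10.0.1:3901
    }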

View File

@@ -0,0 +1,10 @@
+---
+type: task
+---
+
+NetworkConfig should validate that no ports which have been configured to be
+used conflict with each other. In other words, no port used by a storage
+allocation should be used twice within that allocation, or used by another
+storage allocation.
+
+Same goes for directories being used.
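
A sketch of what such a validation could look like; the function name, its location in daecommon, and the error messages are all hypothetical:

    package daecommon

    import "fmt"

    // ValidateStorageAllocations is a sketch of the check this task describes;
    // it does not exist yet in the codebase.
    func ValidateStorageAllocations(allocs []ConfigStorageAllocation) error {
        var (
            seenPorts = map[int]struct{}{}
            seenDirs  = map[string]struct{}{}
        )

        for _, alloc := range allocs {
            // Each allocation configures three ports; none may repeat within
            // or across allocations.
            for _, port := range []int{alloc.S3APIPort, alloc.RPCPort, alloc.AdminPort} {
                if _, ok := seenPorts[port]; ok {
                    return fmt.Errorf("port %d is configured more than once", port)
                }
                seenPorts[port] = struct{}{}
            }

            // Same rule for the data and meta directories.
            for _, dir := range []string{alloc.DataPath, alloc.MetaPath} {
                if _, ok := seenDirs[dir]; ok {
                    return fmt.Errorf("directory %q is configured more than once", dir)
                }
                seenDirs[dir] = struct{}{}
            }
        }

        return nil
    }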

View File

@@ -0,0 +1,10 @@
+---
+type: task
+---
+
+There should be a `storage allocation status` command, separate from the
+`storage allocation list` command, which shows whether any allocations are
+currently draining (among any other information which might be useful).
+
+Once implemented the "Contributing Storage" operator documentation should be
+updated to recommend using this command during allocation removal.
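
The Draining field added to garage.KnownNode in this commit is presumably what such a command would surface. A sketch of deriving per-allocation draining state; the function and its CLI wiring are hypothetical, while the Status call and the Draining field are from this commit:

    package network

    import (
        "context"
        "fmt"

        "isle/daemon/daecommon"
        "isle/garage"
    )

    // allocDrainingStatus maps each allocation's RPC port to whether its
    // garage node is currently draining.
    func allocDrainingStatus(
        ctx context.Context,
        adminClient *garage.AdminClient,
        hostIP string,
        allocs []daecommon.ConfigStorageAllocation,
    ) (map[int]bool, error) {
        status, err := adminClient.Status(ctx)
        if err != nil {
            return nil, fmt.Errorf("getting cluster status: %w", err)
        }

        // Index this host's nodes by RPC port.
        drainingByRPCPort := map[int]bool{}
        for _, node := range status.Nodes {
            if node.Addr.Addr().String() == hostIP {
                drainingByRPCPort[int(node.Addr.Port())] = node.Draining
            }
        }

        // Report draining state for each configured allocation.
        byAlloc := map[int]bool{}
        for _, alloc := range allocs {
            byAlloc[alloc.RPCPort] = drainingByRPCPort[alloc.RPCPort]
        }
        return byAlloc, nil
    }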