Compare commits

..

3 Commits

8 changed files with 425 additions and 116 deletions

View File

@ -6,24 +6,10 @@ import (
// GaragePeers returns a Peer for each known garage instance in the network.
func (b Bootstrap) GaragePeers() []garage.RemotePeer {
var peers []garage.RemotePeer
for _, host := range b.Hosts {
peers = append(peers, host.GaragePeers()...)
for _, instance := range host.Garage.Instances {
peer := garage.RemotePeer{
ID: instance.ID,
IP: host.IP().String(),
RPCPort: instance.RPCPort,
S3APIPort: instance.S3APIPort,
}
peers = append(peers, peer)
}
}
return peers
}
@ -31,18 +17,9 @@ func (b Bootstrap) GaragePeers() []garage.RemotePeer {
// will prefer a garage instance on this particular host, if there is one, but
// will otherwise return a random endpoint.
func (b Bootstrap) ChooseGaragePeer() garage.RemotePeer {
thisHost := b.ThisHost()
if len(thisHost.Garage.Instances) > 0 {
return thisHost.GaragePeers()[0]
inst := thisHost.Garage.Instances[0]
return garage.RemotePeer{
ID: inst.ID,
IP: thisHost.IP().String(),
RPCPort: inst.RPCPort,
S3APIPort: inst.S3APIPort,
}
}
for _, peer := range b.GaragePeers() {

View File

@ -2,6 +2,7 @@ package bootstrap
import (
"fmt"
"isle/garage"
"isle/nebula"
"net/netip"
)
@ -91,3 +92,18 @@ func (h Host) IP() netip.Addr {
return addr
}
// GaragePeers returns a RemotePeer for each garage instance advertised by this
// Host.
func (h Host) GaragePeers() []garage.RemotePeer {
var peers []garage.RemotePeer
for _, instance := range h.Garage.Instances {
peers = append(peers, garage.RemotePeer{
ID: instance.ID,
IP: h.IP().String(),
RPCPort: instance.RPCPort,
S3APIPort: instance.S3APIPort,
})
}
return peers
}

View File

@ -184,7 +184,6 @@ func (c *Children) reloadNebula(
return nil
}
// TODO this doesn't handle removing garage nodes
func (c *Children) reloadGarage(
ctx context.Context,
networkConfig daecommon.NetworkConfig,
@ -206,6 +205,8 @@ func (c *Children) reloadGarage(
)
)
// TODO it's possible that the config changed, but only the bootstrap
// peers, in which case we don't need to restart the node.
childConfigPath, changed, err := garageWriteChildConfig(
ctx,
c.logger,

View File

@ -12,6 +12,7 @@ import (
_ "embed"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"gopkg.in/yaml.v3"
)
@ -60,6 +61,16 @@ type ConfigStorageAllocation struct {
Zone string `yaml:"zone"`
}
// Annotate implements the mctx.Annotator interface.
func (csa ConfigStorageAllocation) Annotate(aa mctx.Annotations) {
aa["allocDataPath"] = csa.DataPath
aa["allocMetaPath"] = csa.MetaPath
aa["allocCapacity"] = csa.Capacity
aa["allocS3APIPort"] = csa.S3APIPort
aa["allocRPCPort"] = csa.RPCPort
aa["allocAdminPort"] = csa.AdminPort
}
// NetworkConfig describes the configuration of a single network.
type NetworkConfig struct {
DNS struct {

View File

@ -12,12 +12,16 @@ import (
"isle/nebula"
"isle/secrets"
"net"
"net/netip"
"path/filepath"
"slices"
"strconv"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"github.com/minio/minio-go/v7"
"golang.org/x/exp/maps"
)
// Paths within garage's global bucket.
@ -118,6 +122,8 @@ func garageApplyLayout(
}
}
_, _ = adminClient.Status(ctx) // TODO remove this
return adminClient.ApplyLayout(ctx, peers, idsToRemove)
}
@ -298,3 +304,109 @@ func removeGarageBootstrapHost(
ctx, garage.GlobalBucket, filePath, minio.RemoveObjectOptions{},
)
}
// We can wait for the garage instance to appear healthy, but there are cases
// where they still haven't fully synced the list of buckets and bucket
// credentials. For those cases it's necessary to do this as an additional
// check.
func garageWaitForAlloc(
ctx context.Context,
logger *mlog.Logger,
alloc daecommon.ConfigStorageAllocation,
adminToken string,
host bootstrap.Host,
) error {
var (
hostIP = host.IP().String()
adminClient = garage.NewAdminClient(
garageAdminClientLogger(logger),
net.JoinHostPort(hostIP, strconv.Itoa(alloc.AdminPort)),
adminToken,
)
)
defer adminClient.Close()
ctx = mctx.WithAnnotator(ctx, alloc)
for {
logger.Info(ctx, "Checking if node has synced bucket list")
buckets, err := adminClient.ListBuckets(ctx)
if err != nil {
return fmt.Errorf("listing buckets: %w", err)
} else if len(buckets) == 0 {
logger.WarnString(ctx, "No buckets found, will wait a bit and try again")
select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
}
// garageNodeBuddyPeers returns the "buddy" peers of the given host, based on
// the given garage cluster status. It will return zero values if the host has
// no buddy.
//
// For situations where we want one host to affect the cluster layout of another
// host's peers, we use a simple system to determine a single host which is
// responsible. The goal is not to be 100% race-proof (garage handles that), but
// rather to try to prevent all hosts from modifying the same host's layout at
// the same time.
//
// The system is to order all hosts by their IP, and say that each host is
// responsible for (aka the "buddy" of) the host immediately after their own in
// that list. The last host in that list is responsible for the first.
func garageNodeBuddyPeers(
status garage.ClusterStatus, host bootstrap.Host,
) (
netip.Addr, []garage.PeerLayout,
) {
var (
thisIP = host.IP()
peersByID = make(
map[string]garage.PeerLayout, len(status.Layout.Peers),
)
nodePeersByIP = map[netip.Addr][]garage.PeerLayout{}
)
for _, peer := range status.Layout.Peers {
peersByID[peer.ID] = peer
}
for _, node := range status.Nodes {
peer, ok := peersByID[node.ID]
if !ok {
continue
}
ip := node.Addr.Addr()
nodePeersByIP[ip] = append(nodePeersByIP[ip], peer)
}
// If there is only a single host in the cluster (or, somehow, none) then
// that host has no buddy.
if len(nodePeersByIP) < 2 {
return netip.Addr{}, nil
}
nodeIPs := maps.Keys(nodePeersByIP)
slices.SortFunc(nodeIPs, netip.Addr.Compare)
for i, nodeIP := range nodeIPs {
var buddyIP netip.Addr
if i == len(nodeIPs)-1 {
buddyIP = nodeIPs[0]
} else if nodeIP == thisIP {
buddyIP = nodeIPs[i+1]
} else {
continue
}
return buddyIP, nodePeersByIP[buddyIP]
}
panic("Unreachable")
}
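
To make the wrap-around ordering rule described above concrete, here is a small standalone sketch (not part of this changeset; the IPs are made up and none of the isle types are used) showing which host ends up responsible for which:

package main

import (
	"fmt"
	"net/netip"
	"slices"
)

func main() {
	// Hypothetical host IPs in a cluster, deliberately unsorted.
	ips := []netip.Addr{
		netip.MustParseAddr("10.10.0.3"),
		netip.MustParseAddr("10.10.0.1"),
		netip.MustParseAddr("10.10.0.2"),
	}

	// Sort by IP; each host is the buddy of the next one in the sorted
	// order, and the last host wraps around to the first.
	slices.SortFunc(ips, netip.Addr.Compare)
	for i, ip := range ips {
		buddy := ips[(i+1)%len(ips)]
		fmt.Printf("%s is responsible for %s\n", ip, buddy)
	}
	// Prints:
	// 10.10.0.1 is responsible for 10.10.0.2
	// 10.10.0.2 is responsible for 10.10.0.3
	// 10.10.0.3 is responsible for 10.10.0.1
}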

View File

@ -23,6 +23,7 @@ import (
"sync"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"golang.org/x/exp/maps"
)
@ -34,6 +35,8 @@ type GarageClientParams struct {
GlobalBucketS3APICredentials garage.S3APICredentials
// RPCSecret may be empty, if the secret is not available on the host.
//
// TODO this shouldn't really be here I don't think, remove it?
RPCSecret string
}
@ -181,13 +184,15 @@ type network struct {
networkConfig daecommon.NetworkConfig
currBootstrap bootstrap.Bootstrap
shutdownCh chan struct{}
wg sync.WaitGroup
workerCtx context.Context
workerCancel context.CancelFunc
wg sync.WaitGroup
}
// instatiateNetwork returns an instantiated *network instance which has not yet
// been initialized.
func instatiateNetwork(
ctx context.Context,
logger *mlog.Logger,
networkConfig daecommon.NetworkConfig,
envBinDirPath string,
@ -195,6 +200,8 @@ func instatiateNetwork(
runtimeDir toolkit.Dir,
opts *Opts,
) *network {
ctx = context.WithoutCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
return &network{
logger: logger,
networkConfig: networkConfig,
@ -202,7 +209,8 @@ func instatiateNetwork(
stateDir: stateDir,
runtimeDir: runtimeDir,
opts: opts.withDefaults(),
shutdownCh: make(chan struct{}),
workerCtx: ctx,
workerCancel: cancel,
}
}
@ -244,6 +252,7 @@ func Load(
Network, error,
) {
n := instatiateNetwork(
ctx,
logger,
networkConfig,
envBinDirPath,
@ -265,7 +274,7 @@ func Load(
return nil, fmt.Errorf(
"loading bootstrap from %q: %w", bootstrapFilePath, err,
)
} else if err := n.initialize(ctx, currBootstrap); err != nil {
} else if err := n.initialize(ctx, currBootstrap, false); err != nil {
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
}
@ -289,6 +298,7 @@ func Join(
Network, error,
) {
n := instatiateNetwork(
ctx,
logger,
networkConfig,
envBinDirPath,
@ -307,7 +317,7 @@ func Join(
return nil, fmt.Errorf("importing secrets: %w", err)
}
if err := n.initialize(ctx, joiningBootstrap.Bootstrap); err != nil {
if err := n.initialize(ctx, joiningBootstrap.Bootstrap, false); err != nil {
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
}
@ -355,6 +365,7 @@ func Create(
garageRPCSecret := toolkit.RandStr(32)
n := instatiateNetwork(
ctx,
logger,
networkConfig,
envBinDirPath,
@ -390,7 +401,7 @@ func Create(
return nil, fmt.Errorf("initializing bootstrap data: %w", err)
}
if err := n.initialize(ctx, hostBootstrap); err != nil {
if err := n.initialize(ctx, hostBootstrap, true); err != nil {
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
}
@ -407,8 +418,42 @@ func (n *network) initializeDirs(mayExist bool) error {
return nil
}
func (n *network) periodically(
label string,
fn func(context.Context) error,
period time.Duration,
) {
n.wg.Add(1)
go func() {
defer n.wg.Done()
ctx := mctx.Annotate(n.workerCtx, "workerLabel", label)
ticker := time.NewTicker(period)
defer ticker.Stop()
n.logger.Info(ctx, "Starting background job runner")
defer n.logger.Info(ctx, "Stopping background job runner")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
n.logger.Info(ctx, "Background job running")
if err := fn(ctx); err != nil {
n.logger.Error(ctx, "Background job failed", err)
}
}
}
}()
}
func (n *network) initialize(
ctx context.Context, prevBootstrap bootstrap.Bootstrap,
ctx context.Context,
prevBootstrap bootstrap.Bootstrap,
isCreate bool,
) error {
prevThisHost := prevBootstrap.ThisHost()
@ -449,41 +494,37 @@ func (n *network) initialize(
n.logger.Info(ctx, "Child processes created")
if err := n.postInit(ctx, prevThisHost); err != nil {
createGarageGlobalBucket := isCreate
err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
if err != nil {
n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
n.children.Shutdown()
return fmt.Errorf("performing post-initialization: %w", err)
}
// Do this now so that everything is stable before returning. This also
// serves a dual-purpose, as it makes sure that the PUT from the postInit
// serves a dual-purpose, as it makes sure that the PUT from the postChildrenInit
// above has propagated from the local garage instance, if there is one.
n.logger.Info(ctx, "Reloading hosts from network storage")
if err = n.reloadHosts(ctx); err != nil {
return fmt.Errorf("Reloading network bootstrap: %w", err)
}
ctx = context.WithoutCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
n.wg.Add(1)
go func() {
defer n.wg.Done()
<-n.shutdownCh
cancel()
}()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.reloadLoop(ctx)
n.logger.Debug(ctx, "Daemon reload loop stopped")
}()
n.periodically("reloadHosts", n.reloadHosts, 3*time.Minute)
n.periodically(
"removeOrphanGarageNodes", n.removeOrphanGarageNodes, 1*time.Minute,
)
return nil
}
func (n *network) postInit(
ctx context.Context, prevThisHost bootstrap.Host,
// postChildrenInit performs steps which are required after children have been
// initialized.
func (n *network) postChildrenInit(
ctx context.Context,
prevThisHost bootstrap.Host,
createGarageGlobalBucket bool,
) error {
n.l.RLock()
defer n.l.RUnlock()
@ -503,15 +544,7 @@ func (n *network) postInit(
}
}
// This is only necessary during network creation, otherwise the bootstrap
// should already have these credentials built in.
//
// TODO this is pretty hacky, but there doesn't seem to be a better way to
// manage it at the moment.
_, err := daecommon.GetGarageS3APIGlobalBucketCredentials(
ctx, n.secretsStore,
)
if errors.Is(err, secrets.ErrNotFound) {
if createGarageGlobalBucket {
n.logger.Info(ctx, "Initializing garage shared global bucket")
garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
ctx,
@ -532,8 +565,18 @@ func (n *network) postInit(
}
}
for _, alloc := range n.networkConfig.Storage.Allocations {
if err := garageWaitForAlloc(
ctx, n.logger, alloc, n.opts.GarageAdminToken, thisHost,
); err != nil {
return fmt.Errorf(
"waiting for alloc %+v to initialize: %w", alloc, err,
)
}
}
n.logger.Info(ctx, "Updating host info in garage")
err = putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
err := putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
if err != nil {
return fmt.Errorf("updating host info in garage: %w", err)
}
@ -575,23 +618,75 @@ func (n *network) reloadHosts(ctx context.Context) error {
return nil
}
func (n *network) reloadLoop(ctx context.Context) {
const period = 3 * time.Minute
ticker := time.NewTicker(period)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := n.reloadHosts(ctx); err != nil {
n.logger.Error(ctx, "Attempting to reload", err)
continue
}
}
}
// In general each host will manage the garage cluster layout of its own storage
// allocations via garageApplyLayout. There are three situations which are
// handled here, rather than garageApplyLayout:
//
// - A host removes all of its allocations via SetConfig.
// - A host removes all of its allocations by calling Load with no allocations
// in the provided daecommon.NetworkConfig.
// - A host is removed from the network by another host.
//
// In all of these cases the host no longer has any garage instances running,
// and so can't call garageApplyLayout on itself. To combat this we have all
// hosts which do have garage instances running periodically check that there's
// not some garage nodes orphaned in the cluster layout, and remove them if so.
func (n *network) removeOrphanGarageNodes(ctx context.Context) error {
n.l.RLock()
defer n.l.RUnlock()
thisHost := n.currBootstrap.ThisHost()
if len(thisHost.Garage.Instances) == 0 {
n.logger.Info(ctx, "No local garage instances, cannot remove orphans")
return nil
}
adminClient := newGarageAdminClient(
n.logger, n.networkConfig, n.opts.GarageAdminToken, thisHost,
)
defer adminClient.Close()
clusterStatus, err := adminClient.Status(ctx)
if err != nil {
return fmt.Errorf("retrieving garage cluster status: %w", err)
}
buddyIP, buddyNodes := garageNodeBuddyPeers(clusterStatus, thisHost)
if len(buddyNodes) == 0 {
return nil
}
ctx = mctx.Annotate(ctx, "buddyIP", buddyIP)
for _, host := range n.currBootstrap.Hosts {
if host.IP() != buddyIP {
continue
} else if len(host.Garage.Instances) > 0 {
n.logger.Info(ctx, "Buddy instance has garage nodes configured in its bootstrap, doing nothing")
return nil
}
break
}
// Either the host is no longer in the network, or it no longer has any
// garage instances set on it. Either way, remove its nodes from the cluster
// layout.
buddyNodeIDs := make([]string, len(buddyNodes))
for i, buddyNode := range buddyNodes {
buddyNodeIDs[i] = buddyNode.ID
}
n.logger.Info(ctx, "Applying garage layout to remove orphaned garage nodes")
if err := adminClient.ApplyLayout(ctx, nil, buddyNodeIDs); err != nil {
return fmt.Errorf(
"applying garage cluster layout, removing nodes %+v: %w",
buddyNodes,
err,
)
}
return nil
}
// returns the bootstrap prior to the reload being applied.
@ -944,7 +1039,7 @@ func (n *network) SetConfig(
return fmt.Errorf("reloading config: %w", err)
}
if err := n.postInit(ctx, prevBootstrap.ThisHost()); err != nil {
if err := n.postChildrenInit(ctx, prevBootstrap.ThisHost(), false); err != nil {
return fmt.Errorf("performing post-initialization: %w", err)
}
@ -966,10 +1061,11 @@ func (n *network) GetNetworkCreationParams(
}
func (n *network) Shutdown() error {
close(n.shutdownCh)
n.workerCancel()
n.wg.Wait()
if n.children != nil {
n.logger.Info(context.Background(), "Shutting down children")
n.children.Shutdown()
}

View File

@ -54,19 +54,44 @@ func TestLoad(t *testing.T) {
}
func TestJoin(t *testing.T) {
var (
h = newIntegrationHarness(t)
primus = h.createNetwork(t, "primus", nil)
secondus = h.joinNetwork(t, primus, "secondus", nil)
)
t.Run("simple", func(t *testing.T) {
var (
h = newIntegrationHarness(t)
primus = h.createNetwork(t, "primus", nil)
secondus = h.joinNetwork(t, primus, "secondus", nil)
)
primusHosts, err := primus.GetHosts(h.ctx)
assert.NoError(t, err)
secondusHosts, err := secondus.GetHosts(h.ctx)
assert.NoError(t, err)
assert.Equal(t, primusHosts, secondusHosts)
})
t.Run("with alloc", func(t *testing.T) {
var (
h = newIntegrationHarness(t)
primus = h.createNetwork(t, "primus", nil)
secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
networkConfigOpts: &networkConfigOpts{
numStorageAllocs: 1,
},
})
)
t.Log("reloading primus' hosts")
assert.NoError(t, primus.Network.(*network).reloadHosts(h.ctx))
primusHosts, err := primus.GetHosts(h.ctx)
assert.NoError(t, err)
secondusHosts, err := secondus.GetHosts(h.ctx)
assert.NoError(t, err)
assert.Equal(t, primusHosts, secondusHosts)
})
}
func TestNetwork_GetConfig(t *testing.T) {
@ -189,5 +214,33 @@ func TestNetwork_SetConfig(t *testing.T) {
assert.ElementsMatch(t, expPeers, layout.Peers)
})
// TODO a host having allocs but removing all of them
t.Run("remove all storage allocs", func(t *testing.T) {
var (
h = newIntegrationHarness(t)
primus = h.createNetwork(t, "primus", nil)
secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
networkConfigOpts: &networkConfigOpts{
numStorageAllocs: 1,
},
})
networkConfig = secondus.getConfig(t)
prevHost = secondus.getHostsByName(t)[secondus.hostName]
//removedAlloc = networkConfig.Storage.Allocations[0]
removedPeer = allocsToPeerLayouts(
secondus.hostName, prevHost.Garage.Instances,
)[0]
//removedGarageInst = daecommon.BootstrapGarageHostForAlloc(
// prevHost, removedAlloc,
//)
)
networkConfig.Storage.Allocations = nil
assert.NoError(t, secondus.SetConfig(h.ctx, networkConfig))
t.Log("Checking that garage layout still contains the old allocation")
layout, err := secondus.garageAdminClient(t).GetLayout(h.ctx)
assert.NoError(t, err)
assert.Contains(t, layout.Peers, removedPeer)
})
}

View File

@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/http/httputil"
"net/netip"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
@ -29,6 +30,12 @@ const (
BucketPermissionOwner BucketPermission = "owner"
)
// Bucket defines a bucket which has been created in a cluster
type Bucket struct {
ID BucketID `json:"id"`
GlobalAliases []string `json:"globalAliases"`
}
// AdminClientError gets returned from AdminClient Do methods for non-200
// errors.
type AdminClientError struct {
@ -152,6 +159,47 @@ func (c *AdminClient) do(
return nil
}
// KnownNode describes the fields of a known node in the cluster, as returned
// as part of [ClusterStatus].
type KnownNode struct {
ID string `json:"id"`
Addr netip.AddrPort `json:"addr"`
IsUp bool `json:"isUp"`
LastSeenSecsAgo int `json:"lastSeenSecsAgo"`
HostName string `json:"hostname"`
}
// PeerLayout describes the properties of a garage peer in the context of the
// layout of the cluster.
//
// TODO This should be called Role.
type PeerLayout struct {
ID string `json:"id"`
Capacity int `json:"capacity"` // Gb (SI units)
Zone string `json:"zone"`
Tags []string `json:"tags"`
}
// ClusterLayout describes the layout of the cluster as a whole.
type ClusterLayout struct {
Peers []PeerLayout `json:"roles"`
}
// ClusterStatus is returned from the Status endpoint, describing the currently
// known state of the cluster.
type ClusterStatus struct {
Nodes []KnownNode `json:"nodes"`
Layout ClusterLayout `json:"layout"`
}
// Status returns the current state of the cluster.
func (c *AdminClient) Status(ctx context.Context) (ClusterStatus, error) {
// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes
var clusterStatus ClusterStatus
err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil)
return clusterStatus, err
}
// Wait will block until the instance connected to can see at least
// ReplicationFactor other garage instances. If the context is canceled it
// will return the context error.
@ -160,30 +208,32 @@ func (c *AdminClient) Wait(ctx context.Context) error {
for first := true; ; first = false {
if !first {
time.Sleep(250 * time.Millisecond)
select {
case <-time.After(500 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes
var clusterStatus struct {
Nodes []struct {
IsUp bool `json:"isUp"`
} `json:"nodes"`
}
err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil)
c.logger.Debug(ctx, "Getting cluster status")
clusterStatus, err := c.Status(ctx)
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
} else if err != nil {
c.logger.Warn(ctx, "waiting for instance to become ready", err)
c.logger.Warn(ctx, "Instance is not yet ready", err)
continue
}
var numUp int
for _, node := range clusterStatus.Nodes {
if node.IsUp {
// There seems to be some kind of step between IsUp becoming true
// and garage actually loading the full state of a node, so we check
// for the HostName as well. We could also use LastSeenSecsAgo, but
// that remains null for the node being queried so it's more
// annoying to use.
if node.IsUp && node.HostName != "" {
numUp++
}
}
@ -198,7 +248,7 @@ func (c *AdminClient) Wait(ctx context.Context) error {
return nil
}
c.logger.Debug(ctx, "instance not online yet, will continue waiting")
c.logger.Debug(ctx, "Instance not online yet, will continue waiting")
}
}
@ -249,6 +299,13 @@ func (c *AdminClient) CreateBucket(
return BucketID(res.ID), err
}
// ListBuckets returns all buckets known to this garage node.
func (c *AdminClient) ListBuckets(ctx context.Context) ([]Bucket, error) {
var res []Bucket
err := c.do(ctx, &res, "GET", "/v1/bucket?list", nil)
return res, err
}
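
For illustration, the check that garageWaitForAlloc (earlier in this changeset) performs on top of ListBuckets can be sketched on its own roughly as follows; garageWaitForBuckets is a hypothetical helper name, not part of this diff:

// garageWaitForBuckets is a hypothetical standalone sketch of the check used by
// garageWaitForAlloc: an empty ListBuckets result is taken to mean the node has
// not yet synced bucket metadata from the rest of the cluster.
func garageWaitForBuckets(ctx context.Context, c *AdminClient) error {
	for {
		buckets, err := c.ListBuckets(ctx)
		if err != nil {
			return fmt.Errorf("listing buckets: %w", err)
		} else if len(buckets) > 0 {
			return nil
		}
		select {
		case <-time.After(1 * time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}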
// GrantBucketPermissions grants the S3APICredentials with the given ID
// permission(s) to interact with the bucket of the given ID.
func (c *AdminClient) GrantBucketPermissions(
@ -270,20 +327,6 @@ func (c *AdminClient) GrantBucketPermissions(
})
}
// PeerLayout describes the properties of a garage peer in the context of the
// layout of the cluster.
type PeerLayout struct {
ID string `json:"id"`
Capacity int `json:"capacity"` // Gb (SI units)
Zone string `json:"zone"`
Tags []string `json:"tags"`
}
// ClusterLayout describes the layout of the cluster as a whole.
type ClusterLayout struct {
Peers []PeerLayout `json:"roles"`
}
// GetLayout returns the currently applied ClusterLayout.
func (c *AdminClient) GetLayout(ctx context.Context) (ClusterLayout, error) {
// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/GetLayout