Improve logic around waiting for garage to be considered ready

Brian Picciano 2025-01-01 12:38:16 +01:00
parent e5718d75be
commit 03f187396b
15 changed files with 241 additions and 73 deletions

View File

@@ -123,6 +123,8 @@ var subCmdGarageCLI = subCmd{
name: "cli",
descr: "Runs the garage binary, automatically configured to point to the garage sub-process of a running isle daemon",
do: func(ctx subCmdCtx) error {
rpcNodeAddr := ctx.flags.String("rpc-node-addr", "", "RPC address of the garage instance to hit in the form 'id@host:port'. Will default to a local garage instance if not given, or some random garage instance in the cluster if no local instance is configured.")
ctx, err := ctx.withParsedFlags(&withParsedFlagsOpts{
passthroughArgs: true,
})
@@ -145,12 +147,17 @@ var subCmdGarageCLI = subCmd{
return errors.New("this host does not have the garage RPC secret")
}
chosenRPCNodeAddr := clientParams.Node.RPCNodeAddr()
if *rpcNodeAddr != "" {
chosenRPCNodeAddr = *rpcNodeAddr
}
var (
binPath = binPath("garage")
args = append([]string{"garage"}, ctx.opts.args...)
cliEnv = append(
os.Environ(),
"GARAGE_RPC_HOST="+clientParams.Node.RPCNodeAddr(),
"GARAGE_RPC_HOST="+chosenRPCNodeAddr,
"GARAGE_RPC_SECRET="+clientParams.RPCSecret,
)
)

View File

@@ -17,6 +17,21 @@ import (
"isle/toolkit"
)
// Opts are optional fields which can be passed into New. A nil value is
// equivalent to a zero value.
type Opts struct {
// GarageNewCluster should be true if the garage instances being started
// are the first instances in a cluster which is being created.
GarageNewCluster bool
}
func (o *Opts) withDefaults() *Opts {
if o == nil {
o = new(Opts)
}
return o
}
// Children manages all child processes of a network. Child processes are
// comprised of:
// - nebula
@@ -48,9 +63,12 @@ func New(
garageAdminToken string,
nebulaDeviceNamer *NebulaDeviceNamer,
hostBootstrap bootstrap.Bootstrap,
opts *Opts,
) (
*Children, error,
) {
opts = opts.withDefaults()
logger.Info(ctx, "Loading secrets")
garageRPCSecret, err := daecommon.GetGarageRPCSecret(ctx, secretsStore)
if err != nil && !errors.Is(err, secrets.ErrNotFound) {
@@ -113,7 +131,12 @@
}
if err := waitForGarage(
ctx, c.logger, networkConfig, garageAdminToken, hostBootstrap,
ctx,
c.logger,
networkConfig,
garageAdminToken,
hostBootstrap,
opts.GarageNewCluster,
); err != nil {
logger.Warn(ctx, "Failed waiting for garage processes to initialize, shutting down child processes", err)
c.Shutdown()
@@ -225,7 +248,12 @@ func (c *Children) reloadGarage(
if anyChanged {
if err := waitForGarage(
ctx, c.logger, networkConfig, c.garageAdminToken, hostBootstrap,
ctx,
c.logger,
networkConfig,
c.garageAdminToken,
hostBootstrap,
false,
); err != nil {
return fmt.Errorf("waiting for garage to start: %w", err)
}
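
Since a nil `*Opts` is treated the same as a zero value, callers of `children.New` that have nothing to configure can simply pass `nil`. A self-contained sketch of the pattern, using illustrative names rather than code from this commit:

```go
package main

import "fmt"

// options mirrors the shape of children.Opts: a nil *options is equivalent to
// a zero-value options.
type options struct {
	newCluster bool
}

func (o *options) withDefaults() *options {
	if o == nil {
		o = new(options)
	}
	return o
}

// start is a hypothetical constructor showing how withDefaults lets callers
// pass nil when they have no options to set.
func start(opts *options) {
	opts = opts.withDefaults()
	fmt.Println("newCluster:", opts.newCluster)
}

func main() {
	start(nil)                        // all defaults
	start(&options{newCluster: true}) // first instances of a new cluster
}
```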

View File

@@ -27,6 +27,7 @@ func waitForGarage(
networkConfig daecommon.NetworkConfig,
adminToken string,
hostBootstrap bootstrap.Bootstrap,
newCluster bool,
) error {
allocs := networkConfig.Storage.Allocations
@@ -52,7 +53,7 @@
)
logger.Info(ctx, "Waiting for garage instance to be healthy")
if err := adminClient.Wait(ctx); err != nil {
if err := adminClient.Wait(ctx, !newCluster); err != nil {
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
}

View File

@@ -11,6 +11,7 @@ import (
"isle/garage"
"isle/nebula"
"isle/secrets"
"isle/toolkit"
"net"
"net/netip"
"path/filepath"
@@ -84,7 +85,6 @@ func garageApplyLayout(
adminToken string,
prevHost, currHost bootstrap.Host,
) error {
var (
adminClient = newGarageAdminClient(
logger, networkConfig, adminToken, currHost,
@@ -321,24 +321,48 @@ func garageWaitForAlloc(
defer adminClient.Close()
ctx = mctx.WithAnnotator(ctx, alloc)
for {
logger.Info(ctx, "Checking if node has synced bucket list")
logger.Info(ctx, "Checking if garage instance has synced bucket list")
if err := toolkit.UntilTrue(
ctx,
logger,
2*time.Second,
func() (bool, error) {
buckets, err := adminClient.ListBuckets(ctx)
if err != nil {
return fmt.Errorf("listing buckets: %w", err)
return false, fmt.Errorf("listing buckets: %w", err)
} else if len(buckets) == 0 {
logger.WarnString(ctx, "No buckets found, will wait a bit and try again")
select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
return ctx.Err()
logger.Info(ctx, "Bucket list not yet synced")
return false, nil
}
return true, nil
},
); err != nil {
return fmt.Errorf("waiting for bucket list to sync: %w", err)
}
logger.Info(ctx, "Checking if garage instance has synced credentials")
if err := toolkit.UntilTrue(
ctx,
logger,
2*time.Second,
func() (bool, error) {
credentials, err := adminClient.ListS3APICredentials(ctx)
if err != nil {
return false, fmt.Errorf("listing credentials: %w", err)
} else if len(credentials) == 0 {
logger.Info(ctx, "Credentials not yet synced")
return false, nil
}
return true, nil
},
); err != nil {
return fmt.Errorf("waiting for credentials list to sync: %w", err)
}
return nil
}
}
// garageNodeBuddyPeers returns the "buddy" peers of the given host, based on
// the given garage cluster status. It will return zero values if the host has

View File

@@ -479,6 +479,9 @@ func (n *network) initialize(
n.opts.GarageAdminToken,
n.nebulaDeviceNamer,
n.currBootstrap,
&children.Opts{
GarageNewCluster: isCreate,
},
)
if err != nil {
return fmt.Errorf("creating child processes: %w", err)

View File

@@ -163,6 +163,7 @@ type ClusterLayout struct {
// ClusterStatus is returned from the Status endpoint, describing the currently
// known state of the cluster.
type ClusterStatus struct {
LayoutVersion int `json:"layoutVersion"`
Nodes []KnownNode `json:"nodes"`
}
@@ -177,53 +178,63 @@ func (c *AdminClient) Status(ctx context.Context) (ClusterStatus, error) {
// Wait will block until the instance connected to can see at least
// ReplicationFactor other garage instances. If the context is canceled it
// will return the context error.
func (c *AdminClient) Wait(ctx context.Context) error {
for first := true; ; first = false {
if !first {
select {
case <-time.After(2 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
c.logger.Debug(ctx, "Getting cluster status")
func (c *AdminClient) Wait(
ctx context.Context, clusterAlreadyInitialized bool,
) error {
c.logger.Info(ctx, "Waiting for instance to connect to other instances")
err := toolkit.UntilTrue(
ctx,
c.logger,
2*time.Second,
func() (bool, error) {
clusterStatus, err := c.Status(ctx)
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
return false, ctxErr
} else if err != nil {
ctx := mctx.Annotate(ctx, "errMsg", err.Error())
c.logger.Info(ctx, "Instance is not online yet")
continue
return false, nil
}
var numUp int
for _, node := range clusterStatus.Nodes {
// There seems to be some kind of step between IsUp becoming true
// and garage actually loading the full state of a node, so we check
// for the HostName as well. We could also use LastSeenSecsAgo, but
// that remains null for the node being queried so it's more
// annoying to use.
// There seems to be some kind of step between IsUp becoming
// true and garage actually loading the full state of a node, so
// we check for the HostName as well. We could also use
// LastSeenSecsAgo, but that remains null for the node being
// queried so it's more annoying to use.
if node.IsUp && node.HostName != "" {
numUp++
}
}
ctx := mctx.Annotate(ctx,
"layoutVersion", clusterStatus.LayoutVersion,
"numNodes", len(clusterStatus.Nodes),
"numUp", numUp,
)
if numUp >= ReplicationFactor {
c.logger.Debug(ctx, "instance appears to be online")
return nil
if clusterAlreadyInitialized && clusterStatus.LayoutVersion == 0 {
c.logger.Info(ctx, "Cluster layout has not yet propagated")
return false, nil
}
c.logger.Info(ctx, "Instance is not joined to the cluster yet")
if numUp < ReplicationFactor {
c.logger.Info(ctx, "Not enough other instances in the cluster are seen as 'up'")
return false, nil
}
return true, nil
},
)
if err != nil {
return fmt.Errorf("waiting for cluster status: %w", err)
}
return err
}
// CreateS3APICredentials creates an S3APICredentials with the given name. The
@@ -252,6 +263,26 @@ func (c *AdminClient) CreateS3APICredentials(
}, err
}
// S3APICredentialsPublic are the publicly available fields of an
// S3APICredentials. These are visible to all nodes in the cluster.
type S3APICredentialsPublic struct {
ID string
Name string
}
// ListS3APICredentials returns all S3APICredentials in the cluster, without
// returning their secret fields.
func (c *AdminClient) ListS3APICredentials(
ctx context.Context,
) (
[]S3APICredentialsPublic, error,
) {
// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Key/operation/ListKeys
var res []S3APICredentialsPublic
err := c.do(ctx, &res, "GET", "/v1/key?list", nil)
return res, err
}
// CreateBucket creates a bucket with the given global alias, returning its ID.
func (c *AdminClient) CreateBucket(
ctx context.Context, globalAlias string,
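
Taken together, a caller that already holds a `*garage.AdminClient` (its construction is not shown in this diff) could use the two-argument `Wait` and the new `ListS3APICredentials` roughly as in the hypothetical sketch below; only the methods it calls come from this commit:

```go
package example

import (
	"context"
	"fmt"

	"isle/garage"
)

// waitThenListKeys waits for the garage instance behind adminClient to be
// ready, then prints the public fields of every S3 API credential it knows
// about. The helper itself is hypothetical.
func waitThenListKeys(
	ctx context.Context, adminClient *garage.AdminClient, newCluster bool,
) error {
	// For an existing cluster, Wait additionally requires the cluster layout
	// to have propagated (LayoutVersion > 0) before reporting ready.
	if err := adminClient.Wait(ctx, !newCluster); err != nil {
		return fmt.Errorf("waiting for garage instance: %w", err)
	}

	credentials, err := adminClient.ListS3APICredentials(ctx)
	if err != nil {
		return fmt.Errorf("listing S3 API credentials: %w", err)
	}

	for _, cred := range credentials {
		fmt.Printf("credential %q (id: %s)\n", cred.Name, cred.ID)
	}
	return nil
}
```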

View File

@@ -2,7 +2,14 @@
// specific part of isle.
package toolkit
import "reflect"
import (
"context"
"reflect"
"time"
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// IsZero returns true if the value is equal to its zero value according to
// reflect.DeepEqual.
@@ -15,3 +22,33 @@ func IsZero[T any](v T) bool {
func Ptr[T any](v T) *T {
return &v
}
// UntilTrue will repeatedly call the given callback, waiting the given duration
// between calls until:
// - The context is canceled, returning the context's error
// - The callback returns an error, returning that error
// - The callback returns true, returning nil
func UntilTrue(
ctx context.Context,
logger *mlog.Logger,
period time.Duration,
fn func() (bool, error),
) error {
ctx = mctx.Annotate(ctx, "retryAfter", period)
for {
if ok, err := fn(); err != nil {
return err
} else if ok {
return nil
}
logger.Info(ctx, "Waiting before retrying")
select {
case <-time.After(period):
case <-ctx.Done():
return ctx.Err()
}
}
}
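
A minimal usage sketch of the new `toolkit.UntilTrue` helper; the file-polling scenario is illustrative only and not part of this commit:

```go
package example

import (
	"context"
	"os"
	"time"

	"isle/toolkit"

	"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)

// waitForFile polls every two seconds until the given path exists, giving up
// on any unexpected error or when the context is canceled.
func waitForFile(
	ctx context.Context, logger *mlog.Logger, path string,
) error {
	return toolkit.UntilTrue(
		ctx,
		logger,
		2*time.Second,
		func() (bool, error) {
			if _, err := os.Stat(path); os.IsNotExist(err) {
				return false, nil // not there yet, retry after the period
			} else if err != nil {
				return false, err // unexpected error, stop retrying
			}
			return true, nil // found it, UntilTrue returns nil
		},
	)
}
```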

View File

@@ -1,5 +1,5 @@
---
type: host
type: task
---
When removing a storage allocation the new layout should be applied _before_ the

View File

@@ -1,13 +0,0 @@
---
type: task
---
It seems in real world testing that this error came up after `isle storage
allocation add` had already completed:
```
ERROR [daemon/networks/micropelago.net] Background job failed errMsg="getting hosts from garage: listing objects: Forbidden: No such key: XXXXXXXXXXXXXXXXXXXXXXXXXX" workerLabel=reloadHosts
```
Isle should wait until bucket credentials are propagated to the new garage
instance before considering it to be online.

View File

@@ -0,0 +1,8 @@
---
type: task
---
There is a race condition in the garage orphan removal worker: when host A is
adding its first allocation, it's possible that it hasn't yet updated its host
info in the common bucket, and so host B removes it from the layout before that
host info lands.

View File

@@ -0,0 +1,11 @@
---
type: task
---
When SIGINT (ctrl-c) is sent to the integration tests while they are running,
they should cancel themselves and clean up all sub-processes as best they can.
A second SIGINT can then be used to kill them immediately, if for some reason
cleanup does not complete cleanly.
Alternatively, maybe the `integration_test.sh` script can do some kind of
process cleanup.

View File

@@ -0,0 +1,6 @@
---
type: task
---
On each message which is logged, the test message logger (toolkit.NewTestLogger)
should include the duration since the test started.

View File

@@ -0,0 +1,11 @@
---
type: task
status: draft
---
It should be possible for a host to enable access to its daemon's RPC socket
_but only for a specific other host_.
At the same time, the `isle` tool would need to be able to specify a remote host
to operate against. How this interacts with local state (like staged firewall)
needs to be dealt with as well.

View File

@@ -0,0 +1,9 @@
---
type: task
status: draft
---
It would be very convenient for debugging hosts remotely if we could see the
logs of another host. This would include logs at the CHILD level and below
(even if the daemon is set to output a lower log level to stdout). Being able
to tail the logs remotely would be especially nice.

View File

@@ -0,0 +1,5 @@
---
type: task
---
It should be possible to restart the daemon via its RPC socket.