Block for bucket list to sync across garage instances during init
This commit is contained in:
parent
8e264cf028
commit
04978fa9db
@ -6,24 +6,10 @@ import (
|
|||||||
|
|
||||||
// GaragePeers returns a Peer for each known garage instance in the network.
|
// GaragePeers returns a Peer for each known garage instance in the network.
|
||||||
func (b Bootstrap) GaragePeers() []garage.RemotePeer {
|
func (b Bootstrap) GaragePeers() []garage.RemotePeer {
|
||||||
|
|
||||||
var peers []garage.RemotePeer
|
var peers []garage.RemotePeer
|
||||||
|
|
||||||
for _, host := range b.Hosts {
|
for _, host := range b.Hosts {
|
||||||
|
peers = append(peers, host.GaragePeers()...)
|
||||||
for _, instance := range host.Garage.Instances {
|
|
||||||
|
|
||||||
peer := garage.RemotePeer{
|
|
||||||
ID: instance.ID,
|
|
||||||
IP: host.IP().String(),
|
|
||||||
RPCPort: instance.RPCPort,
|
|
||||||
S3APIPort: instance.S3APIPort,
|
|
||||||
}
|
|
||||||
|
|
||||||
peers = append(peers, peer)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return peers
|
return peers
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,18 +17,9 @@ func (b Bootstrap) GaragePeers() []garage.RemotePeer {
|
|||||||
// will prefer a garage instance on this particular host, if there is one, but
|
// will prefer a garage instance on this particular host, if there is one, but
|
||||||
// will otherwise return a random endpoint.
|
// will otherwise return a random endpoint.
|
||||||
func (b Bootstrap) ChooseGaragePeer() garage.RemotePeer {
|
func (b Bootstrap) ChooseGaragePeer() garage.RemotePeer {
|
||||||
|
|
||||||
thisHost := b.ThisHost()
|
thisHost := b.ThisHost()
|
||||||
|
|
||||||
if len(thisHost.Garage.Instances) > 0 {
|
if len(thisHost.Garage.Instances) > 0 {
|
||||||
|
return thisHost.GaragePeers()[0]
|
||||||
inst := thisHost.Garage.Instances[0]
|
|
||||||
return garage.RemotePeer{
|
|
||||||
ID: inst.ID,
|
|
||||||
IP: thisHost.IP().String(),
|
|
||||||
RPCPort: inst.RPCPort,
|
|
||||||
S3APIPort: inst.S3APIPort,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, peer := range b.GaragePeers() {
|
for _, peer := range b.GaragePeers() {
|
||||||
|
@ -2,6 +2,7 @@ package bootstrap
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"isle/garage"
|
||||||
"isle/nebula"
|
"isle/nebula"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
)
|
)
|
||||||
@ -91,3 +92,18 @@ func (h Host) IP() netip.Addr {
|
|||||||
|
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GaragePeers returns a RemotePeer for each garage instance advertised by this
|
||||||
|
// Host.
|
||||||
|
func (h Host) GaragePeers() []garage.RemotePeer {
|
||||||
|
var peers []garage.RemotePeer
|
||||||
|
for _, instance := range h.Garage.Instances {
|
||||||
|
peers = append(peers, garage.RemotePeer{
|
||||||
|
ID: instance.ID,
|
||||||
|
IP: h.IP().String(),
|
||||||
|
RPCPort: instance.RPCPort,
|
||||||
|
S3APIPort: instance.S3APIPort,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
|
||||||
|
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,6 +61,16 @@ type ConfigStorageAllocation struct {
|
|||||||
Zone string `yaml:"zone"`
|
Zone string `yaml:"zone"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Annotate implements the mctx.Annotator interface.
|
||||||
|
func (csa ConfigStorageAllocation) Annotate(aa mctx.Annotations) {
|
||||||
|
aa["allocDataPath"] = csa.DataPath
|
||||||
|
aa["allocMetaPath"] = csa.MetaPath
|
||||||
|
aa["allocCapacity"] = csa.Capacity
|
||||||
|
aa["allocS3APIPort"] = csa.S3APIPort
|
||||||
|
aa["allocRPCPort"] = csa.RPCPort
|
||||||
|
aa["allocAdminPort"] = csa.AdminPort
|
||||||
|
}
|
||||||
|
|
||||||
// NetworkConfig describes the configuration of a single network.
|
// NetworkConfig describes the configuration of a single network.
|
||||||
type NetworkConfig struct {
|
type NetworkConfig struct {
|
||||||
DNS struct {
|
DNS struct {
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
|
||||||
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
|
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
|
||||||
@ -298,3 +299,45 @@ func removeGarageBootstrapHost(
|
|||||||
ctx, garage.GlobalBucket, filePath, minio.RemoveObjectOptions{},
|
ctx, garage.GlobalBucket, filePath, minio.RemoveObjectOptions{},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We can wait for the garage instance to appear healthy, but there are cases
|
||||||
|
// where they still haven't fully synced the list of buckets and bucket
|
||||||
|
// credentials. For those cases it's necessary to do this as an additional
|
||||||
|
// check.
|
||||||
|
func garageWaitForAlloc(
|
||||||
|
ctx context.Context,
|
||||||
|
logger *mlog.Logger,
|
||||||
|
alloc daecommon.ConfigStorageAllocation,
|
||||||
|
adminToken string,
|
||||||
|
host bootstrap.Host,
|
||||||
|
) error {
|
||||||
|
var (
|
||||||
|
hostIP = host.IP().String()
|
||||||
|
adminClient = garage.NewAdminClient(
|
||||||
|
garageAdminClientLogger(logger),
|
||||||
|
net.JoinHostPort(hostIP, strconv.Itoa(alloc.AdminPort)),
|
||||||
|
adminToken,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
defer adminClient.Close()
|
||||||
|
ctx = mctx.WithAnnotator(ctx, alloc)
|
||||||
|
|
||||||
|
for {
|
||||||
|
logger.Info(ctx, "Checking if node has synced bucket list")
|
||||||
|
buckets, err := adminClient.ListBuckets(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listing buckets: %w", err)
|
||||||
|
} else if len(buckets) == 0 {
|
||||||
|
logger.WarnString(ctx, "No buckets found, will wait a bit and try again")
|
||||||
|
select {
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
continue
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -35,6 +35,8 @@ type GarageClientParams struct {
|
|||||||
GlobalBucketS3APICredentials garage.S3APICredentials
|
GlobalBucketS3APICredentials garage.S3APICredentials
|
||||||
|
|
||||||
// RPCSecret may be empty, if the secret is not available on the host.
|
// RPCSecret may be empty, if the secret is not available on the host.
|
||||||
|
//
|
||||||
|
// TODO this shouldn't really be here I don't think, remove it?
|
||||||
RPCSecret string
|
RPCSecret string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,7 +274,7 @@ func Load(
|
|||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
"loading bootstrap from %q: %w", bootstrapFilePath, err,
|
"loading bootstrap from %q: %w", bootstrapFilePath, err,
|
||||||
)
|
)
|
||||||
} else if err := n.initialize(ctx, currBootstrap); err != nil {
|
} else if err := n.initialize(ctx, currBootstrap, false); err != nil {
|
||||||
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,7 +317,7 @@ func Join(
|
|||||||
return nil, fmt.Errorf("importing secrets: %w", err)
|
return nil, fmt.Errorf("importing secrets: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := n.initialize(ctx, joiningBootstrap.Bootstrap); err != nil {
|
if err := n.initialize(ctx, joiningBootstrap.Bootstrap, false); err != nil {
|
||||||
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,7 +401,7 @@ func Create(
|
|||||||
return nil, fmt.Errorf("initializing bootstrap data: %w", err)
|
return nil, fmt.Errorf("initializing bootstrap data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := n.initialize(ctx, hostBootstrap); err != nil {
|
if err := n.initialize(ctx, hostBootstrap, true); err != nil {
|
||||||
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -449,7 +451,9 @@ func (n *network) periodically(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) initialize(
|
func (n *network) initialize(
|
||||||
ctx context.Context, prevBootstrap bootstrap.Bootstrap,
|
ctx context.Context,
|
||||||
|
prevBootstrap bootstrap.Bootstrap,
|
||||||
|
isCreate bool,
|
||||||
) error {
|
) error {
|
||||||
prevThisHost := prevBootstrap.ThisHost()
|
prevThisHost := prevBootstrap.ThisHost()
|
||||||
|
|
||||||
@ -490,14 +494,16 @@ func (n *network) initialize(
|
|||||||
|
|
||||||
n.logger.Info(ctx, "Child processes created")
|
n.logger.Info(ctx, "Child processes created")
|
||||||
|
|
||||||
if err := n.postInit(ctx, prevThisHost); err != nil {
|
createGarageGlobalBucket := isCreate
|
||||||
|
err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
|
||||||
|
if err != nil {
|
||||||
n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
|
n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
|
||||||
n.children.Shutdown()
|
n.children.Shutdown()
|
||||||
return fmt.Errorf("performing post-initialization: %w", err)
|
return fmt.Errorf("performing post-initialization: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do this now so that everything is stable before returning. This also
|
// Do this now so that everything is stable before returning. This also
|
||||||
// serves a dual-purpose, as it makes sure that the PUT from the postInit
|
// serves a dual-purpose, as it makes sure that the PUT from the postChildrenInit
|
||||||
// above has propagated from the local garage instance, if there is one.
|
// above has propagated from the local garage instance, if there is one.
|
||||||
n.logger.Info(ctx, "Reloading hosts from network storage")
|
n.logger.Info(ctx, "Reloading hosts from network storage")
|
||||||
if err = n.reloadHosts(ctx); err != nil {
|
if err = n.reloadHosts(ctx); err != nil {
|
||||||
@ -513,8 +519,12 @@ func (n *network) initialize(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) postInit(
|
// postChildrenInit performs steps which are required after children have been
|
||||||
ctx context.Context, prevThisHost bootstrap.Host,
|
// initialized.
|
||||||
|
func (n *network) postChildrenInit(
|
||||||
|
ctx context.Context,
|
||||||
|
prevThisHost bootstrap.Host,
|
||||||
|
createGarageGlobalBucket bool,
|
||||||
) error {
|
) error {
|
||||||
n.l.RLock()
|
n.l.RLock()
|
||||||
defer n.l.RUnlock()
|
defer n.l.RUnlock()
|
||||||
@ -534,15 +544,7 @@ func (n *network) postInit(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is only necessary during network creation, otherwise the bootstrap
|
if createGarageGlobalBucket {
|
||||||
// should already have these credentials built in.
|
|
||||||
//
|
|
||||||
// TODO this is pretty hacky, but there doesn't seem to be a better way to
|
|
||||||
// manage it at the moment.
|
|
||||||
_, err := daecommon.GetGarageS3APIGlobalBucketCredentials(
|
|
||||||
ctx, n.secretsStore,
|
|
||||||
)
|
|
||||||
if errors.Is(err, secrets.ErrNotFound) {
|
|
||||||
n.logger.Info(ctx, "Initializing garage shared global bucket")
|
n.logger.Info(ctx, "Initializing garage shared global bucket")
|
||||||
garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
|
garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
|
||||||
ctx,
|
ctx,
|
||||||
@ -563,8 +565,18 @@ func (n *network) postInit(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, alloc := range n.networkConfig.Storage.Allocations {
|
||||||
|
if err := garageWaitForAlloc(
|
||||||
|
ctx, n.logger, alloc, n.opts.GarageAdminToken, thisHost,
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"waiting for alloc %+v to initialize: %w", alloc, err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
n.logger.Info(ctx, "Updating host info in garage")
|
n.logger.Info(ctx, "Updating host info in garage")
|
||||||
err = putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
|
err := putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("updating host info in garage: %w", err)
|
return fmt.Errorf("updating host info in garage: %w", err)
|
||||||
}
|
}
|
||||||
@ -956,7 +968,7 @@ func (n *network) SetConfig(
|
|||||||
return fmt.Errorf("reloading config: %w", err)
|
return fmt.Errorf("reloading config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := n.postInit(ctx, prevBootstrap.ThisHost()); err != nil {
|
if err := n.postChildrenInit(ctx, prevBootstrap.ThisHost(), false); err != nil {
|
||||||
return fmt.Errorf("performing post-initialization: %w", err)
|
return fmt.Errorf("performing post-initialization: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,12 @@ const (
|
|||||||
BucketPermissionOwner BucketPermission = "owner"
|
BucketPermissionOwner BucketPermission = "owner"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Bucket defines a bucket which has been created in a cluster
|
||||||
|
type Bucket struct {
|
||||||
|
ID BucketID `json:"id"`
|
||||||
|
GlobalAliases []string `json:"globalAliases"`
|
||||||
|
}
|
||||||
|
|
||||||
// AdminClientError gets returned from AdminClient Do methods for non-200
|
// AdminClientError gets returned from AdminClient Do methods for non-200
|
||||||
// errors.
|
// errors.
|
||||||
type AdminClientError struct {
|
type AdminClientError struct {
|
||||||
@ -249,6 +255,13 @@ func (c *AdminClient) CreateBucket(
|
|||||||
return BucketID(res.ID), err
|
return BucketID(res.ID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListBuckets returns all buckets known to this garage node.
|
||||||
|
func (c *AdminClient) ListBuckets(ctx context.Context) ([]Bucket, error) {
|
||||||
|
var res []Bucket
|
||||||
|
err := c.do(ctx, &res, "GET", "/v1/bucket?list", nil)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
// GrantBucketPermissions grants the S3APICredentials with the given ID
|
// GrantBucketPermissions grants the S3APICredentials with the given ID
|
||||||
// permission(s) to interact with the bucket of the given ID.
|
// permission(s) to interact with the bucket of the given ID.
|
||||||
func (c *AdminClient) GrantBucketPermissions(
|
func (c *AdminClient) GrantBucketPermissions(
|
||||||
|
Loading…
Reference in New Issue
Block a user