Compare commits
No commits in common. "2bf36a8ead763e78cfdfdf55a27bb42ea794ca41" and "efdab29ae66ed12087425593dfe7d757dcd63bc0" have entirely different histories.
2bf36a8ead...efdab29ae6
@@ -6,10 +6,24 @@ import (
 
 // GaragePeers returns a Peer for each known garage instance in the network.
 func (b Bootstrap) GaragePeers() []garage.RemotePeer {
 
 	var peers []garage.RemotePeer
 
 	for _, host := range b.Hosts {
-		peers = append(peers, host.GaragePeers()...)
+		for _, instance := range host.Garage.Instances {
+
+			peer := garage.RemotePeer{
+				ID:        instance.ID,
+				IP:        host.IP().String(),
+				RPCPort:   instance.RPCPort,
+				S3APIPort: instance.S3APIPort,
+			}
+
+			peers = append(peers, peer)
+		}
 	}
 
 	return peers
 }
 
@@ -17,9 +31,18 @@ func (b Bootstrap) GaragePeers() []garage.RemotePeer {
 // will prefer a garage instance on this particular host, if there is one, but
 // will otherwise return a random endpoint.
 func (b Bootstrap) ChooseGaragePeer() garage.RemotePeer {
 
 	thisHost := b.ThisHost()
 
 	if len(thisHost.Garage.Instances) > 0 {
-		return thisHost.GaragePeers()[0]
+		inst := thisHost.Garage.Instances[0]
+		return garage.RemotePeer{
+			ID:        inst.ID,
+			IP:        thisHost.IP().String(),
+			RPCPort:   inst.RPCPort,
+			S3APIPort: inst.S3APIPort,
+		}
 	}
 
 	for _, peer := range b.GaragePeers() {
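For orientation, here is a minimal standalone sketch (not part of the diff) of what the garage.RemotePeer values built above are typically good for: joining a peer's IP with one of its ports to get a dialable address. The struct below only mirrors the fields visible in the hunk, and the names, helper, and port numbers are illustrative assumptions.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// RemotePeer mirrors the fields used in the hunk above; the real type lives
// in the isle/garage package, this copy just keeps the sketch self-contained.
type RemotePeer struct {
	ID        string
	IP        string
	RPCPort   int
	S3APIPort int
}

// s3APIAddr is a hypothetical helper showing how a chosen peer's fields can
// be combined into a host:port address for its S3 API.
func s3APIAddr(p RemotePeer) string {
	return net.JoinHostPort(p.IP, strconv.Itoa(p.S3APIPort))
}

func main() {
	peer := RemotePeer{ID: "abc123", IP: "10.6.9.1", RPCPort: 3901, S3APIPort: 3900}
	fmt.Println(s3APIAddr(peer)) // 10.6.9.1:3900
}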
@@ -2,7 +2,6 @@ package bootstrap
 
 import (
 	"fmt"
-	"isle/garage"
 	"isle/nebula"
 	"net/netip"
 )
@@ -92,18 +91,3 @@ func (h Host) IP() netip.Addr {
 
 	return addr
 }
-
-// GaragePeers returns a RemotePeer for each garage instance advertised by this
-// Host.
-func (h Host) GaragePeers() []garage.RemotePeer {
-	var peers []garage.RemotePeer
-	for _, instance := range h.Garage.Instances {
-		peers = append(peers, garage.RemotePeer{
-			ID:        instance.ID,
-			IP:        h.IP().String(),
-			RPCPort:   instance.RPCPort,
-			S3APIPort: instance.S3APIPort,
-		})
-	}
-	return peers
-}
@@ -184,6 +184,7 @@ func (c *Children) reloadNebula(
 	return nil
 }
 
+// TODO this doesn't handle removing garage nodes
 func (c *Children) reloadGarage(
 	ctx context.Context,
 	networkConfig daecommon.NetworkConfig,
@@ -205,8 +206,6 @@ func (c *Children) reloadGarage(
 		)
 	)
 
-	// TODO it's possible that the config changed, but only the bootstrap
-	// peers, in which case we don't need to restart the node.
 	childConfigPath, changed, err := garageWriteChildConfig(
 		ctx,
 		c.logger,
@@ -12,7 +12,6 @@ import (
 
 	_ "embed"
 
-	"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
 	"gopkg.in/yaml.v3"
 )
 
@@ -61,16 +60,6 @@ type ConfigStorageAllocation struct {
 	Zone string `yaml:"zone"`
 }
 
-// Annotate implements the mctx.Annotator interface.
-func (csa ConfigStorageAllocation) Annotate(aa mctx.Annotations) {
-	aa["allocDataPath"] = csa.DataPath
-	aa["allocMetaPath"] = csa.MetaPath
-	aa["allocCapacity"] = csa.Capacity
-	aa["allocS3APIPort"] = csa.S3APIPort
-	aa["allocRPCPort"] = csa.RPCPort
-	aa["allocAdminPort"] = csa.AdminPort
-}
-
 // NetworkConfig describes the configuration of a single network.
 type NetworkConfig struct {
 	DNS struct {
@@ -12,16 +12,12 @@ import (
 	"isle/nebula"
 	"isle/secrets"
 	"net"
-	"net/netip"
 	"path/filepath"
-	"slices"
 	"strconv"
-	"time"
 
 	"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
 	"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
 	"github.com/minio/minio-go/v7"
-	"golang.org/x/exp/maps"
 )
 
 // Paths within garage's global bucket.
@@ -122,8 +118,6 @@ func garageApplyLayout(
 		}
 	}
 
-	_, _ = adminClient.Status(ctx) // TODO remove this
-
 	return adminClient.ApplyLayout(ctx, peers, idsToRemove)
 }
 
@@ -304,109 +298,3 @@ func removeGarageBootstrapHost(
 		ctx, garage.GlobalBucket, filePath, minio.RemoveObjectOptions{},
 	)
 }
-
-// We can wait for the garage instance to appear healthy, but there are cases
-// where they still haven't fully synced the list of buckets and bucket
-// credentials. For those cases it's necessary to do this as an additional
-// check.
-func garageWaitForAlloc(
-	ctx context.Context,
-	logger *mlog.Logger,
-	alloc daecommon.ConfigStorageAllocation,
-	adminToken string,
-	host bootstrap.Host,
-) error {
-	var (
-		hostIP      = host.IP().String()
-		adminClient = garage.NewAdminClient(
-			garageAdminClientLogger(logger),
-			net.JoinHostPort(hostIP, strconv.Itoa(alloc.AdminPort)),
-			adminToken,
-		)
-	)
-
-	defer adminClient.Close()
-	ctx = mctx.WithAnnotator(ctx, alloc)
-
-	for {
-		logger.Info(ctx, "Checking if node has synced bucket list")
-		buckets, err := adminClient.ListBuckets(ctx)
-		if err != nil {
-			return fmt.Errorf("listing buckets: %w", err)
-		} else if len(buckets) == 0 {
-			logger.WarnString(ctx, "No buckets found, will wait a bit and try again")
-			select {
-			case <-time.After(1 * time.Second):
-				continue
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-
-		return nil
-	}
-}
-
-// garageNodeBuddyPeers returns the "buddy" peers of the given host, based on
-// the given garage cluster status. It will return zero values if the host has
-// no buddy.
-//
-// For situations where we want one host to affect the cluster layout of another
-// host's peers, we use a simple system to determine a single host which is
-// responsible. The goal is not to be 100% race-proof (garage handles that), but
-// rather to try to prevent all hosts from modifying the same host's layout at
-// the same time.
-//
-// The system is to order all hosts by their IP, and say that each host is
-// responsible for (aka the "buddy" of) the host immediately after their own in
-// that list. The last host in that list is responsible for the first.
-func garageNodeBuddyPeers(
-	status garage.ClusterStatus, host bootstrap.Host,
-) (
-	netip.Addr, []garage.PeerLayout,
-) {
-	var (
-		thisIP    = host.IP()
-		peersByID = make(
-			map[string]garage.PeerLayout, len(status.Layout.Peers),
-		)
-		nodePeersByIP = map[netip.Addr][]garage.PeerLayout{}
-	)
-
-	for _, peer := range status.Layout.Peers {
-		peersByID[peer.ID] = peer
-	}
-
-	for _, node := range status.Nodes {
-		peer, ok := peersByID[node.ID]
-		if !ok {
-			continue
-		}
-
-		ip := node.Addr.Addr()
-		nodePeersByIP[ip] = append(nodePeersByIP[ip], peer)
-	}
-
-	// If there is only a single host in the cluster (or, somehow, none) then
-	// that host has no buddy.
-	if len(nodePeersByIP) < 2 {
-		return netip.Addr{}, nil
-	}
-
-	nodeIPs := maps.Keys(nodePeersByIP)
-	slices.SortFunc(nodeIPs, netip.Addr.Compare)
-
-	for i, nodeIP := range nodeIPs {
-		var buddyIP netip.Addr
-		if i == len(nodeIPs)-1 {
-			buddyIP = nodeIPs[0]
-		} else if nodeIP == thisIP {
-			buddyIP = nodeIPs[i+1]
-		} else {
-			continue
-		}
-		return buddyIP, nodePeersByIP[buddyIP]
-	}
-
-	panic("Unreachable")
-}
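The removed garageNodeBuddyPeers helper documents a simple "buddy" rule: sort all node IPs, make each host responsible for the host immediately after it in that order, and wrap the last host around to the first. A self-contained sketch of just that selection rule, under the assumption that hosts are identified by netip.Addr as in the original (names here are hypothetical):

package main

import (
	"fmt"
	"net/netip"
	"slices"
)

// buddyOf implements the selection rule described in the removed comment:
// sort the node IPs, each host is the "buddy" of the next IP in the sorted
// list, and the last host wraps around to the first. It returns the zero
// Addr when there are fewer than two hosts or thisIP isn't in the list.
func buddyOf(thisIP netip.Addr, nodeIPs []netip.Addr) netip.Addr {
	if len(nodeIPs) < 2 {
		return netip.Addr{}
	}
	slices.SortFunc(nodeIPs, netip.Addr.Compare)
	for i, ip := range nodeIPs {
		if ip != thisIP {
			continue
		}
		if i == len(nodeIPs)-1 {
			return nodeIPs[0] // last host is responsible for the first
		}
		return nodeIPs[i+1]
	}
	return netip.Addr{}
}

func main() {
	ips := []netip.Addr{
		netip.MustParseAddr("10.6.9.3"),
		netip.MustParseAddr("10.6.9.1"),
		netip.MustParseAddr("10.6.9.2"),
	}
	fmt.Println(buddyOf(netip.MustParseAddr("10.6.9.3"), ips)) // 10.6.9.1
}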
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"time"
 
-	"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
 	"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
 	"golang.org/x/exp/maps"
 )
@@ -35,8 +34,6 @@ type GarageClientParams struct {
 	GlobalBucketS3APICredentials garage.S3APICredentials
 
 	// RPCSecret may be empty, if the secret is not available on the host.
-	//
-	// TODO this shouldn't really be here I don't think, remove it?
 	RPCSecret string
 }
 
@@ -184,15 +181,13 @@ type network struct {
 	networkConfig daecommon.NetworkConfig
 	currBootstrap bootstrap.Bootstrap
 
-	workerCtx    context.Context
-	workerCancel context.CancelFunc
-	wg           sync.WaitGroup
+	shutdownCh chan struct{}
+	wg         sync.WaitGroup
 }
 
 // instatiateNetwork returns an instantiated *network instance which has not yet
 // been initialized.
 func instatiateNetwork(
-	ctx context.Context,
 	logger *mlog.Logger,
 	networkConfig daecommon.NetworkConfig,
 	envBinDirPath string,
@@ -200,8 +195,6 @@ func instatiateNetwork(
 	runtimeDir toolkit.Dir,
 	opts *Opts,
 ) *network {
-	ctx = context.WithoutCancel(ctx)
-	ctx, cancel := context.WithCancel(ctx)
 	return &network{
 		logger:        logger,
 		networkConfig: networkConfig,
@@ -209,8 +202,7 @@ func instatiateNetwork(
 		stateDir:      stateDir,
 		runtimeDir:    runtimeDir,
 		opts:          opts.withDefaults(),
-		workerCtx:     ctx,
-		workerCancel:  cancel,
+		shutdownCh:    make(chan struct{}),
 	}
 }
 
@@ -252,7 +244,6 @@ func Load(
 	Network, error,
 ) {
 	n := instatiateNetwork(
-		ctx,
 		logger,
 		networkConfig,
 		envBinDirPath,
@@ -274,7 +265,7 @@ func Load(
 		return nil, fmt.Errorf(
 			"loading bootstrap from %q: %w", bootstrapFilePath, err,
 		)
-	} else if err := n.initialize(ctx, currBootstrap, false); err != nil {
+	} else if err := n.initialize(ctx, currBootstrap); err != nil {
 		return nil, fmt.Errorf("initializing with bootstrap: %w", err)
 	}
 
@@ -298,7 +289,6 @@ func Join(
 	Network, error,
 ) {
 	n := instatiateNetwork(
-		ctx,
 		logger,
 		networkConfig,
 		envBinDirPath,
@@ -317,7 +307,7 @@ func Join(
 		return nil, fmt.Errorf("importing secrets: %w", err)
 	}
 
-	if err := n.initialize(ctx, joiningBootstrap.Bootstrap, false); err != nil {
+	if err := n.initialize(ctx, joiningBootstrap.Bootstrap); err != nil {
 		return nil, fmt.Errorf("initializing with bootstrap: %w", err)
 	}
 
@@ -365,7 +355,6 @@ func Create(
 	garageRPCSecret := toolkit.RandStr(32)
 
 	n := instatiateNetwork(
-		ctx,
 		logger,
 		networkConfig,
 		envBinDirPath,
@@ -401,7 +390,7 @@ func Create(
 		return nil, fmt.Errorf("initializing bootstrap data: %w", err)
 	}
 
-	if err := n.initialize(ctx, hostBootstrap, true); err != nil {
+	if err := n.initialize(ctx, hostBootstrap); err != nil {
 		return nil, fmt.Errorf("initializing with bootstrap: %w", err)
 	}
 
@@ -418,42 +407,8 @@ func (n *network) initializeDirs(mayExist bool) error {
 	return nil
 }
 
-func (n *network) periodically(
-	label string,
-	fn func(context.Context) error,
-	period time.Duration,
-) {
-	n.wg.Add(1)
-	go func() {
-		defer n.wg.Done()
-
-		ctx := mctx.Annotate(n.workerCtx, "workerLabel", label)
-
-		ticker := time.NewTicker(period)
-		defer ticker.Stop()
-
-		n.logger.Info(ctx, "Starting background job runner")
-		defer n.logger.Info(ctx, "Stopping background job runner")
-
-		for {
-			select {
-			case <-ctx.Done():
-				return
-
-			case <-ticker.C:
-				n.logger.Info(ctx, "Background job running")
-				if err := fn(ctx); err != nil {
-					n.logger.Error(ctx, "Background job failed", err)
-				}
-			}
-		}
-	}()
-}
-
 func (n *network) initialize(
-	ctx context.Context,
-	prevBootstrap bootstrap.Bootstrap,
-	isCreate bool,
+	ctx context.Context, prevBootstrap bootstrap.Bootstrap,
 ) error {
 	prevThisHost := prevBootstrap.ThisHost()
 
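The periodically helper removed above is a small reusable pattern: run a job on a ticker, tracked by a WaitGroup, until a worker context is canceled. A self-contained sketch of the same pattern with hypothetical names (logging reduced to fmt so it runs on its own):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// periodically runs fn on every tick until ctx is canceled, tracking the
// goroutine in wg so a caller can cancel and then wg.Wait() to shut down.
func periodically(
	ctx context.Context,
	wg *sync.WaitGroup,
	period time.Duration,
	fn func(context.Context) error,
) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := fn(ctx); err != nil {
					fmt.Println("background job failed:", err)
				}
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	periodically(ctx, &wg, 100*time.Millisecond, func(context.Context) error {
		fmt.Println("tick")
		return nil
	})
	time.Sleep(350 * time.Millisecond)
	cancel()  // plays the role of workerCancel in the removed code
	wg.Wait() // plays the role of n.wg.Wait() in Shutdown
}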
@@ -494,37 +449,41 @@ func (n *network) initialize(
 
 	n.logger.Info(ctx, "Child processes created")
 
-	createGarageGlobalBucket := isCreate
-	err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
-	if err != nil {
+	if err := n.postInit(ctx, prevThisHost); err != nil {
 		n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
 		n.children.Shutdown()
 		return fmt.Errorf("performing post-initialization: %w", err)
 	}
 
 	// Do this now so that everything is stable before returning. This also
-	// serves a dual-purpose, as it makes sure that the PUT from the postChildrenInit
+	// serves a dual-purpose, as it makes sure that the PUT from the postInit
 	// above has propagated from the local garage instance, if there is one.
 	n.logger.Info(ctx, "Reloading hosts from network storage")
 	if err = n.reloadHosts(ctx); err != nil {
 		return fmt.Errorf("Reloading network bootstrap: %w", err)
 	}
 
-	n.periodically("reloadHosts", n.reloadHosts, 3*time.Minute)
+	ctx = context.WithoutCancel(ctx)
+	ctx, cancel := context.WithCancel(ctx)
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		<-n.shutdownCh
+		cancel()
+	}()
 
-	n.periodically(
-		"removeOrphanGarageNodes", n.removeOrphanGarageNodes, 1*time.Minute,
-	)
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		n.reloadLoop(ctx)
+		n.logger.Debug(ctx, "Daemon reload loop stopped")
+	}()
 
 	return nil
 }
 
-// postChildrenInit performs steps which are required after children have been
-// initialized.
-func (n *network) postChildrenInit(
-	ctx context.Context,
-	prevThisHost bootstrap.Host,
-	createGarageGlobalBucket bool,
+func (n *network) postInit(
+	ctx context.Context, prevThisHost bootstrap.Host,
 ) error {
 	n.l.RLock()
 	defer n.l.RUnlock()
@@ -544,7 +503,15 @@ func (n *network) postChildrenInit(
 		}
 	}
 
-	if createGarageGlobalBucket {
+	// This is only necessary during network creation, otherwise the bootstrap
+	// should already have these credentials built in.
+	//
+	// TODO this is pretty hacky, but there doesn't seem to be a better way to
+	// manage it at the moment.
+	_, err := daecommon.GetGarageS3APIGlobalBucketCredentials(
+		ctx, n.secretsStore,
+	)
+	if errors.Is(err, secrets.ErrNotFound) {
 		n.logger.Info(ctx, "Initializing garage shared global bucket")
 		garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
 			ctx,
@@ -565,18 +532,8 @@ func (n *network) postChildrenInit(
 		}
 	}
 
-	for _, alloc := range n.networkConfig.Storage.Allocations {
-		if err := garageWaitForAlloc(
-			ctx, n.logger, alloc, n.opts.GarageAdminToken, thisHost,
-		); err != nil {
-			return fmt.Errorf(
-				"waiting for alloc %+v to initialize: %w", alloc, err,
-			)
-		}
-	}
-
 	n.logger.Info(ctx, "Updating host info in garage")
-	err := putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
+	err = putGarageBoostrapHost(ctx, n.secretsStore, n.currBootstrap)
 	if err != nil {
 		return fmt.Errorf("updating host info in garage: %w", err)
 	}
@@ -618,75 +575,23 @@ func (n *network) reloadHosts(ctx context.Context) error {
 	return nil
 }
 
-// In general each host will manage the garage cluster layout of its own storage
-// allocations via garageApplyLayout. There are three situations which are
-// handled here, rather than garageApplyLayout:
-//
-// - A host removes all of its allocations via SetConfig.
-// - A host removes all of its allocations by calling Load with no allocations
-//   in the provided daecommon.NetworkConfig.
-// - A host is removed from the network by another host.
-//
-// In all of these cases the host no longer has any garage instances running,
-// and so can't call garageApplyLayout on itself. To combat this we have all
-// hosts which do have garage instances running periodically check that there's
-// not some garage nodes orphaned in the cluster layout, and remove them if so.
-func (n *network) removeOrphanGarageNodes(ctx context.Context) error {
-	n.l.RLock()
-	defer n.l.RUnlock()
+func (n *network) reloadLoop(ctx context.Context) {
+	const period = 3 * time.Minute
+	ticker := time.NewTicker(period)
+	defer ticker.Stop()
 
-	thisHost := n.currBootstrap.ThisHost()
-	if len(thisHost.Garage.Instances) == 0 {
-		n.logger.Info(ctx, "No local garage instances, cannot remove orphans")
-		return nil
-	}
+	for {
+		select {
+		case <-ctx.Done():
+			return
 
-	adminClient := newGarageAdminClient(
-		n.logger, n.networkConfig, n.opts.GarageAdminToken, thisHost,
-	)
-	defer adminClient.Close()
-	clusterStatus, err := adminClient.Status(ctx)
-	if err != nil {
-		return fmt.Errorf("retrieving garage cluster status: %w", err)
-	}
-
-	buddyIP, buddyNodes := garageNodeBuddyPeers(clusterStatus, thisHost)
-	if len(buddyNodes) == 0 {
-		return nil
-	}
-
-	ctx = mctx.Annotate(ctx, "buddyIP", buddyIP)
-
-	for _, host := range n.currBootstrap.Hosts {
-		if host.IP() != buddyIP {
-			continue
-		} else if len(host.Garage.Instances) > 0 {
-			n.logger.Info(ctx, "Buddy instance has garage nodes configured in its bootstrap, doing nothing")
-			return nil
+		case <-ticker.C:
+			if err := n.reloadHosts(ctx); err != nil {
+				n.logger.Error(ctx, "Attempting to reload", err)
+				continue
+			}
 		}
-		break
 	}
-
-	// Either the host is no longer in the network, or it no longer has any
-	// garage instances set on it. Either way, remove its nodes from the cluster
-	// layout.
-
-	buddyNodeIDs := make([]string, len(buddyNodes))
-	for i, buddyNode := range buddyNodes {
-		buddyNodeIDs[i] = buddyNode.ID
-	}
-
-	n.logger.Info(ctx, "Applying garage layout to remove orphaned garage nodes")
-	if err := adminClient.ApplyLayout(ctx, nil, buddyNodeIDs); err != nil {
-		return fmt.Errorf(
-			"applying garage cluster layout, removing nodes %+v: %w",
-			buddyNodes,
-			err,
-		)
-	}
-
-	return nil
 }
 
 // returns the bootstrap prior to the reload being applied.
@@ -1039,7 +944,7 @@ func (n *network) SetConfig(
 		return fmt.Errorf("reloading config: %w", err)
 	}
 
-	if err := n.postChildrenInit(ctx, prevBootstrap.ThisHost(), false); err != nil {
+	if err := n.postInit(ctx, prevBootstrap.ThisHost()); err != nil {
 		return fmt.Errorf("performing post-initialization: %w", err)
 	}
 
@@ -1061,11 +966,10 @@ func (n *network) GetNetworkCreationParams(
 }
 
 func (n *network) Shutdown() error {
-	n.workerCancel()
+	close(n.shutdownCh)
 	n.wg.Wait()
 
 	if n.children != nil {
-		n.logger.Info(context.Background(), "Shutting down children")
 		n.children.Shutdown()
 	}
 
@@ -54,44 +54,19 @@ func TestLoad(t *testing.T) {
 }
 
 func TestJoin(t *testing.T) {
-	t.Run("simple", func(t *testing.T) {
-		var (
-			h        = newIntegrationHarness(t)
-			primus   = h.createNetwork(t, "primus", nil)
-			secondus = h.joinNetwork(t, primus, "secondus", nil)
-		)
+	var (
+		h        = newIntegrationHarness(t)
+		primus   = h.createNetwork(t, "primus", nil)
+		secondus = h.joinNetwork(t, primus, "secondus", nil)
+	)
 
 	primusHosts, err := primus.GetHosts(h.ctx)
 	assert.NoError(t, err)
 
 	secondusHosts, err := secondus.GetHosts(h.ctx)
 	assert.NoError(t, err)
 
 	assert.Equal(t, primusHosts, secondusHosts)
-	})
-
-	t.Run("with alloc", func(t *testing.T) {
-		var (
-			h      = newIntegrationHarness(t)
-			primus = h.createNetwork(t, "primus", nil)
-			secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
-				networkConfigOpts: &networkConfigOpts{
-					numStorageAllocs: 1,
-				},
-			})
-		)
-
-		t.Log("reloading primus' hosts")
-		assert.NoError(t, primus.Network.(*network).reloadHosts(h.ctx))
-
-		primusHosts, err := primus.GetHosts(h.ctx)
-		assert.NoError(t, err)
-
-		secondusHosts, err := secondus.GetHosts(h.ctx)
-		assert.NoError(t, err)
-
-		assert.Equal(t, primusHosts, secondusHosts)
-	})
 }
 
 func TestNetwork_GetConfig(t *testing.T) {
@@ -214,33 +189,5 @@ func TestNetwork_SetConfig(t *testing.T) {
 		assert.ElementsMatch(t, expPeers, layout.Peers)
 	})
 
-	t.Run("remove all storage allocs", func(t *testing.T) {
-		var (
-			h        = newIntegrationHarness(t)
-			primus   = h.createNetwork(t, "primus", nil)
-			secondus = h.joinNetwork(t, primus, "secondus", &joinNetworkOpts{
-				networkConfigOpts: &networkConfigOpts{
-					numStorageAllocs: 1,
-				},
-			})
-			networkConfig = secondus.getConfig(t)
-
-			prevHost = secondus.getHostsByName(t)[secondus.hostName]
-			//removedAlloc = networkConfig.Storage.Allocations[0]
-			removedPeer = allocsToPeerLayouts(
-				secondus.hostName, prevHost.Garage.Instances,
-			)[0]
-			//removedGarageInst = daecommon.BootstrapGarageHostForAlloc(
-			//	prevHost, removedAlloc,
-			//)
-		)
-
-		networkConfig.Storage.Allocations = nil
-		assert.NoError(t, secondus.SetConfig(h.ctx, networkConfig))
-
-		t.Log("Checking that garage layout still contains the old allocation")
-		layout, err := secondus.garageAdminClient(t).GetLayout(h.ctx)
-		assert.NoError(t, err)
-		assert.Contains(t, layout.Peers, removedPeer)
-	})
+	// TODO a host having allocs but removing all of them
 }
@@ -8,7 +8,6 @@ import (
 	"io"
 	"net/http"
 	"net/http/httputil"
-	"net/netip"
 	"time"
 
 	"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
@@ -30,12 +29,6 @@ const (
 	BucketPermissionOwner BucketPermission = "owner"
 )
 
-// Bucket defines a bucket which has been created in a cluster
-type Bucket struct {
-	ID            BucketID `json:"id"`
-	GlobalAliases []string `json:"globalAliases"`
-}
-
 // AdminClientError gets returned from AdminClient Do methods for non-200
 // errors.
 type AdminClientError struct {
@@ -159,47 +152,6 @@ func (c *AdminClient) do(
 	return nil
 }
 
-// KnownNode describes the fields of a known node in the cluster, as returned
-// as part of [ClusterStatus].
-type KnownNode struct {
-	ID              string         `json:"id"`
-	Addr            netip.AddrPort `json:"addr"`
-	IsUp            bool           `json:"isUp"`
-	LastSeenSecsAgo int            `json:"lastSeenSecsAgo"`
-	HostName        string         `json:"hostname"`
-}
-
-// PeerLayout describes the properties of a garage peer in the context of the
-// layout of the cluster.
-//
-// TODO This should be called Role.
-type PeerLayout struct {
-	ID       string   `json:"id"`
-	Capacity int      `json:"capacity"` // Gb (SI units)
-	Zone     string   `json:"zone"`
-	Tags     []string `json:"tags"`
-}
-
-// ClusterLayout describes the layout of the cluster as a whole.
-type ClusterLayout struct {
-	Peers []PeerLayout `json:"roles"`
-}
-
-// ClusterStatus is returned from the Status endpoint, describing the currently
-// known state of the cluster.
-type ClusterStatus struct {
-	Nodes  []KnownNode   `json:"nodes"`
-	Layout ClusterLayout `json:"layout"`
-}
-
-// Status returns the current state of the cluster.
-func (c *AdminClient) Status(ctx context.Context) (ClusterStatus, error) {
-	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes
-	var clusterStatus ClusterStatus
-	err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil)
-	return clusterStatus, err
-}
-
 // Wait will block until the instance connected to can see at least
 // ReplicationFactor other garage instances. If the context is canceled it
 // will return the context error.
@@ -208,32 +160,30 @@ func (c *AdminClient) Wait(ctx context.Context) error {
 	for first := true; ; first = false {
 
 		if !first {
-			select {
-			case <-time.After(500 * time.Millisecond):
-			case <-ctx.Done():
-				return ctx.Err()
-			}
+			time.Sleep(250 * time.Millisecond)
 		}
 
-		c.logger.Debug(ctx, "Getting cluster status")
-		clusterStatus, err := c.Status(ctx)
+		// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes
+		var clusterStatus struct {
+			Nodes []struct {
+				IsUp bool `json:"isUp"`
+			} `json:"nodes"`
+		}
+
+		err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil)
+
 		if ctxErr := ctx.Err(); ctxErr != nil {
 			return ctxErr
 
 		} else if err != nil {
-			c.logger.Warn(ctx, "Instance is not yet ready", err)
+			c.logger.Warn(ctx, "waiting for instance to become ready", err)
 			continue
 		}
 
 		var numUp int
 
 		for _, node := range clusterStatus.Nodes {
-			// There seems to be some kind of step between IsUp becoming true
-			// and garage actually loading the full state of a node, so we check
-			// for the HostName as well. We could also use LastSeenSecsAgo, but
-			// that remains null for the node being queried so it's more
-			// annoying to use.
-			if node.IsUp && node.HostName != "" {
+			if node.IsUp {
 				numUp++
 			}
 		}
@@ -248,7 +198,7 @@ func (c *AdminClient) Wait(ctx context.Context) error {
 			return nil
 		}
 
-		c.logger.Debug(ctx, "Instance not online yet, will continue waiting")
+		c.logger.Debug(ctx, "instance not online yet, will continue waiting")
 	}
 }
 
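The old side of Wait used a context-aware delay (a select over time.After and ctx.Done) where the new side uses a plain time.Sleep. A minimal standalone sketch of that idiom, with a hypothetical helper name:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sleepCtx waits for d, but returns early with ctx.Err() if the context is
// canceled first; a bare time.Sleep cannot be interrupted by cancellation.
func sleepCtx(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	err := sleepCtx(ctx, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}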
@@ -299,13 +249,6 @@ func (c *AdminClient) CreateBucket(
 	return BucketID(res.ID), err
 }
 
-// ListBuckets returns all buckets known to this garage node.
-func (c *AdminClient) ListBuckets(ctx context.Context) ([]Bucket, error) {
-	var res []Bucket
-	err := c.do(ctx, &res, "GET", "/v1/bucket?list", nil)
-	return res, err
-}
-
 // GrantBucketPermissions grants the S3APICredentials with the given ID
 // permission(s) to interact with the bucket of the given ID.
 func (c *AdminClient) GrantBucketPermissions(
@@ -327,6 +270,20 @@ func (c *AdminClient) GrantBucketPermissions(
 	})
 }
 
+// PeerLayout describes the properties of a garage peer in the context of the
+// layout of the cluster.
+type PeerLayout struct {
+	ID       string   `json:"id"`
+	Capacity int      `json:"capacity"` // Gb (SI units)
+	Zone     string   `json:"zone"`
+	Tags     []string `json:"tags"`
+}
+
+// ClusterLayout describes the layout of the cluster as a whole.
+type ClusterLayout struct {
+	Peers []PeerLayout `json:"roles"`
+}
+
 // GetLayout returns the currently applied ClusterLayout.
 func (c *AdminClient) GetLayout(ctx context.Context) (ClusterLayout, error) {
 	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/GetLayout