isle/go/daemon/network/garage.go

413 lines
9.8 KiB
Go
Raw Normal View History

package network
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"isle/bootstrap"
"isle/daemon/daecommon"
"isle/garage"
"isle/nebula"
"isle/secrets"
"net"
2024-11-05 21:31:57 +00:00
"net/netip"
"path/filepath"
2024-11-05 21:31:57 +00:00
"slices"
"strconv"
"time"
2024-06-22 15:49:56 +00:00
"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
"github.com/minio/minio-go/v7"
2024-11-05 21:31:57 +00:00
"golang.org/x/exp/maps"
)
// garageGlobalBucketBootstrapHostsDirPath is the path, within garage's global
// bucket, of the directory which holds one `<hostname>.json.signed` object per
// host.
const garageGlobalBucketBootstrapHostsDirPath = "bootstrap/hosts"
2024-10-24 20:14:13 +00:00
func getGarageClientParams(
ctx context.Context,
secretsStore secrets.Store,
currBootstrap bootstrap.Bootstrap,
) (
GarageClientParams, error,
) {
creds, err := daecommon.GetGarageS3APIGlobalBucketCredentials(
2024-10-24 20:14:13 +00:00
ctx, secretsStore,
)
if err != nil {
return GarageClientParams{}, fmt.Errorf("getting garage global bucket creds: %w", err)
}
2024-10-24 20:14:13 +00:00
rpcSecret, err := daecommon.GetGarageRPCSecret(ctx, secretsStore)
if err != nil && !errors.Is(err, secrets.ErrNotFound) {
return GarageClientParams{}, fmt.Errorf("getting garage rpc secret: %w", err)
}
return GarageClientParams{
Peer: currBootstrap.ChooseGaragePeer(),
GlobalBucketS3APICredentials: creds,
RPCSecret: rpcSecret,
}, nil
}
// garageAdminClientLogger derives the logger handed to garage admin clients by
// placing the given logger under the "garageAdminClient" namespace.
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
	const namespace = "garageAdminClient"
	return logger.WithNamespace(namespace)
}
// newGarageAdminClient constructs an AdminClient pointed at the admin port of
// the first storage allocation in networkConfig, on the given host's IP.
//
// It will _panic_ if networkConfig has no storage allocations, i.e. if there is
// no local garage instance configured.
func newGarageAdminClient(
	logger *mlog.Logger,
	networkConfig daecommon.NetworkConfig,
	adminToken string,
	host bootstrap.Host,
) *garage.AdminClient {
	clientLogger := garageAdminClientLogger(logger)
	hostIP := host.IP().String()

	// Indexing the first allocation is what panics when none are configured.
	adminPort := strconv.Itoa(networkConfig.Storage.Allocations[0].AdminPort)

	return garage.NewAdminClient(
		clientLogger, net.JoinHostPort(hostIP, adminPort), adminToken,
	)
}
func garageApplyLayout(
ctx context.Context,
logger *mlog.Logger,
networkConfig daecommon.NetworkConfig,
adminToken string,
prevHost, currHost bootstrap.Host,
) error {
var (
adminClient = newGarageAdminClient(
logger, networkConfig, adminToken, currHost,
)
hostName = currHost.Name
allocs = networkConfig.Storage.Allocations
peers = make([]garage.PeerLayout, len(allocs))
peerIDs = map[string]struct{}{}
idsToRemove = make([]string, 0, len(prevHost.Garage.Instances))
)
defer adminClient.Close()
for i, alloc := range allocs {
id := daecommon.BootstrapGarageHostForAlloc(currHost, alloc).ID
peerIDs[id] = struct{}{}
zone := string(hostName)
if alloc.Zone != "" {
zone = alloc.Zone
}
peers[i] = garage.PeerLayout{
ID: id,
Capacity: alloc.Capacity * 1_000_000_000,
Zone: zone,
Tags: []string{},
}
}
for _, prevInst := range prevHost.Garage.Instances {
if _, ok := peerIDs[prevInst.ID]; !ok {
idsToRemove = append(idsToRemove, prevInst.ID)
}
}
2024-11-05 21:31:57 +00:00
_, _ = adminClient.Status(ctx) // TODO remove this
return adminClient.ApplyLayout(ctx, peers, idsToRemove)
}
// garageInitializeGlobalBucket sets up the global bucket via the admin API of
// the garage instance on the given host. It performs three steps in order:
// creating the global bucket's S3 API credentials, creating the bucket itself,
// and granting those credentials read and write permissions on the bucket.
//
// The credentials are returned. If a step fails the error is returned wrapped,
// along with whatever credentials value had been obtained up to that point.
func garageInitializeGlobalBucket(
	ctx context.Context,
	logger *mlog.Logger,
	networkConfig daecommon.NetworkConfig,
	adminToken string,
	host bootstrap.Host,
) (
	garage.S3APICredentials, error,
) {
	admin := newGarageAdminClient(logger, networkConfig, adminToken, host)
	defer admin.Close()

	globalCreds, err := admin.CreateS3APICredentials(
		ctx, garage.GlobalBucketS3APICredentialsName,
	)
	if err != nil {
		return globalCreds, fmt.Errorf("creating global bucket credentials: %w", err)
	}

	globalBucketID, err := admin.CreateBucket(ctx, garage.GlobalBucket)
	if err != nil {
		return globalCreds, fmt.Errorf("creating global bucket: %w", err)
	}

	err = admin.GrantBucketPermissions(
		ctx,
		globalBucketID,
		globalCreds.ID,
		garage.BucketPermissionRead,
		garage.BucketPermissionWrite,
	)
	if err != nil {
		return globalCreds, fmt.Errorf(
			"granting permissions to shared global bucket key: %w", err,
		)
	}

	return globalCreds, nil
}
2024-10-24 20:14:13 +00:00
func getGarageBootstrapHosts(
ctx context.Context,
logger *mlog.Logger,
secretsStore secrets.Store,
currBootstrap bootstrap.Bootstrap,
) (
map[nebula.HostName]bootstrap.Host, error,
) {
2024-10-24 20:14:13 +00:00
garageClientParams, err := getGarageClientParams(
ctx, secretsStore, currBootstrap,
)
if err != nil {
return nil, fmt.Errorf("getting garage client params: %w", err)
}
var (
client = garageClientParams.GlobalBucketS3APIClient()
hosts = map[nebula.HostName]bootstrap.Host{}
objInfoCh = client.ListObjects(
ctx, garage.GlobalBucket,
minio.ListObjectsOptions{
Prefix: garageGlobalBucketBootstrapHostsDirPath,
Recursive: true,
},
)
)
defer client.Close()
for objInfo := range objInfoCh {
ctx := mctx.Annotate(ctx, "objectKey", objInfo.Key)
if objInfo.Err != nil {
return nil, fmt.Errorf("listing objects: %w", objInfo.Err)
}
obj, err := client.GetObject(
ctx, garage.GlobalBucket, objInfo.Key, minio.GetObjectOptions{},
)
if err != nil {
return nil, fmt.Errorf("retrieving object %q: %w", objInfo.Key, err)
}
var authedHost bootstrap.AuthenticatedHost
err = json.NewDecoder(obj).Decode(&authedHost)
obj.Close()
if err != nil {
2024-10-24 20:14:13 +00:00
logger.Warn(ctx, "Object contains invalid json", err)
continue
}
host, err := authedHost.Unwrap(currBootstrap.CAPublicCredentials)
if err != nil {
2024-10-24 20:14:13 +00:00
logger.Warn(ctx, "Host could not be authenticated", err)
}
hosts[host.Name] = host
}
return hosts, nil
}
2024-07-12 15:05:39 +00:00
// putGarageBoostrapHost places the <hostname>.json.signed file for this host
// into garage so that other hosts are able to see relevant configuration for
// it.
2024-10-24 20:14:13 +00:00
func putGarageBoostrapHost(
ctx context.Context,
secretsStore secrets.Store,
currBootstrap bootstrap.Bootstrap,
) error {
2024-10-24 20:14:13 +00:00
garageClientParams, err := getGarageClientParams(
ctx, secretsStore, currBootstrap,
)
if err != nil {
return fmt.Errorf("getting garage client params: %w", err)
}
var (
host = currBootstrap.ThisHost()
client = garageClientParams.GlobalBucketS3APIClient()
)
defer client.Close()
configured, err := nebula.Sign(
host.HostConfigured, currBootstrap.PrivateCredentials.SigningPrivateKey,
)
if err != nil {
return fmt.Errorf("signing host configured data: %w", err)
}
hostB, err := json.Marshal(bootstrap.AuthenticatedHost{
Assigned: currBootstrap.SignedHostAssigned,
Configured: configured,
})
if err != nil {
return fmt.Errorf("encoding host data: %w", err)
}
filePath := filepath.Join(
garageGlobalBucketBootstrapHostsDirPath,
string(host.Name)+".json.signed",
)
_, err = client.PutObject(
ctx,
garage.GlobalBucket,
filePath,
bytes.NewReader(hostB),
int64(len(hostB)),
minio.PutObjectOptions{},
)
if err != nil {
return fmt.Errorf("writing to %q in global bucket: %w", filePath, err)
}
return nil
}
2024-07-12 15:05:39 +00:00
func removeGarageBootstrapHost(
ctx context.Context, client *garage.S3APIClient, hostName nebula.HostName,
2024-07-12 15:05:39 +00:00
) error {
filePath := filepath.Join(
garageGlobalBucketBootstrapHostsDirPath,
string(hostName)+".json.signed",
)
return client.RemoveObject(
ctx, garage.GlobalBucket, filePath, minio.RemoveObjectOptions{},
)
}
// garageWaitForAlloc blocks until the garage instance backing the given
// allocation reports at least one bucket via its admin API.
//
// We can wait for the garage instance to appear healthy, but there are cases
// where it still hasn't fully synced the list of buckets and bucket
// credentials. For those cases it's necessary to do this as an additional
// check.
//
// The bucket list is polled once per second for as long as it is empty. An
// error listing buckets is returned immediately, wrapped; if ctx is done while
// waiting between polls then ctx.Err() is returned.
func garageWaitForAlloc(
	ctx context.Context,
	logger *mlog.Logger,
	alloc daecommon.ConfigStorageAllocation,
	adminToken string,
	host bootstrap.Host,
) error {
	hostIP := host.IP().String()

	adminClient := garage.NewAdminClient(
		garageAdminClientLogger(logger),
		net.JoinHostPort(hostIP, strconv.Itoa(alloc.AdminPort)),
		adminToken,
	)
	defer adminClient.Close()

	ctx = mctx.WithAnnotator(ctx, alloc)

	for {
		logger.Info(ctx, "Checking if node has synced bucket list")

		buckets, err := adminClient.ListBuckets(ctx)
		if err != nil {
			return fmt.Errorf("listing buckets: %w", err)
		}

		if len(buckets) > 0 {
			return nil
		}

		logger.WarnString(ctx, "No buckets found, will wait a bit and try again")

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
			// Fall through to the next poll.
		}
	}
}
2024-11-05 21:31:57 +00:00
// garageNodeBuddyPeers returns the IP and layout peers of the "buddy" of the
// given host, based on the given garage cluster status. It will return zero
// values if there are fewer than two hosts in the cluster, in which case there
// can be no buddy.
//
// For situations where we want one host to affect the cluster layout of another
// host's peers, we use a simple system to determine a single host which is
// responsible. The goal is not to be 100% race-proof (garage handles that), but
// rather to try to prevent all hosts from modifying the same host's layout at
// the same time.
//
// The system is to order all hosts by their IP, and say that each host is
// responsible for (aka the "buddy" of) the host immediately after their own in
// that list. The last host in that list is responsible for the first.
//
// Only nodes which have a peer in the cluster layout are considered, and nodes
// are grouped into hosts by IP.
//
// NOTE(review): a host whose IP does not appear in the list at all is also
// given the first host as its buddy -- confirm this is intended.
func garageNodeBuddyPeers(
	status garage.ClusterStatus, host bootstrap.Host,
) (
	netip.Addr, []garage.PeerLayout,
) {
	thisIP := host.IP()

	layoutPeers := make(
		map[string]garage.PeerLayout, len(status.Layout.Peers),
	)
	for _, peer := range status.Layout.Peers {
		layoutPeers[peer.ID] = peer
	}

	peersByIP := map[netip.Addr][]garage.PeerLayout{}
	for _, node := range status.Nodes {
		if peer, inLayout := layoutPeers[node.ID]; inLayout {
			ip := node.Addr.Addr()
			peersByIP[ip] = append(peersByIP[ip], peer)
		}
	}

	// A lone host (or, somehow, an empty cluster) has nobody to be the buddy
	// of.
	if len(peersByIP) < 2 {
		return netip.Addr{}, nil
	}

	sortedIPs := maps.Keys(peersByIP)
	slices.SortFunc(sortedIPs, netip.Addr.Compare)

	// Default to wrapping around to the first host, and only pick the
	// following host when this one sits somewhere before the end of the list.
	buddyIP := sortedIPs[0]
	if i := slices.Index(sortedIPs, thisIP); i >= 0 && i < len(sortedIPs)-1 {
		buddyIP = sortedIPs[i+1]
	}

	return buddyIP, peersByIP[buddyIP]
}