You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
isle/entrypoint/src/cmd/entrypoint/garage_util.go

315 lines
7.5 KiB

package main
import (
"context"
"isle/bootstrap"
"isle/daemon"
"isle/garage"
"fmt"
"net"
"path/filepath"
"strconv"
"code.betamike.com/micropelago/pmux/pmuxlib"
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
)
func garageAdminClientLogger(logger *mlog.Logger) *mlog.Logger {
return logger.WithNamespace("garageAdminClient")
}
// newGarageAdminClient will return an AdminClient for a local garage instance,
// or it will _panic_ if there is no local instance configured.
func newGarageAdminClient(
logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) *garage.AdminClient {
thisHost := hostBootstrap.ThisHost()
return garage.NewAdminClient(
net.JoinHostPort(
thisHost.IP().String(),
strconv.Itoa(daemonConfig.Storage.Allocations[0].AdminPort),
),
hostBootstrap.Garage.AdminToken,
garageAdminClientLogger(logger),
)
}
func waitForGarageAndNebula(
ctx context.Context,
logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) error {
if err := waitForNebula(ctx, hostBootstrap); err != nil {
return fmt.Errorf("waiting for nebula to start: %w", err)
}
allocs := daemonConfig.Storage.Allocations
// if this host doesn't have any allocations specified then fall back to
// waiting for nebula
if len(allocs) == 0 {
return nil
}
adminClientLogger := garageAdminClientLogger(logger)
for _, alloc := range allocs {
adminAddr := net.JoinHostPort(
hostBootstrap.ThisHost().IP().String(),
strconv.Itoa(alloc.AdminPort),
)
adminClient := garage.NewAdminClient(
adminAddr,
hostBootstrap.Garage.AdminToken,
adminClientLogger,
)
ctx := mctx.Annotate(ctx, "garageAdminAddr", adminAddr)
logger.Debug(ctx, "wating for garage instance to start")
if err := adminClient.Wait(ctx); err != nil {
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
}
}
return nil
}
// bootstrapGarageHostForAlloc returns the bootstrap.GarageHostInstance which
// corresponds with the given alloc from the daemon config. This will panic if
// no associated instance can be found.
//
// This assumes that coalesceDaemonConfigAndBootstrap has already been called.
func bootstrapGarageHostForAlloc(
host bootstrap.Host,
alloc daemon.ConfigStorageAllocation,
) bootstrap.GarageHostInstance {
for _, inst := range host.Garage.Instances {
if inst.RPCPort == alloc.RPCPort {
return inst
}
}
panic(fmt.Sprintf("could not find alloc %+v in the bootstrap data", alloc))
}
func garageWriteChildConfig(
hostBootstrap bootstrap.Bootstrap,
alloc daemon.ConfigStorageAllocation,
) (
string, error,
) {
thisHost := hostBootstrap.ThisHost()
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
peer := garage.LocalPeer{
RemotePeer: garage.RemotePeer{
ID: id,
IP: thisHost.IP().String(),
RPCPort: alloc.RPCPort,
S3APIPort: alloc.S3APIPort,
},
AdminPort: alloc.AdminPort,
}
garageTomlPath := filepath.Join(
envRuntimeDirPath, fmt.Sprintf("garage-%d.toml", alloc.RPCPort),
)
err := garage.WriteGarageTomlFile(garageTomlPath, garage.GarageTomlData{
MetaPath: alloc.MetaPath,
DataPath: alloc.DataPath,
RPCSecret: hostBootstrap.Garage.RPCSecret,
AdminToken: hostBootstrap.Garage.AdminToken,
RPCAddr: peer.RPCAddr(),
S3APIAddr: peer.S3APIAddr(),
AdminAddr: peer.AdminAddr(),
BootstrapPeers: hostBootstrap.GarageRPCPeerAddrs(),
})
if err != nil {
return "", fmt.Errorf("creating garage.toml file at %q: %w", garageTomlPath, err)
}
return garageTomlPath, nil
}
func garagePmuxProcConfigs(
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) (
[]pmuxlib.ProcessConfig, error,
) {
var pmuxProcConfigs []pmuxlib.ProcessConfig
for _, alloc := range daemonConfig.Storage.Allocations {
childConfigPath, err := garageWriteChildConfig(hostBootstrap, alloc)
if err != nil {
return nil, fmt.Errorf("writing child config file for alloc %+v: %w", alloc, err)
}
pmuxProcConfigs = append(pmuxProcConfigs, pmuxlib.ProcessConfig{
Name: fmt.Sprintf("garage-%d", alloc.RPCPort),
Cmd: binPath("garage"),
Args: []string{"-c", childConfigPath, "server"},
StartAfterFunc: func(ctx context.Context) error {
return waitForNebula(ctx, hostBootstrap)
},
})
}
return pmuxProcConfigs, nil
}
func garageInitializeGlobalBucket(
ctx context.Context,
logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) error {
var (
adminClient = newGarageAdminClient(logger, hostBootstrap, daemonConfig)
globalBucketCreds = hostBootstrap.Garage.GlobalBucketS3APICredentials
)
// first attempt to import the key
err := adminClient.Do(ctx, nil, "POST", "/v0/key/import", map[string]string{
"accessKeyId": globalBucketCreds.ID,
"secretAccessKey": globalBucketCreds.Secret,
"name": "shared-global-bucket-key",
})
if err != nil {
return fmt.Errorf("importing global bucket key into garage: %w", err)
}
// create global bucket
err = adminClient.Do(ctx, nil, "POST", "/v0/bucket", map[string]string{
"globalAlias": garage.GlobalBucket,
})
if err != nil {
return fmt.Errorf("creating global bucket: %w", err)
}
// retrieve newly created bucket's id
var getBucketRes struct {
ID string `json:"id"`
}
err = adminClient.Do(
ctx, &getBucketRes,
"GET", "/v0/bucket?globalAlias="+garage.GlobalBucket, nil,
)
if err != nil {
return fmt.Errorf("fetching global bucket id: %w", err)
}
// allow shared global bucket key to perform all operations
err = adminClient.Do(ctx, nil, "POST", "/v0/bucket/allow", map[string]interface{}{
"bucketId": getBucketRes.ID,
"accessKeyId": globalBucketCreds.ID,
"permissions": map[string]bool{
"read": true,
"write": true,
},
})
if err != nil {
return fmt.Errorf("granting permissions to shared global bucket key: %w", err)
}
return nil
}
func garageApplyLayout(
ctx context.Context,
logger *mlog.Logger,
hostBootstrap bootstrap.Bootstrap,
daemonConfig daemon.Config,
) error {
var (
adminClient = newGarageAdminClient(logger, hostBootstrap, daemonConfig)
thisHost = hostBootstrap.ThisHost()
hostName = thisHost.Name
allocs = daemonConfig.Storage.Allocations
)
type peerLayout struct {
Capacity int `json:"capacity"`
Zone string `json:"zone"`
Tags []string `json:"tags"`
}
{
clusterLayout := map[string]peerLayout{}
for _, alloc := range allocs {
id := bootstrapGarageHostForAlloc(thisHost, alloc).ID
zone := hostName
if alloc.Zone != "" {
zone = alloc.Zone
}
clusterLayout[id] = peerLayout{
Capacity: alloc.Capacity,
Zone: zone,
Tags: []string{},
}
}
err := adminClient.Do(ctx, nil, "POST", "/v0/layout", clusterLayout)
if err != nil {
return fmt.Errorf("staging layout changes: %w", err)
}
}
var clusterLayout struct {
Version int `json:"version"`
StagedRoleChanges map[string]peerLayout `json:"stagedRoleChanges"`
}
if err := adminClient.Do(ctx, &clusterLayout, "GET", "/v0/layout", nil); err != nil {
return fmt.Errorf("retrieving staged layout change: %w", err)
}
if len(clusterLayout.StagedRoleChanges) == 0 {
return nil
}
applyClusterLayout := struct {
Version int `json:"version"`
}{
Version: clusterLayout.Version + 1,
}
err := adminClient.Do(ctx, nil, "POST", "/v0/layout/apply", applyClusterLayout)
if err != nil {
return fmt.Errorf("applying new layout (new version:%d): %w", applyClusterLayout.Version, err)
}
return nil
}