// Source: isle/go/garage/admin_client.go

package garage
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/netip"
"time"

"dev.mediocregopher.com/mediocre-go-lib.git/mctx"
"dev.mediocregopher.com/mediocre-go-lib.git/mlog"
)
// BucketID is a unique identifier for a bucket in garage. It is different than
// the bucket's alias, since a single bucket may have multiple aliases. It is
// the type of the "id" field of a [Bucket].
type BucketID string
// BucketPermission describes a permission an S3APICredentials may have when
// interacting with a bucket. Values of this type are used as the keys of the
// "permissions" object sent by GrantBucketPermissions.
type BucketPermission string
// Enumeration of BucketPermissions.
const (
	// BucketPermissionRead is the "read" permission.
	BucketPermissionRead BucketPermission = "read"
	// BucketPermissionWrite is the "write" permission.
	BucketPermissionWrite BucketPermission = "write"
	// BucketPermissionOwner is the "owner" permission.
	BucketPermissionOwner BucketPermission = "owner"
)
// Bucket defines a bucket which has been created in a cluster. It is the
// element type of the list returned by ListBuckets.
type Bucket struct {
	// ID is the bucket's unique identifier, read from the "id" JSON field.
	ID BucketID `json:"id"`
	// GlobalAliases holds the contents of the "globalAliases" JSON field.
	GlobalAliases []string `json:"globalAliases"`
}
// AdminClientError is the error returned by AdminClient methods when the
// admin interface answers with an HTTP status code of 300 or higher.
type AdminClientError struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int
	// Body is the raw response body, as far as it could be read.
	Body []byte
}

// Error implements the error interface. The message consists of the status
// code followed by the Go-quoted response body.
func (e AdminClientError) Error() string {
	const format = "%d response from admin: %q"
	body := string(e.Body)
	return fmt.Sprintf(format, e.StatusCode, body)
}
// AdminClient is a helper type for performing actions against the garage admin
// interface. Instances are created with NewAdminClient and should be released
// with Close.
type AdminClient struct {
	// logger receives the debug dumps of requests/responses as well as the
	// progress messages emitted by Wait.
	logger *mlog.Logger
	// c is the HTTP client used to perform every request.
	c *http.Client
	// addr is the address of the admin interface; requests are sent to
	// "http://<addr><path>".
	addr string
	// adminToken is sent as a Bearer token in the Authorization header of
	// every request.
	adminToken string
	// transport is the transport used by c. It is kept separately so that
	// Close can close its idle connections.
	transport *http.Transport
}
// NewAdminClient initializes and returns an AdminClient which will use the
// given address and adminToken for all requests made. Requests are sent over
// plain HTTP to addr, with adminToken as the Bearer token.
//
// The client gets its own transport, cloned from http.DefaultTransport, so
// that Close only affects connections opened by this client. If
// http.DefaultTransport has been replaced by something which is not an
// *http.Transport, a fresh transport is used instead of panicking.
//
// If logger is nil then logs will be suppressed.
// NOTE(review): this relies on *mlog.Logger methods being safe to call on a
// nil receiver — confirm against mlog.
func NewAdminClient(logger *mlog.Logger, addr, adminToken string) *AdminClient {
	var transport *http.Transport
	if defaultTransport, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = defaultTransport.Clone()
	} else {
		// http.DefaultTransport is a mutable global of interface type; other
		// code in the process may have wrapped or replaced it.
		transport = &http.Transport{Proxy: http.ProxyFromEnvironment}
	}

	return &AdminClient{
		logger: logger,
		c: &http.Client{
			Transport: transport,
		},
		addr:       addr,
		adminToken: adminToken,
		transport:  transport,
	}
}
// Close cleans up all resources held by the client by closing the idle
// connections of its transport. It always returns nil.
func (c *AdminClient) Close() error {
	ownTransport := c.transport
	ownTransport.CloseIdleConnections()
	return nil
}
// do issues a single HTTP request against the admin interface and optionally
// decodes the JSON response.
//
// The request is sent to "http://<addr><path>" using the given method (GET,
// POST), with the admin token attached as a Bearer Authorization header. If
// body is non-nil its JSON encoding is used as the request body.
//
// A response with a status code of 300 or above is returned as an
// AdminClientError carrying the status code and response body. Otherwise the
// response body is JSON decoded into rcv, or read and discarded if rcv is nil.
//
// When the logger's MaxLevel is at least mlog.LevelDebug, the full request and
// response are dumped to the debug log.
func (c *AdminClient) do(
	ctx context.Context, rcv any, method, path string, body any,
) error {
	// reqBody stays a nil interface when there is no body to send.
	var reqBody io.Reader
	if body != nil {
		encoded := &bytes.Buffer{}
		if err := json.NewEncoder(encoded).Encode(body); err != nil {
			return fmt.Errorf("json marshaling body: %w", err)
		}
		reqBody = encoded
	}

	req, err := http.NewRequestWithContext(
		ctx, method, "http://"+c.addr+path, reqBody,
	)
	if err != nil {
		return fmt.Errorf("initializing request: %w", err)
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.adminToken))

	// Evaluated separately before and after the request is performed.
	debugEnabled := func() bool {
		return c.logger.MaxLevel() >= mlog.LevelDebug.Int()
	}

	if debugEnabled() {
		if dump, dumpErr := httputil.DumpRequestOut(req, true); dumpErr != nil {
			c.logger.Error(ctx, "failed to dump http request", dumpErr)
		} else {
			c.logger.Debug(ctx, "------ request ------\n"+string(dump)+"\n")
		}
	}

	res, err := c.c.Do(req)
	if err != nil {
		return fmt.Errorf("performing http request: %w", err)
	}

	if debugEnabled() {
		if dump, dumpErr := httputil.DumpResponse(res, true); dumpErr != nil {
			c.logger.Error(ctx, "failed to dump http response", dumpErr)
		} else {
			c.logger.Debug(ctx, "------ response ------\n"+string(dump)+"\n")
		}
	}

	// Deferred only after the dump above, which may have replaced res.Body.
	defer res.Body.Close()

	if res.StatusCode >= 300 {
		// A failure while reading the error body is ignored; whatever could
		// be read is still reported to the caller.
		errBody, _ := io.ReadAll(res.Body)
		return AdminClientError{StatusCode: res.StatusCode, Body: errBody}
	}

	if rcv != nil {
		if err := json.NewDecoder(res.Body).Decode(rcv); err != nil {
			return fmt.Errorf("decoding json response body: %w", err)
		}
		return nil
	}

	// The caller is not interested in the body, but it is still read to
	// completion before being closed.
	if _, err := io.Copy(io.Discard, res.Body); err != nil {
		return fmt.Errorf("discarding response body: %w", err)
	}
	return nil
}
// KnownNode describes the fields of a known node in the cluster, as returned
// as part of [ClusterStatus].
type KnownNode struct {
	// ID is read from the "id" JSON field.
	ID string `json:"id"`
	// Role is read from the "role" JSON field. Being a pointer, it may be
	// nil.
	// NOTE(review): presumably nil for nodes without an assigned role —
	// confirm against the garage admin API.
	Role *Role `json:"role"`
	// Addr is the IP address and port read from the "addr" JSON field.
	Addr netip.AddrPort `json:"addr"`
	// IsUp is read from the "isUp" JSON field. Wait only counts a node as up
	// if HostName is set as well.
	IsUp bool `json:"isUp"`
	// LastSeenSecsAgo is read from the "lastSeenSecsAgo" JSON field.
	LastSeenSecsAgo int `json:"lastSeenSecsAgo"`
	// HostName is read from the "hostname" JSON field.
	HostName string `json:"hostname"`
}
// Role describes a node's role in the garage cluster, i.e. what storage it is
// providing. It is used both when reading the layout (see ClusterLayout) and
// when adding or modifying roles via ApplyLayout.
type Role struct {
	// ID is read from / written to the "id" JSON field.
	ID string `json:"id"`
	// Capacity is expressed in Gb (SI units).
	Capacity int `json:"capacity"`
	// Zone is read from / written to the "zone" JSON field.
	Zone string `json:"zone"`
	// Tags is read from / written to the "tags" JSON field.
	Tags []string `json:"tags"`
}
// ClusterLayout describes the layout of the cluster as a whole. It is decoded
// from the responses of the "/v1/layout" endpoint.
type ClusterLayout struct {
	// Version is the layout version reported by garage. ApplyLayout applies
	// staged changes as version Version+1.
	Version int `json:"version"`
	// Roles is read from the "roles" JSON field.
	Roles []Role `json:"roles"`
	// StagedRoleChanges is read from the "stagedRoleChanges" JSON field.
	// ApplyLayout skips the apply step when it is empty.
	StagedRoleChanges []Role `json:"stagedRoleChanges"`
}
// ClusterStatus is returned from the Status endpoint, describing the currently
// known state of the cluster.
type ClusterStatus struct {
	// Nodes lists the nodes found in the "nodes" JSON field of the response.
	Nodes []KnownNode `json:"nodes"`
}
// Status queries the admin interface for the current state of the cluster and
// returns it. Whatever was decoded is returned alongside any error.
func (c *AdminClient) Status(ctx context.Context) (ClusterStatus, error) {
	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes
	const statusPath = "/v1/status"

	var status ClusterStatus
	if err := c.do(ctx, &status, http.MethodGet, statusPath, nil); err != nil {
		return status, err
	}
	return status, nil
}
// Wait will block until the instance connected to can see at least
// ReplicationFactor other garage instances. If the context is canceled it
// will return the context error.
func (c *AdminClient) Wait(ctx context.Context) error {
for first := true; ; first = false {
if !first {
select {
case <-time.After(2 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
c.logger.Debug(ctx, "Getting cluster status")
clusterStatus, err := c.Status(ctx)
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
} else if err != nil {
ctx := mctx.Annotate(ctx, "errMsg", err.Error())
c.logger.Info(ctx, "Instance is not online yet")
continue
}
var numUp int
for _, node := range clusterStatus.Nodes {
// There seems to be some kind of step between IsUp becoming true
// and garage actually loading the full state of a node, so we check
// for the HostName as well. We could also use LastSeenSecsAgo, but
// that remains null for the node being queried so it's more
// annoying to use.
if node.IsUp && node.HostName != "" {
numUp++
}
}
ctx := mctx.Annotate(ctx,
"numNodes", len(clusterStatus.Nodes),
2022-11-16 16:27:42 +00:00
"numUp", numUp,
)
if numUp >= ReplicationFactor {
c.logger.Debug(ctx, "instance appears to be online")
return nil
}
c.logger.Info(ctx, "Instance is not joined to the cluster yet")
}
}
// CreateS3APICredentials asks the admin interface to create a new key with
// the given name and returns it as an S3APICredentials. The credentials must
// be associated with a bucket via a call to GrantBucketPermissions in order to
// be useful.
//
// The returned credentials are built from whatever was decoded, even when an
// error is returned alongside them.
func (c *AdminClient) CreateS3APICredentials(
	ctx context.Context, name string,
) (
	S3APICredentials, error,
) {
	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Key/operation/AddKey
	var created struct {
		ID     string `json:"accessKeyId"`
		Secret string `json:"secretAccessKey"`
	}

	reqBody := map[string]string{"name": name}
	err := c.do(ctx, &created, http.MethodPost, "/v1/key", reqBody)

	creds := S3APICredentials{
		ID:     created.ID,
		Secret: created.Secret,
	}
	return creds, err
}
// CreateBucket asks the admin interface to create a bucket with the given
// global alias, and returns the ID found in the response.
func (c *AdminClient) CreateBucket(
	ctx context.Context, globalAlias string,
) (
	BucketID, error,
) {
	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/PutBucketGlobalAlias
	// NOTE(review): the link above names PutBucketGlobalAlias while this
	// request is a POST to /v1/bucket — confirm the intended reference.
	var created struct {
		ID string `json:"id"`
	}

	reqBody := map[string]string{"globalAlias": globalAlias}
	err := c.do(ctx, &created, http.MethodPost, "/v1/bucket", reqBody)

	id := BucketID(created.ID)
	return id, err
}
// ListBuckets fetches and returns all buckets known to this garage node.
// Whatever was decoded is returned alongside any error.
func (c *AdminClient) ListBuckets(ctx context.Context) ([]Bucket, error) {
	const listPath = "/v1/bucket?list"

	var buckets []Bucket
	if err := c.do(ctx, &buckets, http.MethodGet, listPath, nil); err != nil {
		return buckets, err
	}
	return buckets, nil
}
// GrantBucketPermissions grants the S3APICredentials with the given ID the
// given permission(s) on the bucket with the given ID. Each given permission
// is sent as a key set to true in the request's "permissions" object;
// duplicates collapse into a single entry.
func (c *AdminClient) GrantBucketPermissions(
	ctx context.Context,
	bucketID BucketID,
	credentialsID string,
	perms ...BucketPermission,
) error {
	granted := make(map[BucketPermission]bool, len(perms))
	for i := range perms {
		granted[perms[i]] = true
	}

	reqBody := map[string]any{
		"bucketId":    bucketID,
		"accessKeyId": credentialsID,
		"permissions": granted,
	}

	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/AllowBucketKey
	return c.do(ctx, nil, http.MethodPost, "/v1/bucket/allow", reqBody)
}
// GetLayout fetches the ClusterLayout from the admin interface's layout
// endpoint. Whatever was decoded is returned alongside any error.
func (c *AdminClient) GetLayout(ctx context.Context) (ClusterLayout, error) {
	// https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/GetLayout
	const layoutPath = "/v1/layout"

	var layout ClusterLayout
	if err := c.do(ctx, &layout, http.MethodGet, layoutPath, nil); err != nil {
		return layout, err
	}
	return layout, nil
}
// ApplyLayout modifies the layout of the garage cluster. Only the layout of
// the given roles will be modified/created/removed, other roles are not
// affected.
//
// This happens in two requests: the changes are first staged, and then, only
// if the staging response reports staged role changes, applied as the next
// layout version (the version from the staging response plus one).
func (c *AdminClient) ApplyLayout(
	ctx context.Context, addModifyRoles []Role, removeRoleIDs []string,
) error {
	// roleRemoval is the request entry which marks the role with the given
	// ID for removal.
	type roleRemoval struct {
		ID     string `json:"id"`
		Remove bool   `json:"remove"`
	}

	// Additions/modifications and removals are sent together as a single
	// JSON list, with the removals last.
	changes := make([]any, 0, len(addModifyRoles)+len(removeRoleIDs))
	for i := range addModifyRoles {
		changes = append(changes, addModifyRoles[i])
	}
	for i := range removeRoleIDs {
		changes = append(changes, roleRemoval{ID: removeRoleIDs[i], Remove: true})
	}

	var staged ClusterLayout
	if err := c.do(ctx, &staged, http.MethodPost, "/v1/layout", changes); err != nil {
		return fmt.Errorf("staging layout changes: %w", err)
	}

	if len(staged.StagedRoleChanges) == 0 {
		// Nothing is staged, so there is nothing to apply.
		return nil
	}

	nextVersion := staged.Version + 1
	applyReq := struct {
		Version int `json:"version"`
	}{
		Version: nextVersion,
	}

	if err := c.do(ctx, nil, http.MethodPost, "/v1/layout/apply", applyReq); err != nil {
		return fmt.Errorf("applying new layout (new version:%d): %w", nextVersion, err)
	}
	return nil
}