package garage import ( "bytes" "context" "encoding/json" "fmt" "io" "isle/toolkit" "net/http" "net/netip" "time" "dev.mediocregopher.com/mediocre-go-lib.git/mctx" "dev.mediocregopher.com/mediocre-go-lib.git/mlog" ) // BucketID is a unique identifier for a bucket in garage. It is different than // the bucket's alias, since a bucket may have multiple aliases. type BucketID string // BucketPermission describes a permission an S3APICredentials may have when // interacting with a bucket. type BucketPermission string // Enumeration of BucketPermissions. const ( BucketPermissionRead BucketPermission = "read" BucketPermissionWrite BucketPermission = "write" BucketPermissionOwner BucketPermission = "owner" ) // Bucket defines a bucket which has been created in a cluster type Bucket struct { ID BucketID `json:"id"` GlobalAliases []string `json:"globalAliases"` } // AdminClientError gets returned from AdminClient Do methods for non-200 // errors. type AdminClientError struct { StatusCode int Body []byte } func (e AdminClientError) Error() string { return fmt.Sprintf("%d response from admin: %q", e.StatusCode, e.Body) } // AdminClient is a helper type for performing actions against the garage admin // interface. type AdminClient struct { logger *mlog.Logger c toolkit.HTTPClient addr string adminToken string } // NewAdminClient initializes and returns an AdminClient which will use the // given address and adminToken for all requests made. // // If Logger is nil then logs will be suppressed. func NewAdminClient( logger *mlog.Logger, addr, adminToken string, ) *AdminClient { httpClient := toolkit.NewHTTPClient(logger.WithNamespace("http")) return &AdminClient{ logger: logger, c: httpClient, addr: addr, adminToken: adminToken, } } // Close cleans up all resources held by the lient. func (c *AdminClient) Close() error { return c.c.Close() } // do performs an HTTP request with the given method (GET, POST) and path, and // using the json marshaling of the given body as the request body (unless body // is nil). It will JSON unmarshal the response into rcv, unless rcv is nil. func (c *AdminClient) do( ctx context.Context, rcv any, method, path string, body any, ) error { var bodyR io.Reader if body != nil { bodyBuf := new(bytes.Buffer) bodyR = bodyBuf if err := json.NewEncoder(bodyBuf).Encode(body); err != nil { return fmt.Errorf("json marshaling body: %w", err) } } urlStr := fmt.Sprintf("http://%s%s", c.addr, path) req, err := http.NewRequestWithContext(ctx, method, urlStr, bodyR) if err != nil { return fmt.Errorf("initializing request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.adminToken) res, err := c.c.Do(req) if err != nil { return fmt.Errorf("performing http request: %w", err) } defer res.Body.Close() if res.StatusCode >= 300 { body, _ := io.ReadAll(res.Body) return AdminClientError{ StatusCode: res.StatusCode, Body: body, } } if rcv == nil { if _, err := io.Copy(io.Discard, res.Body); err != nil { return fmt.Errorf("discarding response body: %w", err) } return nil } if err := json.NewDecoder(res.Body).Decode(rcv); err != nil { return fmt.Errorf("decoding json response body: %w", err) } return nil } // KnownNode describes the fields of a known node in the cluster, as returned // as part of [ClusterStatus]. type KnownNode struct { ID string `json:"id"` Role *Role `json:"role"` Addr netip.AddrPort `json:"addr"` IsUp bool `json:"isUp"` LastSeenSecsAgo int `json:"lastSeenSecsAgo"` Draining bool `json:"draining"` HostName string `json:"hostname"` } // Role descibes a node's role in the garage cluster, i.e. what storage it is // providing. type Role struct { ID string `json:"id"` Capacity int `json:"capacity"` // Gb (SI units) Zone string `json:"zone"` Tags []string `json:"tags"` } // ClusterLayout describes the layout of the cluster as a whole. type ClusterLayout struct { Version int `json:"version"` Roles []Role `json:"roles"` StagedRoleChanges []Role `json:"stagedRoleChanges"` } // ClusterStatus is returned from the Status endpoint, describing the currently // known state of the cluster. type ClusterStatus struct { LayoutVersion int `json:"layoutVersion"` Nodes []KnownNode `json:"nodes"` } // Status returns the current state of the cluster. func (c *AdminClient) Status(ctx context.Context) (ClusterStatus, error) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes var clusterStatus ClusterStatus err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil) return clusterStatus, err } // Wait will block until the instance connected to can see at least // ReplicationFactor other garage instances. If the context is canceled it // will return the context error. func (c *AdminClient) Wait( ctx context.Context, clusterAlreadyInitialized bool, ) error { c.logger.Info(ctx, "Waiting for instance to connect to other instances") err := toolkit.UntilTrue( ctx, c.logger, 2*time.Second, func() (bool, error) { clusterStatus, err := c.Status(ctx) if ctxErr := ctx.Err(); ctxErr != nil { return false, ctxErr } else if err != nil { ctx := mctx.Annotate(ctx, "errMsg", err.Error()) c.logger.Info(ctx, "Instance is not online yet") return false, nil } var numUp int for _, node := range clusterStatus.Nodes { // There seems to be some kind of step between IsUp becoming // true and garage actually loading the full state of a node, so // we check for the HostName as well. We could also use // LastSeenSecsAgo, but that remains null for the node being // queried so it's more annoying to use. if node.IsUp && node.HostName != "" { numUp++ } } ctx := mctx.Annotate(ctx, "layoutVersion", clusterStatus.LayoutVersion, "numNodes", len(clusterStatus.Nodes), "numUp", numUp, ) if clusterAlreadyInitialized && clusterStatus.LayoutVersion == 0 { c.logger.Info(ctx, "Cluster layout has not yet propagated") return false, nil } if numUp < ReplicationFactor { c.logger.Info(ctx, "Not enough other instances in the cluster are seen as 'up'") return false, nil } return true, nil }, ) if err != nil { return fmt.Errorf("waiting for cluster status: %w", err) } return err } // CreateS3APICredentials creates an S3APICredentials with the given name. The // credentials must be associated with a bucket via a call to // GrantBucketPermissions in order to be useful. func (c *AdminClient) CreateS3APICredentials( ctx context.Context, name string, ) ( S3APICredentials, error, ) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Key/operation/AddKey var res struct { ID string `json:"accessKeyId"` Secret string `json:"secretAccessKey"` } // first attempt to import the key err := c.do( ctx, &res, "POST", "/v1/key", map[string]string{"name": name}, ) return S3APICredentials{ ID: res.ID, Secret: res.Secret, }, err } // S3APICredentialsPublic are the publicly available fields of an // S3APICredentials. These are visibile to all nodes in the cluster. type S3APICredentialsPublic struct { ID string Name string } // ListS3APICredentials returns all S3APICredentials in the cluster, without // returning their func (c *AdminClient) ListS3APICredentials( ctx context.Context, ) ( []S3APICredentialsPublic, error, ) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Key/operation/ListKeys var res []S3APICredentialsPublic err := c.do(ctx, &res, "GET", "/v1/key?list", nil) return res, err } // CreateBucket creates a bucket with the given global alias, returning its ID. func (c *AdminClient) CreateBucket( ctx context.Context, globalAlias string, ) ( BucketID, error, ) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/PutBucketGlobalAlias var res struct { ID string `json:"id"` } err := c.do( ctx, &res, "POST", "/v1/bucket", map[string]string{ "globalAlias": globalAlias, }, ) return BucketID(res.ID), err } // ListBuckets returns all buckets known to this garage node. func (c *AdminClient) ListBuckets(ctx context.Context) ([]Bucket, error) { var res []Bucket err := c.do(ctx, &res, "GET", "/v1/bucket?list", nil) return res, err } // GrantBucketPermissions grants the S3APICredentials with the given ID // permission(s) to interact with the bucket of the given ID. func (c *AdminClient) GrantBucketPermissions( ctx context.Context, bucketID BucketID, credentialsID string, perms ...BucketPermission, ) error { permsMap := map[BucketPermission]bool{} for _, perm := range perms { permsMap[perm] = true } // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/AllowBucketKey return c.do(ctx, nil, "POST", "/v1/bucket/allow", map[string]any{ "bucketId": bucketID, "accessKeyId": credentialsID, "permissions": permsMap, }) } // GetLayout returns the currently applied ClusterLayout. func (c *AdminClient) GetLayout(ctx context.Context) (ClusterLayout, error) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/GetLayout var res ClusterLayout err := c.do(ctx, &res, "GET", "/v1/layout", nil) return res, err } // ApplyLayout modifies the layout of the garage cluster. Only layout of the // given roles will be modified/created/removed, other roles are not affected. func (c *AdminClient) ApplyLayout( ctx context.Context, addModifyRoles []Role, removeRoleIDs []string, ) error { type removeRole struct { ID string `json:"id"` Remove bool `json:"remove"` } roles := make([]any, 0, len(addModifyRoles)+len(removeRoleIDs)) for _, p := range addModifyRoles { roles = append(roles, p) } for _, id := range removeRoleIDs { roles = append(roles, removeRole{ID: id, Remove: true}) } var clusterLayout ClusterLayout err := c.do(ctx, &clusterLayout, "POST", "/v1/layout", roles) if err != nil { return fmt.Errorf("staging layout changes: %w", err) } else if len(clusterLayout.StagedRoleChanges) == 0 { return nil } applyClusterLayout := struct { Version int `json:"version"` }{ Version: clusterLayout.Version + 1, } err = c.do(ctx, nil, "POST", "/v1/layout/apply", applyClusterLayout) if err != nil { return fmt.Errorf("applying new layout (new version:%d): %w", applyClusterLayout.Version, err) } return nil }