package garage import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/http/httputil" "time" "dev.mediocregopher.com/mediocre-go-lib.git/mctx" "dev.mediocregopher.com/mediocre-go-lib.git/mlog" ) // BucketID is a unique identifier for a bucket in garage. It is different than // the bucket's alias, since a bucket may have multiple aliases. type BucketID string // BucketPermission describes a permission an S3APICredentials may have when // interacting with a bucket. type BucketPermission string // Enumeration of BucketPermissions. const ( BucketPermissionRead BucketPermission = "read" BucketPermissionWrite BucketPermission = "write" BucketPermissionOwner BucketPermission = "owner" ) // AdminClientError gets returned from AdminClient Do methods for non-200 // errors. type AdminClientError struct { StatusCode int Body []byte } func (e AdminClientError) Error() string { return fmt.Sprintf("%d response from admin: %q", e.StatusCode, e.Body) } // AdminClient is a helper type for performing actions against the garage admin // interface. type AdminClient struct { logger *mlog.Logger c *http.Client addr string adminToken string } // NewAdminClient initializes and returns an AdminClient which will use the // given address and adminToken for all requests made. // // If Logger is nil then logs will be suppressed. func NewAdminClient(logger *mlog.Logger, addr, adminToken string) *AdminClient { return &AdminClient{ logger: logger, c: &http.Client{ Transport: http.DefaultTransport.(*http.Transport).Clone(), }, addr: addr, adminToken: adminToken, } } // do performs an HTTP request with the given method (GET, POST) and path, and // using the json marshaling of the given body as the request body (unless body // is nil). It will JSON unmarshal the response into rcv, unless rcv is nil. func (c *AdminClient) do( ctx context.Context, rcv any, method, path string, body any, ) error { var bodyR io.Reader if body != nil { bodyBuf := new(bytes.Buffer) bodyR = bodyBuf if err := json.NewEncoder(bodyBuf).Encode(body); err != nil { return fmt.Errorf("json marshaling body: %w", err) } } urlStr := fmt.Sprintf("http://%s%s", c.addr, path) req, err := http.NewRequestWithContext(ctx, method, urlStr, bodyR) if err != nil { return fmt.Errorf("initializing request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.adminToken) if c.logger.MaxLevel() >= mlog.LevelDebug.Int() { reqB, err := httputil.DumpRequestOut(req, true) if err != nil { c.logger.Error(ctx, "failed to dump http request", err) } else { c.logger.Debug(ctx, "------ request ------\n"+string(reqB)+"\n") } } res, err := c.c.Do(req) if err != nil { return fmt.Errorf("performing http request: %w", err) } if c.logger.MaxLevel() >= mlog.LevelDebug.Int() { resB, err := httputil.DumpResponse(res, true) if err != nil { c.logger.Error(ctx, "failed to dump http response", err) } else { c.logger.Debug(ctx, "------ response ------\n"+string(resB)+"\n") } } defer res.Body.Close() if res.StatusCode >= 300 { body, _ := io.ReadAll(res.Body) return AdminClientError{ StatusCode: res.StatusCode, Body: body, } } if rcv == nil { if _, err := io.Copy(io.Discard, res.Body); err != nil { return fmt.Errorf("discarding response body: %w", err) } return nil } if err := json.NewDecoder(res.Body).Decode(rcv); err != nil { return fmt.Errorf("decoding json response body: %w", err) } return nil } // Wait will block until the instance connected to can see at least // ReplicationFactor-1 other garage instances. If the context is canceled it // will return the context error. func (c *AdminClient) Wait(ctx context.Context) error { for first := true; ; first = false { if !first { time.Sleep(250 * time.Millisecond) } // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Nodes/operation/GetNodes var clusterStatus struct { Nodes []struct { IsUp bool `json:"isUp"` } `json:"nodes"` } err := c.do(ctx, &clusterStatus, "GET", "/v1/status", nil) if ctxErr := ctx.Err(); ctxErr != nil { return ctxErr } else if err != nil { c.logger.Warn(ctx, "waiting for instance to become ready", err) continue } var numUp int for _, node := range clusterStatus.Nodes { if node.IsUp { numUp++ } } ctx := mctx.Annotate(ctx, "numNodes", len(clusterStatus.Nodes), "numUp", numUp, ) if numUp >= ReplicationFactor-1 { c.logger.Debug(ctx, "instance appears to be online") return nil } c.logger.Debug(ctx, "instance not online yet, will continue waiting") } } // CreateS3APICredentials creates an S3APICredentials with the given name. The // credentials must be associated with a bucket via a call to // GrantBucketPermissions in order to be useful. func (c *AdminClient) CreateS3APICredentials( ctx context.Context, name string, ) ( S3APICredentials, error, ) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Key/operation/AddKey var res struct { ID string `json:"accessKeyId"` Secret string `json:"secretAccessKey"` } // first attempt to import the key err := c.do( ctx, &res, "POST", "/v1/key", map[string]string{"name": name}, ) return S3APICredentials{ ID: res.ID, Secret: res.Secret, }, err } // CreateBucket creates a bucket with the given global alias, returning its ID. func (c *AdminClient) CreateBucket( ctx context.Context, globalAlias string, ) ( BucketID, error, ) { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/PutBucketGlobalAlias var res struct { ID string `json:"id"` } err := c.do( ctx, &res, "POST", "/v1/bucket", map[string]string{ "globalAlias": globalAlias, }, ) return BucketID(res.ID), err } // GrantBucketPermissions grants the S3APICredentials with the given ID // permission(s) to interact with the bucket of the given ID. func (c *AdminClient) GrantBucketPermissions( ctx context.Context, bucketID BucketID, credentialsID string, perms ...BucketPermission, ) error { permsMap := map[BucketPermission]bool{} for _, perm := range perms { permsMap[perm] = true } // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Bucket/operation/AllowBucketKey return c.do(ctx, nil, "POST", "/v1/bucket/allow", map[string]any{ "bucketId": bucketID, "accessKeyId": credentialsID, "permissions": permsMap, }) } // PeerLayout describes the properties of a garage peer in the context of the // layout of the cluster. type PeerLayout struct { ID string Capacity int // Gb (SI units) Zone string Tags []string } // ApplyLayout modifies the layout of the garage cluster. Only layout of the // given peers will be modified/created, other peers are not affected. func (c *AdminClient) ApplyLayout( ctx context.Context, peers []PeerLayout, ) error { type peerLayout struct { ID string `json:"id"` Capacity int `json:"capacity"` Zone string `json:"zone"` Tags []string `json:"tags"` } { // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/ApplyLayout clusterLayout := make([]peerLayout, len(peers)) for i := range peers { clusterLayout[i] = peerLayout(peers[i]) } err := c.do(ctx, nil, "POST", "/v1/layout", clusterLayout) if err != nil { return fmt.Errorf("staging layout changes: %w", err) } } // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/GetLayout var clusterLayout struct { Version int `json:"version"` StagedRoleChanges []peerLayout `json:"stagedRoleChanges"` } if err := c.do(ctx, &clusterLayout, "GET", "/v1/layout", nil); err != nil { return fmt.Errorf("retrieving staged layout change: %w", err) } if len(clusterLayout.StagedRoleChanges) == 0 { return nil } // https://garagehq.deuxfleurs.fr/api/garage-admin-v1.html#tag/Layout/operation/ApplyLayout applyClusterLayout := struct { Version int `json:"version"` }{ Version: clusterLayout.Version + 1, } err := c.do(ctx, nil, "POST", "/v1/layout/apply", applyClusterLayout) if err != nil { return fmt.Errorf("applying new layout (new version:%d): %w", applyClusterLayout.Version, err) } return nil }