isle/go/garage/admin_client.go

178 lines
4.0 KiB
Go
Raw Normal View History

package garage
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"time"
"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
)
// AdminClientError is the error type which AdminClient's Do method returns
// when the admin interface responds with an unsuccessful status code.
type AdminClientError struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int

	// Body is the raw body of the response.
	Body []byte
}

// Error implements the error interface. The message contains the status code
// followed by the quoted response body.
func (e AdminClientError) Error() string {
	const format = "%d response from admin: %q"
	return fmt.Sprintf(format, e.StatusCode, e.Body)
}
// AdminClient is a helper type for performing actions against the garage admin
// interface.
//
// Instances are created using NewAdminClient.
type AdminClient struct {
// c is the HTTP client with which all requests are performed.
c *http.Client
// addr is the address of the admin interface. It is placed directly after
// "http://" when request URLs are built.
addr string
// adminToken is sent as the Bearer token in the Authorization header of
// every request.
adminToken string
// logger receives debug dumps of requests and responses, as well as
// warnings emitted while waiting for the instance to become ready.
logger *mlog.Logger
}
// NewAdminClient constructs an AdminClient whose requests are all sent to the
// given address and authenticated with the given adminToken.
//
// The returned client owns its own HTTP transport, cloned from
// http.DefaultTransport, rather than sharing the process-wide default one.
//
// If logger is nil then logs will be suppressed.
//
// NOTE(review): the methods of AdminClient call the logger unconditionally, so
// the nil case relies on mlog.Logger's methods tolerating a nil receiver —
// confirm against the mlog package.
func NewAdminClient(addr, adminToken string, logger *mlog.Logger) *AdminClient {
	transport := http.DefaultTransport.(*http.Transport).Clone()
	httpClient := &http.Client{Transport: transport}

	client := new(AdminClient)
	client.c = httpClient
	client.addr = addr
	client.adminToken = adminToken
	client.logger = logger
	return client
}
// Do performs an HTTP request against the admin interface using the given
// method (e.g. GET, POST) and path.
//
// If body is non-nil then its JSON encoding is sent as the request body. If
// rcv is non-nil then the response body is JSON decoded into it, otherwise the
// response body is read and discarded.
//
// Every request carries the admin token as a Bearer Authorization header. A
// response whose status code is 300 or higher is returned as an
// AdminClientError holding the status code and whatever could be read of the
// response body. All other failures are returned as wrapped errors.
func (c *AdminClient) Do(
	ctx context.Context, rcv interface{}, method, path string, body interface{},
) error {
	var bodyR io.Reader
	if body != nil {
		bodyBuf := new(bytes.Buffer)
		if err := json.NewEncoder(bodyBuf).Encode(body); err != nil {
			return fmt.Errorf("json marshaling body: %w", err)
		}
		bodyR = bodyBuf
	}

	urlStr := fmt.Sprintf("http://%s%s", c.addr, path)
	req, err := http.NewRequestWithContext(ctx, method, urlStr, bodyR)
	if err != nil {
		return fmt.Errorf("initializing request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+c.adminToken)

	// Dumping requests/responses is only worth the cost when the logger will
	// actually emit debug messages.
	debugEnabled := c.logger.MaxLevel() >= mlog.LevelDebug.Int()

	if debugEnabled {
		// NOTE(review): the dump contains the request headers, and therefore
		// the admin token. Consider redacting the Authorization header if
		// debug logs may be shared.
		reqB, err := httputil.DumpRequestOut(req, true)
		if err != nil {
			c.logger.Error(ctx, "failed to dump http request", err)
		} else {
			c.logger.Debug(ctx, "------ request ------\n"+string(reqB)+"\n")
		}
	}

	res, err := c.c.Do(req)
	if err != nil {
		return fmt.Errorf("performing http request: %w", err)
	}
	// Register the close immediately so that no code path added between here
	// and the return statements can leak the response body.
	defer res.Body.Close()

	if debugEnabled {
		resB, err := httputil.DumpResponse(res, true)
		if err != nil {
			c.logger.Error(ctx, "failed to dump http response", err)
		} else {
			c.logger.Debug(ctx, "------ response ------\n"+string(resB)+"\n")
		}
	}

	if res.StatusCode >= 300 {
		// The read error is deliberately ignored: the status code is the
		// important piece of information, the body is included best-effort.
		resBody, _ := io.ReadAll(res.Body)
		return AdminClientError{
			StatusCode: res.StatusCode,
			Body:       resBody,
		}
	}

	if rcv == nil {
		if _, err := io.Copy(io.Discard, res.Body); err != nil {
			return fmt.Errorf("discarding response body: %w", err)
		}
		return nil
	}

	if err := json.NewDecoder(res.Body).Decode(rcv); err != nil {
		return fmt.Errorf("decoding json response body: %w", err)
	}

	// The decoder stops once a complete JSON value has been read. Drain
	// anything which follows it (best-effort) so that the underlying
	// connection can be reused for subsequent requests.
	_, _ = io.Copy(io.Discard, res.Body)

	return nil
}
// Wait will block until the instance connected to can see at least
// ReplicationFactor-1 other garage instances. If the context is canceled it
// will return the context error.
func (c *AdminClient) Wait(ctx context.Context) error {
for first := true; ; first = false {
if !first {
time.Sleep(250 * time.Millisecond)
}
var clusterStatus struct {
Nodes []struct {
IsUp bool `json:"isUp"`
} `json:"nodes"`
}
err := c.Do(ctx, &clusterStatus, "GET", "/v1/status", nil)
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
} else if err != nil {
c.logger.Warn(ctx, "waiting for instance to become ready", err)
continue
}
var numUp int
for _, node := range clusterStatus.Nodes {
if node.IsUp {
numUp++
}
}
ctx := mctx.Annotate(ctx,
"numNodes", len(clusterStatus.Nodes),
2022-11-16 16:27:42 +00:00
"numUp", numUp,
)
if numUp >= ReplicationFactor-1 {
c.logger.Debug(ctx, "instance appears to be online")
return nil
}
c.logger.Debug(ctx, "instance not online yet, will continue waiting")
}
}