Brian Picciano
68f417b5ba
This required switching all garage admin API calls to the new v1 versions, and redoing how the global bucket key is created so it is created via the "create key" API call.
178 lines
4.0 KiB
Go
package garage

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httputil"
	"time"

	"github.com/mediocregopher/mediocre-go-lib/v2/mctx"
	"github.com/mediocregopher/mediocre-go-lib/v2/mlog"
)
|
|
|
|
// AdminClientError gets returned from AdminClient's Do method for non-200
|
|
// errors.
|
|
type AdminClientError struct {
|
|
StatusCode int
|
|
Body []byte
|
|
}
|
|
|
|
func (e AdminClientError) Error() string {
|
|
return fmt.Sprintf("%d response from admin: %q", e.StatusCode, e.Body)
|
|
}
|
|
|
|
// AdminClient is a helper type for performing actions against the garage admin
|
|
// interface.
|
|
type AdminClient struct {
|
|
c *http.Client
|
|
addr string
|
|
adminToken string
|
|
logger *mlog.Logger
|
|
}
|
|
|
|
// NewAdminClient initializes and returns an AdminClient which will use the
|
|
// given address and adminToken for all requests made.
|
|
//
|
|
// If Logger is nil then logs will be suppressed.
|
|
func NewAdminClient(addr, adminToken string, logger *mlog.Logger) *AdminClient {
|
|
return &AdminClient{
|
|
c: &http.Client{
|
|
Transport: http.DefaultTransport.(*http.Transport).Clone(),
|
|
},
|
|
addr: addr,
|
|
adminToken: adminToken,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Do performs an HTTP request with the given method (GET, POST) and path, and
|
|
// using the json marshaling of the given body as the request body (unless body
|
|
// is nil). It will JSON unmarshal the response into rcv, unless rcv is nil.
|
|
func (c *AdminClient) Do(
|
|
ctx context.Context, rcv interface{}, method, path string, body interface{},
|
|
) error {
|
|
|
|
var bodyR io.Reader
|
|
|
|
if body != nil {
|
|
bodyBuf := new(bytes.Buffer)
|
|
bodyR = bodyBuf
|
|
|
|
if err := json.NewEncoder(bodyBuf).Encode(body); err != nil {
|
|
return fmt.Errorf("json marshaling body: %w", err)
|
|
}
|
|
}
|
|
|
|
urlStr := fmt.Sprintf("http://%s%s", c.addr, path)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, urlStr, bodyR)
|
|
if err != nil {
|
|
return fmt.Errorf("initializing request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Bearer "+c.adminToken)
|
|
|
|
if c.logger.MaxLevel() >= mlog.LevelDebug.Int() {
|
|
|
|
reqB, err := httputil.DumpRequestOut(req, true)
|
|
if err != nil {
|
|
c.logger.Error(ctx, "failed to dump http request", err)
|
|
} else {
|
|
c.logger.Debug(ctx, "------ request ------\n"+string(reqB)+"\n")
|
|
}
|
|
}
|
|
|
|
res, err := c.c.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("performing http request: %w", err)
|
|
}
|
|
|
|
if c.logger.MaxLevel() >= mlog.LevelDebug.Int() {
|
|
|
|
resB, err := httputil.DumpResponse(res, true)
|
|
if err != nil {
|
|
c.logger.Error(ctx, "failed to dump http response", err)
|
|
} else {
|
|
c.logger.Debug(ctx, "------ response ------\n"+string(resB)+"\n")
|
|
}
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
|
|
if res.StatusCode >= 300 {
|
|
body, _ := io.ReadAll(res.Body)
|
|
return AdminClientError{
|
|
StatusCode: res.StatusCode,
|
|
Body: body,
|
|
}
|
|
}
|
|
|
|
if rcv == nil {
|
|
|
|
if _, err := io.Copy(io.Discard, res.Body); err != nil {
|
|
return fmt.Errorf("discarding response body: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(rcv); err != nil {
|
|
return fmt.Errorf("decoding json response body: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Wait will block until the instance connected to can see at least
|
|
// ReplicationFactor-1 other garage instances. If the context is canceled it
|
|
// will return the context error.
|
|
func (c *AdminClient) Wait(ctx context.Context) error {
|
|
|
|
for first := true; ; first = false {
|
|
|
|
if !first {
|
|
time.Sleep(250 * time.Millisecond)
|
|
}
|
|
|
|
var clusterStatus struct {
|
|
Nodes []struct {
|
|
IsUp bool `json:"isUp"`
|
|
} `json:"nodes"`
|
|
}
|
|
|
|
err := c.Do(ctx, &clusterStatus, "GET", "/v1/status", nil)
|
|
|
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
|
return ctxErr
|
|
|
|
} else if err != nil {
|
|
c.logger.Warn(ctx, "waiting for instance to become ready", err)
|
|
continue
|
|
}
|
|
|
|
var numUp int
|
|
|
|
for _, node := range clusterStatus.Nodes {
|
|
if node.IsUp {
|
|
numUp++
|
|
}
|
|
}
|
|
|
|
ctx := mctx.Annotate(ctx,
|
|
"numNodes", len(clusterStatus.Nodes),
|
|
"numUp", numUp,
|
|
)
|
|
|
|
if numUp >= ReplicationFactor-1 {
|
|
c.logger.Debug(ctx, "instance appears to be online")
|
|
return nil
|
|
}
|
|
|
|
c.logger.Debug(ctx, "instance not online yet, will continue waiting")
|
|
}
|
|
}
|