Cleanup idle HTTP connections to make shutdown smoother

This commit is contained in:
Brian Picciano 2024-10-07 21:12:47 +02:00
parent f146b77187
commit 7f3cbf628f
8 changed files with 102 additions and 40 deletions

View File

@ -184,6 +184,7 @@ in rec {
buildInputs = [ buildInputs = [
pkgs.go pkgs.go
pkgs.golangci-lint pkgs.golangci-lint
pkgs.gopls
(pkgs.callPackage ./nix/gowrap.nix {}) (pkgs.callPackage ./nix/gowrap.nix {})
]; ];
shellHook = '' shellHook = ''

View File

@ -55,6 +55,8 @@ func waitForGarage(
if err := adminClient.Wait(ctx); err != nil { if err := adminClient.Wait(ctx); err != nil {
return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err) return fmt.Errorf("waiting for garage instance %q to start up: %w", adminAddr, err)
} }
adminClient.Close()
} }
return nil return nil

View File

@ -92,6 +92,8 @@ func garageApplyLayout(
peers = make([]garage.PeerLayout, len(allocs)) peers = make([]garage.PeerLayout, len(allocs))
) )
defer adminClient.Close()
for i, alloc := range allocs { for i, alloc := range allocs {
id := daecommon.BootstrapGarageHostForAlloc(thisHost, alloc).ID id := daecommon.BootstrapGarageHostForAlloc(thisHost, alloc).ID
@ -124,6 +126,7 @@ func garageInitializeGlobalBucket(
adminClient := newGarageAdminClient( adminClient := newGarageAdminClient(
logger, networkConfig, adminToken, hostBootstrap, logger, networkConfig, adminToken, hostBootstrap,
) )
defer adminClient.Close()
creds, err := adminClient.CreateS3APICredentials( creds, err := adminClient.CreateS3APICredentials(
ctx, garage.GlobalBucketS3APICredentialsName, ctx, garage.GlobalBucketS3APICredentialsName,
@ -175,6 +178,8 @@ func (n *network) getGarageBootstrapHosts(
) )
) )
defer client.Close()
for objInfo := range objInfoCh { for objInfo := range objInfoCh {
ctx := mctx.Annotate(ctx, "objectKey", objInfo.Key) ctx := mctx.Annotate(ctx, "objectKey", objInfo.Key)
@ -228,6 +233,8 @@ func (n *network) putGarageBoostrapHost(
client = garageClientParams.GlobalBucketS3APIClient() client = garageClientParams.GlobalBucketS3APIClient()
) )
defer client.Close()
configured, err := nebula.Sign( configured, err := nebula.Sign(
host.HostConfigured, currBootstrap.PrivateCredentials.SigningPrivateKey, host.HostConfigured, currBootstrap.PrivateCredentials.SigningPrivateKey,
) )
@ -265,7 +272,7 @@ func (n *network) putGarageBoostrapHost(
} }
func removeGarageBootstrapHost( func removeGarageBootstrapHost(
ctx context.Context, client garage.S3APIClient, hostName nebula.HostName, ctx context.Context, client *garage.S3APIClient, hostName nebula.HostName,
) error { ) error {
filePath := filepath.Join( filePath := filepath.Join(

View File

@ -39,7 +39,7 @@ type GarageClientParams struct {
// GlobalBucketS3APIClient returns an S3 client pre-configured with access to // GlobalBucketS3APIClient returns an S3 client pre-configured with access to
// the global bucket. // the global bucket.
func (p GarageClientParams) GlobalBucketS3APIClient() garage.S3APIClient { func (p GarageClientParams) GlobalBucketS3APIClient() *garage.S3APIClient {
var ( var (
addr = p.Peer.S3APIAddr() addr = p.Peer.S3APIAddr()
creds = p.GlobalBucketS3APICredentials creds = p.GlobalBucketS3APICredentials
@ -499,11 +499,41 @@ func (n *network) postInit(ctx context.Context) error {
} }
n.logger.Info(ctx, "Updating host info in garage") n.logger.Info(ctx, "Updating host info in garage")
err = n.putGarageBoostrapHost(ctx, n.currBootstrap) if err = n.putGarageBoostrapHost(ctx, n.currBootstrap); err != nil {
if err != nil {
return fmt.Errorf("updating host info in garage: %w", err) return fmt.Errorf("updating host info in garage: %w", err)
} }
// Do this now so that everything is stable before returning. This also
// serves a dual-purpose, as it makes sure that the PUT above has propagated
// from the local garage instance, if there is one.
n.logger.Info(ctx, "Reloading bootstrap from network")
if err = n.reload(ctx); err != nil {
return fmt.Errorf("Reloading network bootstrap: %w", err)
}
return nil
}
func (n *network) reload(ctx context.Context) error {
n.l.RLock()
currBootstrap := n.currBootstrap
n.l.RUnlock()
n.logger.Info(ctx, "Checking for bootstrap changes")
newHosts, err := n.getGarageBootstrapHosts(ctx, currBootstrap)
if err != nil {
return fmt.Errorf("getting hosts from garage: %w", err)
}
// TODO there's some potential race conditions here, where
// CreateHost could be called at this point, write the new host to
// garage and the bootstrap, but then this reload call removes the
// host from this bootstrap/children until the next reload.
if err := n.reloadWithHosts(ctx, currBootstrap, newHosts); err != nil {
return fmt.Errorf("reloading with new host data: %w", err)
}
return nil return nil
} }
@ -517,34 +547,18 @@ func (n *network) reloadLoop(ctx context.Context) {
return return
case <-ticker.C: case <-ticker.C:
n.l.RLock() if err := n.reload(ctx); err != nil {
currBootstrap := n.currBootstrap n.logger.Error(ctx, "Attempting to reload", err)
n.l.RUnlock()
n.logger.Info(ctx, "Checking for bootstrap changes")
newHosts, err := n.getGarageBootstrapHosts(ctx, currBootstrap)
if err != nil {
n.logger.Error(ctx, "Failed to get hosts from garage", err)
continue
}
// TODO there's some potential race conditions here, where
// CreateHost could be called at this point, write the new host to
// garage and the bootstrap, but then this reload call removes the
// host from this bootstrap/children until the next reload.
if err := n.reload(ctx, currBootstrap, newHosts); err != nil {
n.logger.Error(ctx, "Reloading with new host data failed", err)
continue continue
} }
} }
} }
} }
// reload will check the existing hosts data from currBootstrap against a // reloadWithHosts will check the existing hosts data from currBootstrap against
// potentially updated set of hosts data, and if there are any differences will // a potentially updated set of hosts data, and if there are any differences
// perform whatever changes are necessary. // will perform whatever changes are necessary.
func (n *network) reload( func (n *network) reloadWithHosts(
ctx context.Context, ctx context.Context,
currBootstrap bootstrap.Bootstrap, currBootstrap bootstrap.Bootstrap,
newHosts map[nebula.HostName]bootstrap.Host, newHosts map[nebula.HostName]bootstrap.Host,
@ -661,6 +675,8 @@ func (n *network) RemoveHost(ctx context.Context, hostName nebula.HostName) erro
} }
client := garageClientParams.GlobalBucketS3APIClient() client := garageClientParams.GlobalBucketS3APIClient()
defer client.Close()
return struct{}{}, removeGarageBootstrapHost(ctx, client, hostName) return struct{}{}, removeGarageBootstrapHost(ctx, client, hostName)
}) })
return err return err
@ -826,7 +842,7 @@ func (n *network) CreateHost(
newHosts := joiningBootstrap.Bootstrap.Hosts newHosts := joiningBootstrap.Bootstrap.Hosts
n.logger.Info(ctx, "Reloading local state with new host") n.logger.Info(ctx, "Reloading local state with new host")
if err := n.reload(ctx, currBootstrap, newHosts); err != nil { if err := n.reloadWithHosts(ctx, currBootstrap, newHosts); err != nil {
return JoiningBootstrap{}, fmt.Errorf("reloading child processes: %w", err) return JoiningBootstrap{}, fmt.Errorf("reloading child processes: %w", err)
} }

View File

@ -6,9 +6,6 @@ import (
"isle/toolkit" "isle/toolkit"
) )
// TODO seeing more of these logs than I'd expect:
// INFO [network/children] Creating UDP socket from nebula addr "lUDPAddr"="172.16.1.1:0" "rUDPAddr"="172.16.1.1:45535"
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
toolkit.MarkIntegrationTest(t) toolkit.MarkIntegrationTest(t)

View File

@ -3,6 +3,7 @@ package network
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"isle/bootstrap" "isle/bootstrap"
"isle/daemon/children" "isle/daemon/children"
"isle/daemon/daecommon" "isle/daemon/daecommon"
@ -83,6 +84,7 @@ func newIntegrationHarness(t *testing.T) *integrationHarness {
if err != nil { if err != nil {
t.Fatalf("creating root temp dir: %v", err) t.Fatalf("creating root temp dir: %v", err)
} }
t.Logf("Temporary test directory: %q", rootDir)
t.Cleanup(func() { t.Cleanup(func() {
if t.Failed() { if t.Failed() {
@ -179,6 +181,8 @@ func (h *integrationHarness) createNetwork(
ipNet = newIPNet() ipNet = newIPNet()
hostName = nebula.HostName(hostNameStr) hostName = nebula.HostName(hostNameStr)
childrenOpts *children.Opts
) )
childrenLogFile, err := os.Create(childrenLogFilePath) childrenLogFile, err := os.Create(childrenLogFilePath)
@ -191,6 +195,18 @@ func (h *integrationHarness) createNetwork(
} }
}) })
if os.Getenv("ISLE_INTEGRATION_TEST_CHILDREN_LOG_STDOUT") == "" {
childrenOpts = &children.Opts{
Stdout: childrenLogFile,
Stderr: childrenLogFile,
}
} else {
childrenOpts = &children.Opts{
Stdout: io.MultiWriter(os.Stdout, childrenLogFile),
Stderr: io.MultiWriter(os.Stdout, childrenLogFile),
}
}
network, err := Create( network, err := Create(
h.ctx, h.ctx,
h.logger.WithNamespace("network"), h.logger.WithNamespace("network"),
@ -202,10 +218,7 @@ func (h *integrationHarness) createNetwork(
ipNet, ipNet,
hostName, hostName,
&Opts{ &Opts{
ChildrenOpts: &children.Opts{ ChildrenOpts: childrenOpts,
Stdout: childrenLogFile,
Stderr: childrenLogFile,
},
}, },
) )
if err != nil { if err != nil {

View File

@ -47,6 +47,8 @@ type AdminClient struct {
c *http.Client c *http.Client
addr string addr string
adminToken string adminToken string
transport *http.Transport
} }
// NewAdminClient initializes and returns an AdminClient which will use the // NewAdminClient initializes and returns an AdminClient which will use the
@ -54,16 +56,25 @@ type AdminClient struct {
// //
// If Logger is nil then logs will be suppressed. // If Logger is nil then logs will be suppressed.
func NewAdminClient(logger *mlog.Logger, addr, adminToken string) *AdminClient { func NewAdminClient(logger *mlog.Logger, addr, adminToken string) *AdminClient {
transport := http.DefaultTransport.(*http.Transport).Clone()
return &AdminClient{ return &AdminClient{
logger: logger, logger: logger,
c: &http.Client{ c: &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(), Transport: transport,
}, },
addr: addr, addr: addr,
adminToken: adminToken, adminToken: adminToken,
transport: transport,
} }
} }
// Close cleans up all resources held by the lient.
func (c *AdminClient) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// do performs an HTTP request with the given method (GET, POST) and path, and // do performs an HTTP request with the given method (GET, POST) and path, and
// using the json marshaling of the given body as the request body (unless body // using the json marshaling of the given body as the request body (unless body
// is nil). It will JSON unmarshal the response into rcv, unless rcv is nil. // is nil). It will JSON unmarshal the response into rcv, unless rcv is nil.

View File

@ -3,6 +3,7 @@ package garage
import ( import (
"errors" "errors"
"fmt" "fmt"
"net/http"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
@ -16,7 +17,16 @@ func IsKeyNotFound(err error) bool {
} }
// S3APIClient is a client used to interact with garage's S3 API. // S3APIClient is a client used to interact with garage's S3 API.
type S3APIClient = *minio.Client type S3APIClient struct {
*minio.Client
transport *http.Transport
}
// Close cleans up all resources held by the client.
func (c *S3APIClient) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// S3APICredentials describe data fields necessary for authenticating with a // S3APICredentials describe data fields necessary for authenticating with a
// garage S3 API endpoint. // garage S3 API endpoint.
@ -27,16 +37,21 @@ type S3APICredentials struct {
// NewS3APIClient returns a minio client configured to use the given garage S3 API // NewS3APIClient returns a minio client configured to use the given garage S3 API
// endpoint. // endpoint.
func NewS3APIClient(addr string, creds S3APICredentials) S3APIClient { func NewS3APIClient(addr string, creds S3APICredentials) *S3APIClient {
transport := http.DefaultTransport.(*http.Transport).Clone()
client, err := minio.New(addr, &minio.Options{ client, err := minio.New(addr, &minio.Options{
Creds: credentials.NewStaticV4(creds.ID, creds.Secret, ""), Creds: credentials.NewStaticV4(creds.ID, creds.Secret, ""),
Region: Region, Region: Region,
Transport: transport,
}) })
if err != nil { if err != nil {
panic(fmt.Sprintf("initializing minio client at addr %q and with creds %+v", addr, creds)) panic(fmt.Sprintf("initializing minio client at addr %q and with creds %+v", addr, creds))
} }
return client return &S3APIClient{
Client: client,
transport: transport,
}
} }