Refactor initialize/reload logic in Network
This commit is contained in:
parent
e8e6e29ec6
commit
e9f318f34c
@ -49,7 +49,7 @@ func loadBootstrapFromStateDir(
|
|||||||
return currBootstrap, nil
|
return currBootstrap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func coalesceNetworkConfigAndBootstrap(
|
func applyNetworkConfigToBootstrap(
|
||||||
networkConfig daecommon.NetworkConfig, hostBootstrap bootstrap.Bootstrap,
|
networkConfig daecommon.NetworkConfig, hostBootstrap bootstrap.Bootstrap,
|
||||||
) (
|
) (
|
||||||
bootstrap.Bootstrap, error,
|
bootstrap.Bootstrap, error,
|
||||||
|
@ -418,119 +418,35 @@ func (constructorsImpl) create(
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *network) periodically(
|
// preChildrenInit performs steps which are required prior to children being
|
||||||
label string,
|
// initialized/reloaded. The lock must be held when this is called (if not being
|
||||||
fn func(context.Context) error,
|
// called as part of initialize).
|
||||||
period time.Duration,
|
func (n *network) preChildrenInit(ctx context.Context) error {
|
||||||
) {
|
var err error
|
||||||
n.wg.Add(1)
|
if n.currBootstrap, err = applyNetworkConfigToBootstrap(
|
||||||
go func() {
|
n.networkConfig, n.currBootstrap,
|
||||||
defer n.wg.Done()
|
); err != nil {
|
||||||
|
|
||||||
ctx := mctx.Annotate(n.workerCtx, "workerLabel", label)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(period)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
n.logger.Info(ctx, "Starting background job runner")
|
|
||||||
defer n.logger.Info(ctx, "Stopping background job runner")
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-ticker.C:
|
|
||||||
n.logger.Info(ctx, "Background job running")
|
|
||||||
if err := fn(ctx); err != nil {
|
|
||||||
n.logger.Error(ctx, "Background job failed", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) initialize(
|
|
||||||
ctx context.Context,
|
|
||||||
isCreate bool,
|
|
||||||
) error {
|
|
||||||
var (
|
|
||||||
prevBootstrap = n.currBootstrap
|
|
||||||
prevThisHost = prevBootstrap.ThisHost()
|
|
||||||
)
|
|
||||||
|
|
||||||
// we update this Host's data using whatever configuration has been provided
|
|
||||||
// by the daemon config. This way the network has the most up-to-date
|
|
||||||
// possible bootstrap. This updated bootstrap will later get updated in
|
|
||||||
// garage as a background task, so other hosts will see it as well.
|
|
||||||
currBootstrap, err := coalesceNetworkConfigAndBootstrap(
|
|
||||||
n.networkConfig, prevBootstrap,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("combining configuration into bootstrap: %w", err)
|
return fmt.Errorf("combining configuration into bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
n.logger.Info(ctx, "Writing updated bootstrap to state dir")
|
n.logger.Info(ctx, "Writing updated bootstrap to state dir")
|
||||||
err = writeBootstrapToStateDir(n.stateDir.Path, currBootstrap)
|
if err = writeBootstrapToStateDir(
|
||||||
if err != nil {
|
n.stateDir.Path, n.currBootstrap,
|
||||||
|
); err != nil {
|
||||||
return fmt.Errorf("writing bootstrap to state dir: %w", err)
|
return fmt.Errorf("writing bootstrap to state dir: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
n.currBootstrap = currBootstrap
|
|
||||||
|
|
||||||
n.logger.Info(ctx, "Creating child processes")
|
|
||||||
n.children, err = children.New(
|
|
||||||
ctx,
|
|
||||||
n.logger.WithNamespace("children"),
|
|
||||||
n.envBinDirPath,
|
|
||||||
n.secretsStore,
|
|
||||||
n.networkConfig,
|
|
||||||
n.runtimeDir,
|
|
||||||
n.opts.GarageAdminToken,
|
|
||||||
n.nebulaDeviceNamer,
|
|
||||||
n.currBootstrap,
|
|
||||||
&children.Opts{
|
|
||||||
GarageNewCluster: isCreate,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating child processes: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.logger.Info(ctx, "Child processes created")
|
|
||||||
|
|
||||||
createGarageGlobalBucket := isCreate
|
|
||||||
err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
|
|
||||||
if err != nil {
|
|
||||||
n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
|
|
||||||
n.children.Shutdown()
|
|
||||||
return fmt.Errorf("performing post-initialization: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do this now so that everything is stable before returning. This also
|
|
||||||
// serves a dual-purpose, as it makes sure that the PUT from the
|
|
||||||
// postChildrenInit above has propagated from the local garage instance, if
|
|
||||||
// there is one.
|
|
||||||
n.logger.Info(ctx, "Reloading hosts from network storage")
|
|
||||||
if err = n.reloadHosts(ctx); err != nil {
|
|
||||||
return fmt.Errorf("Reloading network bootstrap: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.periodically("reloadHosts", n.reloadHosts, 3*time.Minute)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// postChildrenInit performs steps which are required after children have been
|
// postChildrenInit performs steps which are required after children have been
|
||||||
// initialized.
|
// initialized/reloaded. The lock must be held when this is called (if not being
|
||||||
|
// called as part of initialize).
|
||||||
func (n *network) postChildrenInit(
|
func (n *network) postChildrenInit(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
prevThisHost bootstrap.Host,
|
prevThisHost bootstrap.Host,
|
||||||
createGarageGlobalBucket bool,
|
createGarageGlobalBucket bool,
|
||||||
) error {
|
) error {
|
||||||
n.l.RLock()
|
|
||||||
defer n.l.RUnlock()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
thisHost = n.currBootstrap.ThisHost()
|
thisHost = n.currBootstrap.ThisHost()
|
||||||
garageAdminClient, hasGarage = n.children.GarageAdminClient()
|
garageAdminClient, hasGarage = n.children.GarageAdminClient()
|
||||||
@ -601,55 +517,6 @@ func (n *network) postChildrenInit(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns the bootstrap prior to the reload being applied.
|
|
||||||
func (n *network) reload(
|
|
||||||
ctx context.Context,
|
|
||||||
newNetworkConfig *daecommon.NetworkConfig,
|
|
||||||
newBootstrap *bootstrap.Bootstrap,
|
|
||||||
) (
|
|
||||||
bootstrap.Bootstrap, error,
|
|
||||||
) {
|
|
||||||
n.l.Lock()
|
|
||||||
defer n.l.Unlock()
|
|
||||||
|
|
||||||
prevBootstrap := n.currBootstrap
|
|
||||||
|
|
||||||
if newBootstrap != nil {
|
|
||||||
n.currBootstrap = *newBootstrap
|
|
||||||
}
|
|
||||||
|
|
||||||
if newNetworkConfig != nil {
|
|
||||||
n.networkConfig = *newNetworkConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
if n.currBootstrap, err = coalesceNetworkConfigAndBootstrap(
|
|
||||||
n.networkConfig, n.currBootstrap,
|
|
||||||
); err != nil {
|
|
||||||
return bootstrap.Bootstrap{}, fmt.Errorf(
|
|
||||||
"combining configuration into bootstrap: %w", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.logger.Info(ctx, "Writing updated bootstrap to state dir")
|
|
||||||
err = writeBootstrapToStateDir(n.stateDir.Path, n.currBootstrap)
|
|
||||||
if err != nil {
|
|
||||||
return bootstrap.Bootstrap{}, fmt.Errorf(
|
|
||||||
"writing bootstrap to state dir: %w", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.logger.Info(ctx, "Reloading child processes")
|
|
||||||
err = n.children.Reload(ctx, n.networkConfig, n.currBootstrap)
|
|
||||||
if err != nil {
|
|
||||||
return bootstrap.Bootstrap{}, fmt.Errorf(
|
|
||||||
"reloading child processes: %w", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return prevBootstrap, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) reloadHosts(ctx context.Context) error {
|
func (n *network) reloadHosts(ctx context.Context) error {
|
||||||
n.l.RLock()
|
n.l.RLock()
|
||||||
currBootstrap := n.currBootstrap
|
currBootstrap := n.currBootstrap
|
||||||
@ -685,6 +552,89 @@ func (n *network) reloadHosts(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *network) periodically(
|
||||||
|
label string,
|
||||||
|
fn func(context.Context) error,
|
||||||
|
period time.Duration,
|
||||||
|
) {
|
||||||
|
n.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer n.wg.Done()
|
||||||
|
|
||||||
|
ctx := mctx.Annotate(n.workerCtx, "workerLabel", label)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(period)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
n.logger.Info(ctx, "Starting background job runner")
|
||||||
|
defer n.logger.Info(ctx, "Stopping background job runner")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
n.logger.Info(ctx, "Background job running")
|
||||||
|
if err := fn(ctx); err != nil {
|
||||||
|
n.logger.Error(ctx, "Background job failed", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *network) initialize(ctx context.Context, isCreate bool) error {
|
||||||
|
var (
|
||||||
|
prevThisHost = n.currBootstrap.ThisHost()
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := n.preChildrenInit(ctx); err != nil {
|
||||||
|
return fmt.Errorf("performing pre-initialization: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.logger.Info(ctx, "Creating child processes")
|
||||||
|
n.children, err = children.New(
|
||||||
|
ctx,
|
||||||
|
n.logger.WithNamespace("children"),
|
||||||
|
n.envBinDirPath,
|
||||||
|
n.secretsStore,
|
||||||
|
n.networkConfig,
|
||||||
|
n.runtimeDir,
|
||||||
|
n.opts.GarageAdminToken,
|
||||||
|
n.nebulaDeviceNamer,
|
||||||
|
n.currBootstrap,
|
||||||
|
&children.Opts{
|
||||||
|
GarageNewCluster: isCreate,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating child processes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
createGarageGlobalBucket := isCreate
|
||||||
|
err = n.postChildrenInit(ctx, prevThisHost, createGarageGlobalBucket)
|
||||||
|
if err != nil {
|
||||||
|
n.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
|
||||||
|
n.children.Shutdown()
|
||||||
|
return fmt.Errorf("performing post-initialization: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do this now so that everything is stable before returning. This also
|
||||||
|
// serves a dual-purpose, as it makes sure that the PUT from the
|
||||||
|
// postChildrenInit above has propagated from the local garage instance, if
|
||||||
|
// there is one.
|
||||||
|
n.logger.Info(ctx, "Reloading hosts from network storage")
|
||||||
|
if err = n.reloadHosts(ctx); err != nil {
|
||||||
|
return fmt.Errorf("Reloading network bootstrap: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.periodically("reloadHosts", n.reloadHosts, 3*time.Minute)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func withCurrBootstrap[Res any](
|
func withCurrBootstrap[Res any](
|
||||||
n *network, fn func(bootstrap.Bootstrap) (Res, error),
|
n *network, fn func(bootstrap.Bootstrap) (Res, error),
|
||||||
) (Res, error) {
|
) (Res, error) {
|
||||||
@ -971,16 +921,31 @@ func (n *network) GetConfig(context.Context) (daecommon.NetworkConfig, error) {
|
|||||||
func (n *network) SetConfig(
|
func (n *network) SetConfig(
|
||||||
ctx context.Context, config daecommon.NetworkConfig,
|
ctx context.Context, config daecommon.NetworkConfig,
|
||||||
) error {
|
) error {
|
||||||
|
n.l.Lock()
|
||||||
|
defer n.l.Unlock()
|
||||||
|
|
||||||
|
var (
|
||||||
|
prevThisHost = n.currBootstrap.ThisHost()
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
if _, err := loadStoreConfig(n.stateDir, &config); err != nil {
|
if _, err := loadStoreConfig(n.stateDir, &config); err != nil {
|
||||||
return fmt.Errorf("storing new config: %w", err)
|
return fmt.Errorf("storing new config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
prevBootstrap, err := n.reload(ctx, &config, nil)
|
n.networkConfig = config
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("reloading config: %w", err)
|
if err := n.preChildrenInit(ctx); err != nil {
|
||||||
|
return fmt.Errorf("performing pre-initialization: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := n.postChildrenInit(ctx, prevBootstrap.ThisHost(), false); err != nil {
|
n.logger.Info(ctx, "Reloading child processes")
|
||||||
|
err = n.children.Reload(ctx, n.networkConfig, n.currBootstrap)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("reloading child processes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := n.postChildrenInit(ctx, prevThisHost, false); err != nil {
|
||||||
return fmt.Errorf("performing post-initialization: %w", err)
|
return fmt.Errorf("performing post-initialization: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user