Move creation of children into daemon initialize method
This commit is contained in:
parent
7aa11ebe29
commit
c94f8e3475
@ -59,7 +59,7 @@ var subCmdDaemon = subCmd{
|
|||||||
}
|
}
|
||||||
|
|
||||||
daemonInst, err := daemon.NewDaemon(
|
daemonInst, err := daemon.NewDaemon(
|
||||||
logger, daemonConfig, envBinDirPath, nil,
|
ctx, logger, daemonConfig, envBinDirPath, nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("starting daemon: %w", err)
|
return fmt.Errorf("starting daemon: %w", err)
|
||||||
|
@ -71,15 +71,7 @@ func NewChildren(
|
|||||||
)
|
)
|
||||||
if initErr != nil {
|
if initErr != nil {
|
||||||
logger.Warn(ctx, "failed to initialize Children, shutting down child processes", err)
|
logger.Warn(ctx, "failed to initialize Children, shutting down child processes", err)
|
||||||
if err := c.Shutdown(); err != nil {
|
c.Shutdown()
|
||||||
panic(fmt.Sprintf(
|
|
||||||
"failed to shut down child processes after initialization"+
|
|
||||||
" error, there may be zombie children leftover."+
|
|
||||||
" Original error: %v",
|
|
||||||
initErr,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, initErr
|
return nil, initErr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,10 +106,6 @@ func (c *Children) RestartNebula(hostBootstrap bootstrap.Bootstrap) error {
|
|||||||
|
|
||||||
// Shutdown blocks until all child processes have gracefully shut themselves
|
// Shutdown blocks until all child processes have gracefully shut themselves
|
||||||
// down.
|
// down.
|
||||||
//
|
func (c *Children) Shutdown() {
|
||||||
// If this returns an error then it's possible that child processes are
|
|
||||||
// still running and are no longer managed.
|
|
||||||
func (c *Children) Shutdown() error {
|
|
||||||
c.pmux.Stop()
|
c.pmux.Stop()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -181,7 +181,11 @@ type daemon struct {
|
|||||||
// ErrInitializing, except Shutdown which will block until initialization is
|
// ErrInitializing, except Shutdown which will block until initialization is
|
||||||
// canceled.
|
// canceled.
|
||||||
func NewDaemon(
|
func NewDaemon(
|
||||||
logger *mlog.Logger, daemonConfig Config, envBinDirPath string, opts *Opts,
|
ctx context.Context,
|
||||||
|
logger *mlog.Logger,
|
||||||
|
daemonConfig Config,
|
||||||
|
envBinDirPath string,
|
||||||
|
opts *Opts,
|
||||||
) (
|
) (
|
||||||
Daemon, error,
|
Daemon, error,
|
||||||
) {
|
) {
|
||||||
@ -220,21 +224,21 @@ func NewDaemon(
|
|||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
"loading bootstrap from %q: %w", bootstrapFilePath, err,
|
"loading bootstrap from %q: %w", bootstrapFilePath, err,
|
||||||
)
|
)
|
||||||
} else if err := d.initialize(currBootstrap, nil); err != nil {
|
} else if err := d.initialize(ctx, currBootstrap); err != nil {
|
||||||
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
return nil, fmt.Errorf("initializing with bootstrap: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize must be called with d.l write lock held, _but_ the lock should be
|
// initialize must be called with d.l write lock held, but it will unlock the
|
||||||
// released just after initialize returns.
|
// lock itself. Once initialize returns successfully the d.l write lock should
|
||||||
//
|
// not be held again, except by the reloadLoop.
|
||||||
// readyCh will be written to everytime daemon changes state to daemonStateOk,
|
|
||||||
// unless it is nil or blocks.
|
|
||||||
func (d *daemon) initialize(
|
func (d *daemon) initialize(
|
||||||
currBootstrap bootstrap.Bootstrap, readyCh chan<- struct{},
|
ctx context.Context, currBootstrap bootstrap.Bootstrap,
|
||||||
) error {
|
) error {
|
||||||
|
defer d.l.Unlock()
|
||||||
|
|
||||||
// we update this Host's data using whatever configuration has been provided
|
// we update this Host's data using whatever configuration has been provided
|
||||||
// by the daemon config. This way the daemon has the most up-to-date
|
// by the daemon config. This way the daemon has the most up-to-date
|
||||||
// possible bootstrap. This updated bootstrap will later get updated in
|
// possible bootstrap. This updated bootstrap will later get updated in
|
||||||
@ -254,6 +258,31 @@ func (d *daemon) initialize(
|
|||||||
d.currBootstrap = currBootstrap
|
d.currBootstrap = currBootstrap
|
||||||
d.state = daemonStateInitializing
|
d.state = daemonStateInitializing
|
||||||
|
|
||||||
|
d.logger.Info(ctx, "Creating child processes")
|
||||||
|
d.children, err = NewChildren(
|
||||||
|
ctx,
|
||||||
|
d.logger.WithNamespace("children"),
|
||||||
|
d.envBinDirPath,
|
||||||
|
d.secretsStore,
|
||||||
|
d.daemonConfig,
|
||||||
|
d.garageAdminToken,
|
||||||
|
currBootstrap,
|
||||||
|
d.opts,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating child processes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.logger.Info(ctx, "Child processes created")
|
||||||
|
|
||||||
|
if err := d.postInit(ctx); err != nil {
|
||||||
|
d.logger.Error(ctx, "Post-initialization failed, stopping child processes", err)
|
||||||
|
d.children.Shutdown()
|
||||||
|
return fmt.Errorf("performing post-initialization: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.state = daemonStateOk
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
d.wg.Add(1)
|
d.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -265,7 +294,7 @@ func (d *daemon) initialize(
|
|||||||
d.wg.Add(1)
|
d.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer d.wg.Done()
|
defer d.wg.Done()
|
||||||
d.restartLoop(ctx, readyCh)
|
d.reloadLoop(ctx)
|
||||||
d.logger.Debug(ctx, "Daemon restart loop stopped")
|
d.logger.Debug(ctx, "Daemon restart loop stopped")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -340,23 +369,13 @@ func (d *daemon) reload(
|
|||||||
return errors.Join(errs...)
|
return errors.Join(errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *daemon) postInit(ctx context.Context) bool {
|
func (d *daemon) postInit(ctx context.Context) error {
|
||||||
if len(d.daemonConfig.Storage.Allocations) > 0 {
|
if len(d.daemonConfig.Storage.Allocations) > 0 {
|
||||||
if !until(
|
d.logger.Info(ctx, "Applying garage layout")
|
||||||
ctx,
|
if err := garageApplyLayout(
|
||||||
d.logger,
|
ctx, d.logger, d.daemonConfig, d.garageAdminToken, d.currBootstrap,
|
||||||
"Applying garage layout",
|
); err != nil {
|
||||||
func(ctx context.Context) error {
|
return fmt.Errorf("applying garage layout: %w", err)
|
||||||
return garageApplyLayout(
|
|
||||||
ctx,
|
|
||||||
d.logger,
|
|
||||||
d.daemonConfig,
|
|
||||||
d.garageAdminToken,
|
|
||||||
d.currBootstrap,
|
|
||||||
)
|
|
||||||
},
|
|
||||||
) {
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -367,119 +386,36 @@ func (d *daemon) postInit(ctx context.Context) bool {
|
|||||||
// manage it at the moment.
|
// manage it at the moment.
|
||||||
_, err := getGarageS3APIGlobalBucketCredentials(ctx, d.secretsStore)
|
_, err := getGarageS3APIGlobalBucketCredentials(ctx, d.secretsStore)
|
||||||
if errors.Is(err, secrets.ErrNotFound) {
|
if errors.Is(err, secrets.ErrNotFound) {
|
||||||
if !until(
|
d.logger.Info(ctx, "Initializing garage shared global bucket")
|
||||||
|
garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
|
||||||
ctx,
|
ctx,
|
||||||
d.logger,
|
d.logger,
|
||||||
"Initializing garage shared global bucket",
|
|
||||||
func(ctx context.Context) error {
|
|
||||||
garageGlobalBucketCreds, err := garageInitializeGlobalBucket(
|
|
||||||
ctx,
|
|
||||||
d.logger,
|
|
||||||
d.daemonConfig,
|
|
||||||
d.garageAdminToken,
|
|
||||||
d.currBootstrap,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("initializing global bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = setGarageS3APIGlobalBucketCredentials(
|
|
||||||
ctx, d.secretsStore, garageGlobalBucketCreds,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("storing global bucket creds: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !until(
|
|
||||||
ctx,
|
|
||||||
d.logger,
|
|
||||||
"Updating host info in garage",
|
|
||||||
func(ctx context.Context) error {
|
|
||||||
return d.putGarageBoostrapHost(ctx, d.logger, d.currBootstrap)
|
|
||||||
},
|
|
||||||
) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *daemon) restartLoop(
|
|
||||||
ctx context.Context,
|
|
||||||
readyCh chan<- struct{},
|
|
||||||
) {
|
|
||||||
wait := func(d time.Duration) bool {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false
|
|
||||||
case <-time.After(d):
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ready := func() {
|
|
||||||
select {
|
|
||||||
case readyCh <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
readyCh = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
children *Children
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
|
||||||
d.logger.Info(ctx, "Creating child processes")
|
|
||||||
// TODO this could probably get moved outside of restartLoop, and into
|
|
||||||
// initialize. If it fails the error can get passed up to the caller and
|
|
||||||
// no changes can be made.
|
|
||||||
children, err = NewChildren(
|
|
||||||
ctx,
|
|
||||||
d.logger.WithNamespace("children"),
|
|
||||||
d.envBinDirPath,
|
|
||||||
d.secretsStore,
|
|
||||||
d.daemonConfig,
|
d.daemonConfig,
|
||||||
d.garageAdminToken,
|
d.garageAdminToken,
|
||||||
d.currBootstrap,
|
d.currBootstrap,
|
||||||
d.opts,
|
|
||||||
)
|
)
|
||||||
if errors.Is(err, context.Canceled) {
|
if err != nil {
|
||||||
return
|
return fmt.Errorf("initializing global bucket: %w", err)
|
||||||
} else if err != nil {
|
|
||||||
d.logger.Error(ctx, "failed to initialize child processes", err)
|
|
||||||
if !wait(1 * time.Second) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
d.logger.Info(ctx, "Child processes created")
|
err = setGarageS3APIGlobalBucketCredentials(
|
||||||
|
ctx, d.secretsStore, garageGlobalBucketCreds,
|
||||||
d.l.Lock()
|
)
|
||||||
d.children = children
|
if err != nil {
|
||||||
d.l.Unlock()
|
return fmt.Errorf("storing global bucket creds: %w", err)
|
||||||
|
|
||||||
if !d.postInit(ctx) {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
d.l.Lock()
|
|
||||||
d.state = daemonStateOk
|
|
||||||
d.l.Unlock()
|
|
||||||
|
|
||||||
ready()
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.logger.Info(ctx, "Updating host info in garage")
|
||||||
|
err = d.putGarageBoostrapHost(ctx, d.logger, d.currBootstrap)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updating host info in garage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daemon) reloadLoop(ctx context.Context) {
|
||||||
ticker := time.NewTicker(3 * time.Minute)
|
ticker := time.NewTicker(3 * time.Minute)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@ -501,8 +437,6 @@ func (d *daemon) restartLoop(
|
|||||||
d.logger.Error(ctx, "Reloading with new host data failed", err)
|
d.logger.Error(ctx, "Reloading with new host data failed", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ready()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -560,20 +494,12 @@ func (d *daemon) CreateNetwork(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
readyCh := make(chan struct{}, 1)
|
// initialize will unlock d.l
|
||||||
|
err = d.initialize(ctx, hostBootstrap)
|
||||||
err = d.initialize(hostBootstrap, readyCh)
|
|
||||||
d.l.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("initializing daemon: %w", err)
|
return fmt.Errorf("initializing daemon: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case <-readyCh:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,26 +513,19 @@ func (d *daemon) JoinNetwork(
|
|||||||
return ErrAlreadyJoined
|
return ErrAlreadyJoined
|
||||||
}
|
}
|
||||||
|
|
||||||
readyCh := make(chan struct{}, 1)
|
|
||||||
|
|
||||||
err := secrets.Import(ctx, d.secretsStore, newBootstrap.Secrets)
|
err := secrets.Import(ctx, d.secretsStore, newBootstrap.Secrets)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.l.Unlock()
|
d.l.Unlock()
|
||||||
return fmt.Errorf("importing secrets: %w", err)
|
return fmt.Errorf("importing secrets: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.initialize(newBootstrap.Bootstrap, readyCh)
|
// initialize will unlock d.l
|
||||||
d.l.Unlock()
|
err = d.initialize(ctx, newBootstrap.Bootstrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("initializing daemon: %w", err)
|
return fmt.Errorf("initializing daemon: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
return nil
|
||||||
case <-readyCh:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *daemon) GetBootstrap(ctx context.Context) (bootstrap.Bootstrap, error) {
|
func (d *daemon) GetBootstrap(ctx context.Context) (bootstrap.Bootstrap, error) {
|
||||||
@ -763,9 +682,7 @@ func (d *daemon) Shutdown() error {
|
|||||||
d.state = daemonStateShutdown
|
d.state = daemonStateShutdown
|
||||||
|
|
||||||
if d.children != nil {
|
if d.children != nil {
|
||||||
if err := d.children.Shutdown(); err != nil {
|
d.children.Shutdown()
|
||||||
return fmt.Errorf("shutting down children: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user