From 7f5c354d046ffc569ed1502b09b5c234723a0523 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Fri, 19 Jul 2024 15:35:38 +0200 Subject: [PATCH] Refactor pmuxlib to allow for restarting a single process within a set --- main.go | 18 +++---- pmux-example.yml | 4 +- pmuxlib/pmuxlib.go | 93 +++++++++++++++++++++------------ pmuxlib/{proc.go => process.go} | 92 +++++++++++++++++++++++--------- 4 files changed, 137 insertions(+), 70 deletions(-) rename pmuxlib/{proc.go => process.go} (74%) diff --git a/main.go b/main.go index 8839685..4948520 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "flag" "fmt" "io/ioutil" @@ -29,19 +28,18 @@ func main() { panic(fmt.Sprintf("couldn't parse cfg file: %v", err)) } - ctx, cancel := context.WithCancel(context.Background()) + p := pmuxlib.NewPmux(cfg, os.Stdout, os.Stderr) + defer p.Stop() + + sigCh := make(chan os.Signal, 2) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + <-sigCh + go func() { - sigCh := make(chan os.Signal, 2) - signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - - <-sigCh - cancel() - <-sigCh fmt.Fprintln(os.Stderr, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") os.Stderr.Sync() os.Exit(1) }() - - pmuxlib.Run(ctx, os.Stdout, os.Stderr, cfg) } diff --git a/pmux-example.yml b/pmux-example.yml index 289804d..72fbe73 100644 --- a/pmux-example.yml +++ b/pmux-example.yml @@ -12,7 +12,7 @@ processes: # each process must have a name and cmd. - - name: pinger + pinger: cmd: /bin/bash args: - "-c" @@ -37,7 +37,7 @@ processes: # This process will not immediately exit when pmux tells it to do so, but pmux # will SIGKILL it after sigKillWait has elapsed. - - name: stubborn-pinger + stubborn-pinger: cmd: /bin/bash args: - "-c" diff --git a/pmuxlib/pmuxlib.go b/pmuxlib/pmuxlib.go index 6d5dfaa..9081277 100644 --- a/pmuxlib/pmuxlib.go +++ b/pmuxlib/pmuxlib.go @@ -3,25 +3,30 @@ package pmuxlib import ( - "context" + "fmt" "io" "sync" ) type Config struct { - TimeFormat string `yaml:"timeFormat"` - Processes []ProcessConfig `yaml:"processes"` + TimeFormat string `yaml:"timeFormat"` + + // Set of processes to run, keyed by their name. + Processes map[string]ProcessConfig `yaml:"processes"` } -// Run runs the given configuration as if this was a real pmux process. It will -// block until the context is canceled and all child processes have been cleaned -// up. -func Run( - ctx context.Context, - stdout, stderr io.Writer, - cfg Config, -) { +// Pmux manages multiple child Processes. Methods on a Pmux instance are _not_ +// thread-safe. +// +// Stop must be called on a Pmux before the program has exited, or there may be +// a leftover zombie child process. +type Pmux struct { + processes map[string]*Process + sysLogger Logger +} +// NewPmux starts a Pmux with the given configuration. +func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux { stdoutLogger := newLogger(stdout, logSepStdout, cfg.TimeFormat) defer stdoutLogger.Close() @@ -29,27 +34,51 @@ func Run( defer stderrLogger.Close() sysLogger := stderrLogger.withSep(logSepSys) - defer sysLogger.Println("exited gracefully, ciao!") - var wg sync.WaitGroup - defer wg.Wait() - - for _, cfgProc := range cfg.Processes { - wg.Add(1) - go func(procCfg ProcessConfig) { - defer wg.Done() - - stdoutLogger := stdoutLogger.withPName(procCfg.Name) - stderrLogger := stderrLogger.withPName(procCfg.Name) - sysLogger := sysLogger.withPName(procCfg.Name) - - sysLogger.Println("starting process") - defer sysLogger.Println("stopped process handler") - - RunProcess( - ctx, stdoutLogger, stderrLogger, sysLogger, procCfg, - ) - - }(cfgProc) + p := &Pmux{ + processes: map[string]*Process{}, + sysLogger: sysLogger, } + + for name, cfgProc := range cfg.Processes { + stdoutLogger := stdoutLogger.withPName(name) + stderrLogger := stderrLogger.withPName(name) + sysLogger := sysLogger.withPName(name) + p.processes[name] = NewProcess( + cfgProc, stdoutLogger, stderrLogger, sysLogger, + ) + } + + return p +} + +// Restart will block until the child process of the given name has been killed +// and a new one has been spawned. If there is no child of the given name then +// Restart panics. +func (p *Pmux) Restart(name string) { + proc, ok := p.processes[name] + if !ok { + panic(fmt.Sprintf("no process named %q", name)) + } + proc.Restart() +} + +// Stop will block until all child processes have been killed. The Pmux should +// not be used again after this. +func (p *Pmux) Stop() { + var wg sync.WaitGroup + + p.sysLogger.Println("killing child processes") + for _, proc := range p.processes { + proc := proc + + wg.Add(1) + go func() { + defer wg.Done() + proc.Stop() + }() + } + + wg.Wait() + p.sysLogger.Println("exited gracefully, ciao!") } diff --git a/pmuxlib/proc.go b/pmuxlib/process.go similarity index 74% rename from pmuxlib/proc.go rename to pmuxlib/process.go index 2e91d9d..760ba07 100644 --- a/pmuxlib/proc.go +++ b/pmuxlib/process.go @@ -14,12 +14,8 @@ import ( "time" ) -// ProcessConfig is used to configure a process via RunProcess. +// ProcessConfig is used to configure a process. type ProcessConfig struct { - - // Name of the process to be run. This only gets used by RunPmux. - Name string - // Cmd and Args describe the actual process to run. Cmd string `yaml:"cmd"` Args []string `yaml:"args"` @@ -32,7 +28,7 @@ type ProcessConfig struct { Dir string `yaml:"dir"` // MinWait and MaxWait are the minimum and maximum amount of time between - // restarts that RunProcess will wait. + // restarts that Process will wait. // // MinWait defaults to 1 second. // MaxWait defaults to 64 seconds. @@ -40,7 +36,7 @@ type ProcessConfig struct { MaxWait time.Duration `yaml:"maxWait"` // SigKillWait is the amount of time after the process is sent a SIGINT - // before RunProcess sends it a SIGKILL. + // before a SIGKILL is sent. // // Defalts to 10 seconds. SigKillWait time.Duration `yaml:"sigKillWait"` @@ -207,8 +203,11 @@ func RunProcessOnce( return cmd.ProcessState.ExitCode(), nil } -// RunProcess runs a process (configured by ProcessConfig) until the context is -// canceled, at which point the process is killed and RunProcess returns. +// Process is used to manage a running process. Methods on a Process are _not_ +// thread-safe. +// +// Stop must be called on a Process before the program has exited, or there may +// be a leftover zombie child process. // // The process will be restarted if it exits of its own accord. There will be a // brief wait time between each restart, with an exponential backoff mechanism @@ -216,50 +215,77 @@ func RunProcessOnce( // // The stdout and stderr of the process will be written to the corresponding // Loggers. Various runtime events will be written to the sysLogger. -func RunProcess( - ctx context.Context, - stdoutLogger, stderrLogger, sysLogger Logger, - cfg ProcessConfig, -) { +type Process struct { + cfg ProcessConfig + stdoutLogger, stderrLogger, sysLogger Logger - cfg = cfg.withDefaults() + stopFn context.CancelFunc + doneCh chan struct{} +} +// NewProcess returns a new Process instance based on the given config. +func NewProcess( + cfg ProcessConfig, stdoutLogger, stderrLogger, sysLogger Logger, +) *Process { + p := &Process{ + cfg: cfg.withDefaults(), + stdoutLogger: stdoutLogger, + stderrLogger: stderrLogger, + sysLogger: sysLogger, + } + p.run() + return p +} + +func (p *Process) run() { + var ctx context.Context + ctx, p.stopFn = context.WithCancel(context.Background()) + p.doneCh = make(chan struct{}) + + go func() { + defer close(p.doneCh) + p.restartLoop(ctx) + }() +} + +func (p *Process) restartLoop(ctx context.Context) { var wait time.Duration for { start := time.Now() exitCode, err := RunProcessOnce( ctx, - stdoutLogger, stderrLogger, sysLogger, - cfg, + p.stdoutLogger, p.stderrLogger, p.sysLogger, + p.cfg, ) took := time.Since(start) + // TODO check if error was due to StartAfterFunc, change the log if so. if err != nil { - sysLogger.Printf("exited: %v", err) + p.sysLogger.Printf("exited: %v", err) } else { - sysLogger.Printf("exit code: %d", exitCode) + p.sysLogger.Printf("exit code: %d", exitCode) } if err := ctx.Err(); err != nil { return } - for i := range cfg.NoRestartOn { - if cfg.NoRestartOn[i] == exitCode { + for i := range p.cfg.NoRestartOn { + if p.cfg.NoRestartOn[i] == exitCode { return } } wait = ((wait * 2) - took).Truncate(time.Millisecond) - if wait < cfg.MinWait { - wait = cfg.MinWait - } else if wait > cfg.MaxWait { - wait = cfg.MaxWait + if wait < p.cfg.MinWait { + wait = p.cfg.MinWait + } else if wait > p.cfg.MaxWait { + wait = p.cfg.MaxWait } - sysLogger.Printf("will restart process in %v", wait) + p.sysLogger.Printf("will restart process in %v", wait) select { case <-time.After(wait): @@ -268,3 +294,17 @@ func RunProcess( } } } + +// Restart will block until the currently running child process has been killed +// and a new one has been spawned. +func (p *Process) Restart() { + p.Stop() + p.run() +} + +// Stop will block until the child process has been killed. The Process should +// not be used again after this. +func (p *Process) Stop() { + p.stopFn() + <-p.doneCh +}