Refactor pmuxlib to allow for restarting a single process within a set

This commit is contained in:
Brian Picciano 2024-07-19 15:35:38 +02:00
parent b1bc3a1df2
commit 7f5c354d04
4 changed files with 137 additions and 70 deletions

18
main.go
View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -29,19 +28,18 @@ func main() {
panic(fmt.Sprintf("couldn't parse cfg file: %v", err)) panic(fmt.Sprintf("couldn't parse cfg file: %v", err))
} }
ctx, cancel := context.WithCancel(context.Background()) p := pmuxlib.NewPmux(cfg, os.Stdout, os.Stderr)
defer p.Stop()
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
go func() { go func() {
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
cancel()
<-sigCh <-sigCh
fmt.Fprintln(os.Stderr, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!") fmt.Fprintln(os.Stderr, "forcefully exiting pmux process, there may be zombie child processes being left behind, good luck!")
os.Stderr.Sync() os.Stderr.Sync()
os.Exit(1) os.Exit(1)
}() }()
pmuxlib.Run(ctx, os.Stdout, os.Stderr, cfg)
} }

View File

@ -12,7 +12,7 @@
processes: processes:
# each process must have a name and cmd. # each process must have a name and cmd.
- name: pinger pinger:
cmd: /bin/bash cmd: /bin/bash
args: args:
- "-c" - "-c"
@ -37,7 +37,7 @@ processes:
# This process will not immediately exit when pmux tells it to do so, but pmux # This process will not immediately exit when pmux tells it to do so, but pmux
# will SIGKILL it after sigKillWait has elapsed. # will SIGKILL it after sigKillWait has elapsed.
- name: stubborn-pinger stubborn-pinger:
cmd: /bin/bash cmd: /bin/bash
args: args:
- "-c" - "-c"

View File

@ -3,25 +3,30 @@
package pmuxlib package pmuxlib
import ( import (
"context" "fmt"
"io" "io"
"sync" "sync"
) )
type Config struct { type Config struct {
TimeFormat string `yaml:"timeFormat"` TimeFormat string `yaml:"timeFormat"`
Processes []ProcessConfig `yaml:"processes"`
// Set of processes to run, keyed by their name.
Processes map[string]ProcessConfig `yaml:"processes"`
} }
// Run runs the given configuration as if this was a real pmux process. It will // Pmux manages multiple child Processes. Methods on a Pmux instance are _not_
// block until the context is canceled and all child processes have been cleaned // thread-safe.
// up. //
func Run( // Stop must be called on a Pmux before the program has exited, or there may be
ctx context.Context, // a leftover zombie child process.
stdout, stderr io.Writer, type Pmux struct {
cfg Config, processes map[string]*Process
) { sysLogger Logger
}
// NewPmux starts a Pmux with the given configuration.
func NewPmux(cfg Config, stdout, stderr io.Writer) *Pmux {
stdoutLogger := newLogger(stdout, logSepStdout, cfg.TimeFormat) stdoutLogger := newLogger(stdout, logSepStdout, cfg.TimeFormat)
defer stdoutLogger.Close() defer stdoutLogger.Close()
@ -29,27 +34,51 @@ func Run(
defer stderrLogger.Close() defer stderrLogger.Close()
sysLogger := stderrLogger.withSep(logSepSys) sysLogger := stderrLogger.withSep(logSepSys)
defer sysLogger.Println("exited gracefully, ciao!")
var wg sync.WaitGroup p := &Pmux{
defer wg.Wait() processes: map[string]*Process{},
sysLogger: sysLogger,
for _, cfgProc := range cfg.Processes {
wg.Add(1)
go func(procCfg ProcessConfig) {
defer wg.Done()
stdoutLogger := stdoutLogger.withPName(procCfg.Name)
stderrLogger := stderrLogger.withPName(procCfg.Name)
sysLogger := sysLogger.withPName(procCfg.Name)
sysLogger.Println("starting process")
defer sysLogger.Println("stopped process handler")
RunProcess(
ctx, stdoutLogger, stderrLogger, sysLogger, procCfg,
)
}(cfgProc)
} }
for name, cfgProc := range cfg.Processes {
stdoutLogger := stdoutLogger.withPName(name)
stderrLogger := stderrLogger.withPName(name)
sysLogger := sysLogger.withPName(name)
p.processes[name] = NewProcess(
cfgProc, stdoutLogger, stderrLogger, sysLogger,
)
}
return p
}
// Restart will block until the child process of the given name has been killed
// and a new one has been spawned. If there is no child of the given name then
// Restart panics.
func (p *Pmux) Restart(name string) {
proc, ok := p.processes[name]
if !ok {
panic(fmt.Sprintf("no process named %q", name))
}
proc.Restart()
}
// Stop will block until all child processes have been killed. The Pmux should
// not be used again after this.
func (p *Pmux) Stop() {
var wg sync.WaitGroup
p.sysLogger.Println("killing child processes")
for _, proc := range p.processes {
proc := proc
wg.Add(1)
go func() {
defer wg.Done()
proc.Stop()
}()
}
wg.Wait()
p.sysLogger.Println("exited gracefully, ciao!")
} }

View File

@ -14,12 +14,8 @@ import (
"time" "time"
) )
// ProcessConfig is used to configure a process via RunProcess. // ProcessConfig is used to configure a process.
type ProcessConfig struct { type ProcessConfig struct {
// Name of the process to be run. This only gets used by RunPmux.
Name string
// Cmd and Args describe the actual process to run. // Cmd and Args describe the actual process to run.
Cmd string `yaml:"cmd"` Cmd string `yaml:"cmd"`
Args []string `yaml:"args"` Args []string `yaml:"args"`
@ -32,7 +28,7 @@ type ProcessConfig struct {
Dir string `yaml:"dir"` Dir string `yaml:"dir"`
// MinWait and MaxWait are the minimum and maximum amount of time between // MinWait and MaxWait are the minimum and maximum amount of time between
// restarts that RunProcess will wait. // restarts that Process will wait.
// //
// MinWait defaults to 1 second. // MinWait defaults to 1 second.
// MaxWait defaults to 64 seconds. // MaxWait defaults to 64 seconds.
@ -40,7 +36,7 @@ type ProcessConfig struct {
MaxWait time.Duration `yaml:"maxWait"` MaxWait time.Duration `yaml:"maxWait"`
// SigKillWait is the amount of time after the process is sent a SIGINT // SigKillWait is the amount of time after the process is sent a SIGINT
// before RunProcess sends it a SIGKILL. // before a SIGKILL is sent.
// //
// Defalts to 10 seconds. // Defalts to 10 seconds.
SigKillWait time.Duration `yaml:"sigKillWait"` SigKillWait time.Duration `yaml:"sigKillWait"`
@ -207,8 +203,11 @@ func RunProcessOnce(
return cmd.ProcessState.ExitCode(), nil return cmd.ProcessState.ExitCode(), nil
} }
// RunProcess runs a process (configured by ProcessConfig) until the context is // Process is used to manage a running process. Methods on a Process are _not_
// canceled, at which point the process is killed and RunProcess returns. // thread-safe.
//
// Stop must be called on a Process before the program has exited, or there may
// be a leftover zombie child process.
// //
// The process will be restarted if it exits of its own accord. There will be a // The process will be restarted if it exits of its own accord. There will be a
// brief wait time between each restart, with an exponential backoff mechanism // brief wait time between each restart, with an exponential backoff mechanism
@ -216,50 +215,77 @@ func RunProcessOnce(
// //
// The stdout and stderr of the process will be written to the corresponding // The stdout and stderr of the process will be written to the corresponding
// Loggers. Various runtime events will be written to the sysLogger. // Loggers. Various runtime events will be written to the sysLogger.
func RunProcess( type Process struct {
ctx context.Context, cfg ProcessConfig
stdoutLogger, stderrLogger, sysLogger Logger, stdoutLogger, stderrLogger, sysLogger Logger
cfg ProcessConfig,
) {
cfg = cfg.withDefaults() stopFn context.CancelFunc
doneCh chan struct{}
}
// NewProcess returns a new Process instance based on the given config.
func NewProcess(
cfg ProcessConfig, stdoutLogger, stderrLogger, sysLogger Logger,
) *Process {
p := &Process{
cfg: cfg.withDefaults(),
stdoutLogger: stdoutLogger,
stderrLogger: stderrLogger,
sysLogger: sysLogger,
}
p.run()
return p
}
func (p *Process) run() {
var ctx context.Context
ctx, p.stopFn = context.WithCancel(context.Background())
p.doneCh = make(chan struct{})
go func() {
defer close(p.doneCh)
p.restartLoop(ctx)
}()
}
func (p *Process) restartLoop(ctx context.Context) {
var wait time.Duration var wait time.Duration
for { for {
start := time.Now() start := time.Now()
exitCode, err := RunProcessOnce( exitCode, err := RunProcessOnce(
ctx, ctx,
stdoutLogger, stderrLogger, sysLogger, p.stdoutLogger, p.stderrLogger, p.sysLogger,
cfg, p.cfg,
) )
took := time.Since(start) took := time.Since(start)
// TODO check if error was due to StartAfterFunc, change the log if so.
if err != nil { if err != nil {
sysLogger.Printf("exited: %v", err) p.sysLogger.Printf("exited: %v", err)
} else { } else {
sysLogger.Printf("exit code: %d", exitCode) p.sysLogger.Printf("exit code: %d", exitCode)
} }
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
return return
} }
for i := range cfg.NoRestartOn { for i := range p.cfg.NoRestartOn {
if cfg.NoRestartOn[i] == exitCode { if p.cfg.NoRestartOn[i] == exitCode {
return return
} }
} }
wait = ((wait * 2) - took).Truncate(time.Millisecond) wait = ((wait * 2) - took).Truncate(time.Millisecond)
if wait < cfg.MinWait { if wait < p.cfg.MinWait {
wait = cfg.MinWait wait = p.cfg.MinWait
} else if wait > cfg.MaxWait { } else if wait > p.cfg.MaxWait {
wait = cfg.MaxWait wait = p.cfg.MaxWait
} }
sysLogger.Printf("will restart process in %v", wait) p.sysLogger.Printf("will restart process in %v", wait)
select { select {
case <-time.After(wait): case <-time.After(wait):
@ -268,3 +294,17 @@ func RunProcess(
} }
} }
} }
// Restart will block until the currently running child process has been killed
// and a new one has been spawned.
func (p *Process) Restart() {
p.Stop()
p.run()
}
// Stop will block until the child process has been killed. The Process should
// not be used again after this.
func (p *Process) Stop() {
p.stopFn()
<-p.doneCh
}