package pmuxlib

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"syscall"
	"time"
)

// ProcessConfig is used to configure a process.
type ProcessConfig struct {

	// Cmd and Args describe the actual process to run.
	Cmd  string   `yaml:"cmd"`
	Args []string `yaml:"args"`

	// Env describes the environment variables to set on the process. They are
	// appended to the environment of this parent process.
	Env map[string]string `yaml:"env"`

	// Dir is the directory the process will be run in. If not set then the
	// process is run in the same directory as this parent process.
	Dir string `yaml:"dir"`

	// MinWait and MaxWait are the minimum and maximum amount of time between
	// restarts that Process will wait.
	//
	// MinWait defaults to 1 second.
	// MaxWait defaults to 64 seconds.
	MinWait time.Duration `yaml:"minWait"`
	MaxWait time.Duration `yaml:"maxWait"`

	// SigKillWait is the amount of time after the process is sent a SIGINT
	// before a SIGKILL is sent.
	//
	// Defaults to 10 seconds.
	SigKillWait time.Duration `yaml:"sigKillWait"`

	// NoRestartOn indicates which exit codes should result in the process not
	// being restarted any further.
	NoRestartOn []int `yaml:"noRestartOn"`

	// StartAfterFunc will cause the starting of the process to be blocked until
	// this function returns. If used with RunProcessOnce and an error is
	// returned, RunProcessOnce will return that error.
	StartAfterFunc func(context.Context) error

	// Group can be used to control the order and grouping of processes as they
	// shut down.
	//
	// Processes will not be shut down until all processes with a higher group
	// number are already shut down. Processes with the same group number will
	// be shut down simultaneously.
	Group int `yaml:"group"`

	// Where to send stdout, stderr log lines, and lines from pmux itself.
	StdoutLogger Logger `yaml:"-"`
	StderrLogger Logger `yaml:"-"`
	SysLogger    Logger `yaml:"-"`
}

// withDefaults returns a copy of cfg in which every zero-valued MinWait,
// MaxWait and SigKillWait has been replaced by its documented default.
func (cfg ProcessConfig) withDefaults() ProcessConfig {

	if cfg.MinWait == 0 {
		cfg.MinWait = 1 * time.Second
	}

	if cfg.MaxWait == 0 {
		cfg.MaxWait = 64 * time.Second
	}

	if cfg.SigKillWait == 0 {
		cfg.SigKillWait = 10 * time.Second
	}

	return cfg
}

// sigProcessGroup sends sig to every process in the process group whose ID is
// proc.Pid, logging the attempt to sysLogger.
//
// A failure to deliver the signal is logged to sysLogger rather than causing a
// panic: this function is called from a background goroutine, where a panic
// would take down the whole program. An ESRCH failure (no such process group)
// is ignored entirely, since it only means the processes are already gone.
func sigProcessGroup(sysLogger Logger, proc *os.Process, sig syscall.Signal) {

	sysLogger.Printf("sending %v signal", sig)

	// Because we use Setpgid when starting child processes, child processes
	// will have the same PGID as their PID. To send a signal to all processes
	// in a group, you send the signal to the negation of the PGID, which in
	// this case is equivalent to -PID.
	err := syscall.Kill(-proc.Pid, sig)
	if err == nil || errors.Is(err, syscall.ESRCH) {
		return
	}

	sysLogger.Printf(
		"failed to send %v signal to %d: %v", sig, -proc.Pid, err,
	)
}

// RunProcessOnce runs the process described by the ProcessConfig (though it
// doesn't use all fields from the ProcessConfig).
//
// The process is killed if-and-only-if the context is canceled, in which case
// -1 and the context's error are returned. Otherwise, if the process ran to
// completion, its exit status (zero or non-zero) is returned with a nil error.
// In all other cases (StartAfterFunc failed, the process could not be started,
// the process was terminated by a signal, ...) -1 and an error are returned.
//
// The stdout and stderr of the process will be written to the corresponding
// Loggers. Various runtime events will be written to the sysLogger.
func RunProcessOnce(ctx context.Context, cfg ProcessConfig) (int, error) {

	cfg = cfg.withDefaults()

	if cfg.StartAfterFunc != nil {
		if err := cfg.StartAfterFunc(ctx); err != nil {
			return -1, err
		}
	}

	cmd := exec.Command(cfg.Cmd, cfg.Args...)
	cmd.Dir = cfg.Dir

	cmd.SysProcAttr = &syscall.SysProcAttr{
		// Indicates that the child process should be a part of a separate
		// process group than the parent, so that it does not receive signals
		// that the parent receives. This is what ensures that context
		// cancellation is the only way to interrupt the child processes.
		Setpgid: true,
	}

	cmd.Env = os.Environ()
	for k, v := range cfg.Env {
		cmd.Env = append(cmd.Env, k+"="+v)
	}

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return -1, fmt.Errorf("getting stdout pipe: %w", err)
	}
	defer stdout.Close()

	stderr, err := cmd.StderrPipe()
	if err != nil {
		return -1, fmt.Errorf("getting stderr pipe: %w", err)
	}
	defer stderr.Close()

	if err := cmd.Start(); err != nil {
		return -1, fmt.Errorf("starting process: %w", err)
	}

	// The pipe readers are only spawned once the process has actually been
	// started, so that a failed Start does not leave goroutines behind which
	// read from (and log errors about) already closed pipes.
	var wg sync.WaitGroup

	fwdOutPipe := func(logger Logger, r io.Reader) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bufR := bufio.NewReader(r)
			for {
				line, err := bufR.ReadString('\n')

				// ReadString returns the data read so far alongside any
				// error, so a final line which is not newline-terminated
				// must still be forwarded before handling the error.
				if line != "" {
					logger.Println(strings.TrimSuffix(line, "\n"))
				}

				if errors.Is(err, io.EOF) {
					return
				} else if err != nil {
					logger.Printf("reading output: %v", err)
					return
				}
			}
		}()
	}

	fwdOutPipe(cfg.StdoutLogger, stdout)
	fwdOutPipe(cfg.StderrLogger, stderr)

	// stopCh tells the signaling goroutine that the process has exited,
	// sigDoneCh is closed once that goroutine has returned.
	stopCh := make(chan struct{})
	sigDoneCh := make(chan struct{})

	go func(proc *os.Process) {
		defer close(sigDoneCh)

		select {
		case <-ctx.Done():
			sigProcessGroup(cfg.SysLogger, proc, syscall.SIGINT)
		case <-stopCh:
			return
		}

		killTimer := time.NewTimer(cfg.SigKillWait)
		defer killTimer.Stop()

		select {
		case <-killTimer.C:
			sigProcessGroup(cfg.SysLogger, proc, syscall.SIGKILL)
		case <-stopCh:
		}
	}(cmd.Process)

	// All reads from the pipes must have completed before Wait is called, as
	// Wait closes the pipes.
	wg.Wait()

	err = cmd.Wait()
	close(stopCh)

	// Don't return while the signaling goroutine might still send a signal or
	// write to the SysLogger.
	<-sigDoneCh

	if err := ctx.Err(); err != nil {
		return -1, err
	}

	// A process which ran to completion with a non-zero status is reported by
	// Wait as an *exec.ExitError. That is not a failure to run the process, so
	// the actual exit code is returned to the caller. ExitCode is -1 if the
	// process was terminated by a signal, which is still reported as an error.
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) && exitErr.ExitCode() >= 0 {
		return exitErr.ExitCode(), nil
	}

	if err != nil {
		return -1, fmt.Errorf("process exited: %w", err)
	}

	return cmd.ProcessState.ExitCode(), nil
}

// Process is used to manage a running process. Methods on a Process are _not_
// thread-safe.
//
// Stop must be called on a Process before the program has exited, or there may
// be a leftover zombie child process.
//
// The process will be restarted if it exits of its own accord. There will be a
// brief wait time between each restart, with an exponential backoff mechanism
// so that the wait time increases upon repeated restarts.
//
// The stdout and stderr of the process will be written to the corresponding
// Loggers. Various runtime events will be written to the sysLogger.
type Process struct { cfg ProcessConfig stopFn context.CancelFunc doneCh chan struct{} } // NewProcess returns a new Process instance based on the given config. func NewProcess(cfg ProcessConfig) *Process { p := &Process{ cfg: cfg.withDefaults(), } p.run() return p } func (p *Process) run() { var ctx context.Context ctx, p.stopFn = context.WithCancel(context.Background()) p.doneCh = make(chan struct{}) go func() { defer close(p.doneCh) p.restartLoop(ctx) }() } func (p *Process) restartLoop(ctx context.Context) { var wait time.Duration for { start := time.Now() exitCode, err := RunProcessOnce(ctx, p.cfg) took := time.Since(start) // TODO check if error was due to StartAfterFunc, change the log if so. if err != nil { p.cfg.SysLogger.Printf("exited: %v", err) } else { p.cfg.SysLogger.Printf("exit code: %d", exitCode) } if err := ctx.Err(); err != nil { return } for i := range p.cfg.NoRestartOn { if p.cfg.NoRestartOn[i] == exitCode { return } } wait = ((wait * 2) - took).Truncate(time.Millisecond) if wait < p.cfg.MinWait { wait = p.cfg.MinWait } else if wait > p.cfg.MaxWait { wait = p.cfg.MaxWait } p.cfg.SysLogger.Printf("will restart process in %v", wait) select { case <-time.After(wait): case <-ctx.Done(): return } } } // Restart will block until the currently running child process has been killed // and a new one has been spawned. func (p *Process) Restart() { p.Stop() p.run() } // Stop will block until the child process has been killed. The Process should // not be used again after this. func (p *Process) Stop() { p.stopFn() <-p.doneCh }